Improve performance

This commit is contained in:
Sunli 2020-10-12 14:49:32 +08:00
parent a18ee76cdb
commit 25af42ed81
12 changed files with 197 additions and 146 deletions

View File

@ -298,7 +298,7 @@ pub fn generate(
query_data: &query_env.ctx_data,
};
#crate_name::extensions::Extension::execution_start(&mut *query_env.extensions.lock(), &ctx_extension);
query_env.extensions.execution_start(&ctx_extension);
#[allow(bare_trait_objects)]
let ri = #crate_name::extensions::ResolveInfo {
@ -308,12 +308,12 @@ pub fn generate(
return_type: &<<#stream_ty as #crate_name::futures::stream::Stream>::Item as #crate_name::Type>::qualified_type_name(),
};
#crate_name::extensions::Extension::resolve_start(&mut *query_env.extensions.lock(), &ctx_extension, &ri);
query_env.extensions.resolve_start(&ctx_extension, &ri);
let res = #crate_name::OutputValueType::resolve(&msg, &ctx_selection_set, &*field).await;
#crate_name::extensions::Extension::resolve_end(&mut *query_env.extensions.lock(), &ctx_extension, &ri);
#crate_name::extensions::Extension::execution_end(&mut *query_env.extensions.lock(), &ctx_extension);
query_env.extensions.resolve_end(&ctx_extension, &ri);
query_env.extensions.execution_end(&ctx_extension);
res
}

View File

@ -255,7 +255,7 @@ pub struct ContextBase<'a, T> {
#[doc(hidden)]
pub struct QueryEnvInner {
pub extensions: spin::Mutex<Extensions>,
pub extensions: Extensions,
pub variables: Variables,
pub operation: Positioned<OperationDefinition>,
pub fragments: HashMap<Name, Positioned<FragmentDefinition>>,

View File

@ -25,9 +25,6 @@ use std::collections::BTreeMap;
pub(crate) type BoxExtension = Box<dyn Extension>;
#[doc(hidden)]
pub struct Extensions(pub(crate) Vec<BoxExtension>);
/// Context for extension
pub struct ExtensionContext<'a> {
#[doc(hidden)]
@ -147,21 +144,21 @@ pub trait Extension: Sync + Send + 'static {
}
pub(crate) trait ErrorLogger {
fn log_error(self, ctx: &ExtensionContext<'_>, extensions: &spin::Mutex<Extensions>) -> Self;
fn log_error(self, ctx: &ExtensionContext<'_>, extensions: &Extensions) -> Self;
}
impl<T> ErrorLogger for ServerResult<T> {
fn log_error(self, ctx: &ExtensionContext<'_>, extensions: &spin::Mutex<Extensions>) -> Self {
fn log_error(self, ctx: &ExtensionContext<'_>, extensions: &Extensions) -> Self {
if let Err(err) = &self {
extensions.lock().error(ctx, err);
extensions.error(ctx, err);
}
self
}
}
impl<T> ErrorLogger for Result<T, Vec<ServerError>> {
fn log_error(self, ctx: &ExtensionContext<'_>, extensions: &spin::Mutex<Extensions>) -> Self {
fn log_error(self, ctx: &ExtensionContext<'_>, extensions: &Extensions) -> Self {
if let Err(errors) = &self {
let mut extensions = extensions.lock();
for error in errors {
extensions.error(ctx, error);
}
@ -170,69 +167,114 @@ impl<T> ErrorLogger for Result<T, Vec<ServerError>> {
}
}
#[async_trait::async_trait]
impl Extension for Extensions {
async fn prepare_request(
&mut self,
/// Extension factory
///
/// Used to create an extension instance.
pub trait ExtensionFactory: Send + Sync + 'static {
/// Create an extended instance.
fn create(&self) -> Box<dyn Extension>;
}
#[doc(hidden)]
pub struct Extensions(Option<spin::Mutex<Vec<BoxExtension>>>);
impl From<Vec<BoxExtension>> for Extensions {
fn from(extensions: Vec<BoxExtension>) -> Self {
Self(if extensions.is_empty() {
None
} else {
Some(spin::Mutex::new(extensions))
})
}
}
#[doc(hidden)]
impl Extensions {
pub fn is_empty(&self) -> bool {
self.0.is_none()
}
pub async fn prepare_request(
&self,
ctx: &ExtensionContext<'_>,
request: Request,
) -> ServerResult<Request> {
let mut request = request;
for e in self.0.iter_mut() {
request = e.prepare_request(ctx, request).await?;
if let Some(e) = &self.0 {
for e in e.lock().iter_mut() {
request = e.prepare_request(ctx, request).await?;
}
}
Ok(request)
}
fn parse_start(
&mut self,
pub fn parse_start(
&self,
ctx: &ExtensionContext<'_>,
query_source: &str,
variables: &Variables,
) {
self.0
.iter_mut()
.for_each(|e| e.parse_start(ctx, query_source, variables));
if let Some(e) = &self.0 {
e.lock()
.iter_mut()
.for_each(|e| e.parse_start(ctx, query_source, variables));
}
}
fn parse_end(&mut self, ctx: &ExtensionContext<'_>, document: &ExecutableDocument) {
self.0.iter_mut().for_each(|e| e.parse_end(ctx, document));
pub fn parse_end(&self, ctx: &ExtensionContext<'_>, document: &ExecutableDocument) {
if let Some(e) = &self.0 {
e.lock().iter_mut().for_each(|e| e.parse_end(ctx, document));
}
}
fn validation_start(&mut self, ctx: &ExtensionContext<'_>) {
self.0.iter_mut().for_each(|e| e.validation_start(ctx));
pub fn validation_start(&self, ctx: &ExtensionContext<'_>) {
if let Some(e) = &self.0 {
e.lock().iter_mut().for_each(|e| e.validation_start(ctx));
}
}
fn validation_end(&mut self, ctx: &ExtensionContext<'_>) {
self.0.iter_mut().for_each(|e| e.validation_end(ctx));
pub fn validation_end(&self, ctx: &ExtensionContext<'_>) {
if let Some(e) = &self.0 {
e.lock().iter_mut().for_each(|e| e.validation_end(ctx));
}
}
fn execution_start(&mut self, ctx: &ExtensionContext<'_>) {
self.0.iter_mut().for_each(|e| e.execution_start(ctx));
pub fn execution_start(&self, ctx: &ExtensionContext<'_>) {
if let Some(e) = &self.0 {
e.lock().iter_mut().for_each(|e| e.execution_start(ctx));
}
}
fn execution_end(&mut self, ctx: &ExtensionContext<'_>) {
self.0.iter_mut().for_each(|e| e.execution_end(ctx));
pub fn execution_end(&self, ctx: &ExtensionContext<'_>) {
if let Some(e) = &self.0 {
e.lock().iter_mut().for_each(|e| e.execution_end(ctx));
}
}
fn resolve_start(&mut self, ctx: &ExtensionContext<'_>, info: &ResolveInfo<'_>) {
self.0.iter_mut().for_each(|e| e.resolve_start(ctx, info));
pub fn resolve_start(&self, ctx: &ExtensionContext<'_>, info: &ResolveInfo<'_>) {
if let Some(e) = &self.0 {
e.lock().iter_mut().for_each(|e| e.resolve_start(ctx, info));
}
}
fn resolve_end(&mut self, ctx: &ExtensionContext<'_>, resolve_id: &ResolveInfo<'_>) {
self.0
.iter_mut()
.for_each(|e| e.resolve_end(ctx, resolve_id));
pub fn resolve_end(&self, ctx: &ExtensionContext<'_>, resolve_id: &ResolveInfo<'_>) {
if let Some(e) = &self.0 {
e.lock()
.iter_mut()
.for_each(|e| e.resolve_end(ctx, resolve_id));
}
}
fn error(&mut self, ctx: &ExtensionContext<'_>, err: &ServerError) {
self.0.iter_mut().for_each(|e| e.error(ctx, err));
pub fn error(&self, ctx: &ExtensionContext<'_>, err: &ServerError) {
if let Some(e) = &self.0 {
e.lock().iter_mut().for_each(|e| e.error(ctx, err));
}
}
fn result(&mut self, ctx: &ExtensionContext<'_>) -> Option<Value> {
if !self.0.is_empty() {
let value = self
.0
pub fn result(&self, ctx: &ExtensionContext<'_>) -> Option<Value> {
if let Some(e) = &self.0 {
let value = e
.lock()
.iter_mut()
.filter_map(|e| {
if let Some(name) = e.name() {
@ -252,11 +294,3 @@ impl Extension for Extensions {
}
}
}
/// Extension factory
///
/// Used to create an extension instance.
pub trait ExtensionFactory: Send + Sync + 'static {
/// Create an extended instance.
fn create(&self) -> Box<dyn Extension>;
}

View File

@ -1,4 +1,4 @@
use crate::extensions::{ErrorLogger, Extension, ExtensionContext, ResolveInfo};
use crate::extensions::{ErrorLogger, ExtensionContext, ResolveInfo};
use crate::parser::types::Selection;
use crate::registry::MetaType;
use crate::{
@ -161,49 +161,62 @@ impl<'a> Fields<'a> {
query_data: &ctx.query_env.ctx_data,
};
let resolve_info = ResolveInfo {
resolve_id: ctx_field.resolve_id,
path_node: ctx_field.path_node.as_ref().unwrap(),
parent_type: &T::type_name(),
return_type: match ctx_field
.schema_env
.registry
.types
.get(T::type_name().as_ref())
.and_then(|ty| ty.field_by_name(field.node.name.node.as_str()))
.map(|field| &field.ty)
{
Some(ty) => &ty,
None => {
return Err(ServerError::new(format!(
r#"Cannot query field "{}" on type "{}"."#,
field_name,
T::type_name()
))
.at(ctx_field.item.pos)
.path(PathSegment::Field(field_name.to_string())));
if ctx_field.query_env.extensions.is_empty() {
match root.resolve_field(&ctx_field).await {
Ok(value) => Ok((field_name, value.unwrap())),
Err(e) => {
Err(e.path(PathSegment::Field(field_name.to_string())))
}
},
};
}
.log_error(&ctx_extension, &ctx_field.query_env.extensions)
} else {
let resolve_info = ResolveInfo {
resolve_id: ctx_field.resolve_id,
path_node: ctx_field.path_node.as_ref().unwrap(),
parent_type: &T::type_name(),
return_type: match ctx_field
.schema_env
.registry
.types
.get(T::type_name().as_ref())
.and_then(|ty| {
ty.field_by_name(field.node.name.node.as_str())
})
.map(|field| &field.ty)
{
Some(ty) => &ty,
None => {
return Err(ServerError::new(format!(
r#"Cannot query field "{}" on type "{}"."#,
field_name,
T::type_name()
))
.at(ctx_field.item.pos)
.path(PathSegment::Field(field_name.to_string())));
}
},
};
ctx_field
.query_env
.extensions
.lock()
.resolve_start(&ctx_extension, &resolve_info);
ctx_field
.query_env
.extensions
.resolve_start(&ctx_extension, &resolve_info);
let res = match root.resolve_field(&ctx_field).await {
Ok(value) => Ok((field_name, value.unwrap())),
Err(e) => Err(e.path(PathSegment::Field(field_name.to_string()))),
let res = match root.resolve_field(&ctx_field).await {
Ok(value) => Ok((field_name, value.unwrap())),
Err(e) => {
Err(e.path(PathSegment::Field(field_name.to_string())))
}
}
.log_error(&ctx_extension, &ctx_field.query_env.extensions)?;
ctx_field
.query_env
.extensions
.resolve_end(&ctx_extension, &resolve_info);
Ok(res)
}
.log_error(&ctx_extension, &ctx_field.query_env.extensions)?;
ctx_field
.query_env
.extensions
.lock()
.resolve_end(&ctx_extension, &resolve_info);
Ok(res)
}
}));
}

View File

@ -1,4 +1,4 @@
use crate::extensions::{ErrorLogger, Extension, ExtensionContext, ResolveInfo};
use crate::extensions::{ErrorLogger, ExtensionContext, ResolveInfo};
use crate::parser::types::Field;
use crate::{
ContextSelectionSet, OutputValueType, PathSegment, Positioned, ServerResult, Type, Value,
@ -9,41 +9,48 @@ pub async fn resolve_list<'a, T: OutputValueType + Send + Sync + 'a>(
ctx: &ContextSelectionSet<'a>,
field: &Positioned<Field>,
iter: impl IntoIterator<Item = T>,
len: Option<usize>,
) -> ServerResult<Value> {
let mut futures = Vec::new();
let mut futures = len.map(|size| Vec::with_capacity(size)).unwrap_or_default();
for (idx, item) in iter.into_iter().enumerate() {
let ctx_idx = ctx.with_index(idx);
futures.push(async move {
let resolve_info = ResolveInfo {
resolve_id: ctx_idx.resolve_id,
path_node: ctx_idx.path_node.as_ref().unwrap(),
parent_type: &Vec::<T>::type_name(),
return_type: &T::qualified_type_name(),
};
let ctx_extension = ExtensionContext {
schema_data: &ctx.schema_env.data,
query_data: &ctx.query_env.ctx_data,
};
ctx_idx
.query_env
.extensions
.lock()
.resolve_start(&ctx_extension, &resolve_info);
if ctx_idx.query_env.extensions.is_empty() {
OutputValueType::resolve(&item, &ctx_idx, field)
.await
.map_err(|e| e.path(PathSegment::Index(idx)))
.log_error(&ctx_extension, &ctx_idx.query_env.extensions)
} else {
let resolve_info = ResolveInfo {
resolve_id: ctx_idx.resolve_id,
path_node: ctx_idx.path_node.as_ref().unwrap(),
parent_type: &Vec::<T>::type_name(),
return_type: &T::qualified_type_name(),
};
let res = OutputValueType::resolve(&item, &ctx_idx, field)
.await
.map_err(|e| e.path(PathSegment::Index(idx)))
.log_error(&ctx_extension, &ctx_idx.query_env.extensions)?;
ctx_idx
.query_env
.extensions
.resolve_start(&ctx_extension, &resolve_info);
ctx_idx
.query_env
.extensions
.lock()
.resolve_end(&ctx_extension, &resolve_info);
let res = OutputValueType::resolve(&item, &ctx_idx, field)
.await
.map_err(|e| e.path(PathSegment::Index(idx)))
.log_error(&ctx_extension, &ctx_idx.query_env.extensions)?;
ServerResult::Ok(res)
ctx_idx
.query_env
.extensions
.resolve_end(&ctx_extension, &resolve_info);
Ok(res)
}
});
}

View File

@ -1,5 +1,5 @@
use crate::context::{Data, QueryEnvInner, ResolveId};
use crate::extensions::{ErrorLogger, Extension, ExtensionContext, ExtensionFactory, Extensions};
use crate::extensions::{ErrorLogger, ExtensionContext, ExtensionFactory, Extensions};
use crate::model::__DirectiveLocation;
use crate::parser::parse_query;
use crate::parser::types::{DocumentOperations, OperationType};
@ -325,16 +325,15 @@ where
request: Request,
) -> Result<(QueryEnvInner, CacheControl), Vec<ServerError>> {
// create extension instances
let extensions = spin::Mutex::new(Extensions(
self.0
.extensions
.iter()
.map(|factory| factory.create())
.collect_vec(),
));
let extensions: Extensions = self
.0
.extensions
.iter()
.map(|factory| factory.create())
.collect_vec()
.into();
let request = extensions
.lock()
.prepare_request(
&ExtensionContext {
schema_data: &self.env.data,
@ -349,16 +348,14 @@ where
query_data: &request.data,
};
extensions
.lock()
.parse_start(&ctx_extension, &request.query, &request.variables);
extensions.parse_start(&ctx_extension, &request.query, &request.variables);
let document = parse_query(&request.query)
.map_err(Into::<ServerError>::into)
.log_error(&ctx_extension, &extensions)?;
extensions.lock().parse_end(&ctx_extension, &document);
extensions.parse_end(&ctx_extension, &document);
// check rules
extensions.lock().validation_start(&ctx_extension);
extensions.validation_start(&ctx_extension);
let CheckResult {
cache_control,
complexity,
@ -370,7 +367,7 @@ where
self.validation_mode,
)
.log_error(&ctx_extension, &extensions)?;
extensions.lock().validation_end(&ctx_extension);
extensions.validation_end(&ctx_extension);
// check limit
if let Some(limit_complexity) = self.complexity {
@ -411,7 +408,7 @@ where
let operation = match operation {
Ok(operation) => operation,
Err(e) => {
extensions.lock().error(&ctx_extension, &e);
extensions.error(&ctx_extension, &e);
return Err(vec![e]);
}
};
@ -443,7 +440,7 @@ where
query_data: &env.ctx_data,
};
env.extensions.lock().execution_start(&ctx_extension);
env.extensions.execution_start(&ctx_extension);
let data = match &env.operation.node.ty {
OperationType::Query => resolve_container(&ctx, &self.query).await,
@ -455,8 +452,8 @@ where
}
};
env.extensions.lock().execution_end(&ctx_extension);
let extensions = env.extensions.lock().result(&ctx_extension);
env.extensions.execution_end(&ctx_extension);
let extensions = env.extensions.result(&ctx_extension);
match data {
Ok(data) => Response::new(data),
@ -530,21 +527,21 @@ where
query_data: &env.ctx_data,
};
env.extensions.lock().execution_start(&ctx_extension);
env.extensions.execution_start(&ctx_extension);
let mut streams = Vec::new();
if let Err(e) = collect_subscription_streams(&ctx, &schema.subscription, &mut streams) {
env.extensions.lock().execution_end(&ctx_extension);
env.extensions.execution_end(&ctx_extension);
yield Response::from_errors(vec![e]);
return;
}
env.extensions.lock().execution_end(&ctx_extension);
env.extensions.execution_end(&ctx_extension);
let mut stream = stream::select_all(streams);
while let Some(data) = stream.next().await {
let is_err = data.is_err();
let extensions = env.extensions.lock().result(&ctx_extension);
let extensions = env.extensions.result(&ctx_extension);
yield match data {
Ok((name, value)) => {
let mut map = BTreeMap::new();

View File

@ -52,6 +52,6 @@ impl<T: OutputValueType + Send + Sync + Ord> OutputValueType for BTreeSet<T> {
ctx: &ContextSelectionSet<'_>,
field: &Positioned<Field>,
) -> ServerResult<Value> {
resolve_list(ctx, field, self).await
resolve_list(ctx, field, self, Some(self.len())).await
}
}

View File

@ -54,6 +54,6 @@ impl<T: OutputValueType + Send + Sync + Hash + Eq> OutputValueType for HashSet<T
ctx: &ContextSelectionSet<'_>,
field: &Positioned<Field>,
) -> ServerResult<Value> {
resolve_list(ctx, field, self).await
resolve_list(ctx, field, self, Some(self.len())).await
}
}

View File

@ -52,6 +52,6 @@ impl<T: OutputValueType + Send + Sync> OutputValueType for LinkedList<T> {
ctx: &ContextSelectionSet<'_>,
field: &Positioned<Field>,
) -> ServerResult<Value> {
resolve_list(ctx, field, self).await
resolve_list(ctx, field, self, Some(self.len())).await
}
}

View File

@ -27,6 +27,6 @@ impl<T: OutputValueType + Send + Sync> OutputValueType for &[T] {
ctx: &ContextSelectionSet<'_>,
field: &Positioned<Field>,
) -> ServerResult<Value> {
resolve_list(ctx, field, self.iter()).await
resolve_list(ctx, field, self.iter(), Some(self.len())).await
}
}

View File

@ -47,6 +47,6 @@ impl<T: OutputValueType + Send + Sync> OutputValueType for Vec<T> {
ctx: &ContextSelectionSet<'_>,
field: &Positioned<Field>,
) -> ServerResult<Value> {
resolve_list(ctx, field, self).await
resolve_list(ctx, field, self, Some(self.len())).await
}
}

View File

@ -52,6 +52,6 @@ impl<T: OutputValueType + Send + Sync> OutputValueType for VecDeque<T> {
ctx: &ContextSelectionSet<'_>,
field: &Positioned<Field>,
) -> ServerResult<Value> {
resolve_list(ctx, field, self).await
resolve_list(ctx, field, self, Some(self.len())).await
}
}