Invoke extensions for execute_schema.

This commit is contained in:
Sunli 2020-09-26 15:52:59 +08:00
parent 5971b44105
commit b47d08c5b5
7 changed files with 64 additions and 19 deletions

View File

@ -267,7 +267,11 @@ pub fn generate(object_args: &args::Object, item_impl: &mut ItemImpl) -> Result<
let field = field.clone();
let field_name = field_name.clone();
async move {
let resolve_id = ::std::sync::atomic::AtomicUsize::default();
let resolve_id = #crate_name::ResolveId {
parent: Some(0),
current: 1,
};
let inc_resolve_id = ::std::sync::atomic::AtomicUsize::new(1);
let ctx_selection_set = query_env.create_context(
&schema_env,
Some(#crate_name::QueryPathNode {
@ -275,15 +279,35 @@ pub fn generate(object_args: &args::Object, item_impl: &mut ItemImpl) -> Result<
segment: #crate_name::QueryPathSegment::Name(&field_name),
}),
&field.node.selection_set,
&resolve_id,
resolve_id,
&inc_resolve_id,
);
#crate_name::OutputValueType::resolve(&msg, &ctx_selection_set, &*field)
#crate_name::extensions::Extension::execution_start(&mut *query_env.extensions.lock());
#[allow(bare_trait_objects)]
let ri = #crate_name::extensions::ResolveInfo {
resolve_id,
path_node: ctx_selection_set.path_node.as_ref().unwrap(),
parent_type: #gql_typename,
return_type: &<<#stream_ty as #crate_name::futures::stream::Stream>::Item as #crate_name::Type>::qualified_type_name(),
schema_env: &schema_env,
query_env: &query_env,
};
#crate_name::extensions::Extension::resolve_start(&mut *query_env.extensions.lock(), &ri);
let res = #crate_name::OutputValueType::resolve(&msg, &ctx_selection_set, &*field)
.await
.map(|value| {
#crate_name::serde_json::json!({
field_name.as_str(): value
})
})
});
#crate_name::extensions::Extension::resolve_end(&mut *query_env.extensions.lock(), &ri);
#crate_name::extensions::Extension::execution_end(&mut *query_env.extensions.lock());
res
}
}
});

View File

@ -214,7 +214,8 @@ pub struct ResolveId {
}
impl ResolveId {
pub(crate) fn root() -> ResolveId {
#[doc(hidden)]
pub fn root() -> ResolveId {
ResolveId {
parent: None,
current: 0,
@ -292,11 +293,12 @@ impl QueryEnv {
schema_env: &'a SchemaEnv,
path_node: Option<QueryPathNode<'a>>,
item: T,
resolve_id: ResolveId,
inc_resolve_id: &'a AtomicUsize,
) -> ContextBase<'a, T> {
ContextBase {
path_node,
resolve_id: ResolveId::root(),
resolve_id,
inc_resolve_id,
item,
schema_env,
@ -306,7 +308,8 @@ impl QueryEnv {
}
impl<'a, T> ContextBase<'a, T> {
fn get_child_resolve_id(&self) -> ResolveId {
#[doc(hidden)]
pub fn get_child_resolve_id(&self) -> ResolveId {
let id = self
.inc_resolve_id
.fetch_add(1, std::sync::atomic::Ordering::Relaxed)

View File

@ -41,8 +41,11 @@ pub struct ResolveInfo<'a> {
/// Current return type, is qualified name.
pub return_type: &'a str,
pub(crate) schema_env: &'a SchemaEnv,
pub(crate) query_env: &'a QueryEnv,
#[doc(hidden)]
pub schema_env: &'a SchemaEnv,
#[doc(hidden)]
pub query_env: &'a QueryEnv,
}
impl<'a> ResolveInfo<'a> {

View File

@ -69,16 +69,25 @@ impl Extension for Tracing {
}
fn execution_start(&mut self) {
if let Some(parent) = &self.root {
let execute_span = span!(
let execute_span = if let Some(parent) = &self.root {
span!(
target: "async_graphql::graphql",
parent: parent,
Level::INFO,
"execute"
);
execute_span.with_subscriber(|(id, d)| d.enter(id));
self.execute.replace(execute_span);
}
)
} else {
// For every step of the subscription stream.
span!(
target: "async_graphql::graphql",
parent: None,
Level::INFO,
"execute"
)
};
execute_span.with_subscriber(|(id, d)| d.enter(id));
self.execute.replace(execute_span);
}
fn execution_end(&mut self) {
@ -104,7 +113,9 @@ impl Extension for Tracing {
Level::INFO,
"field",
id = %info.resolve_id.current,
path = %info.path_node
path = %info.path_node,
parent_type = %info.parent_type,
return_type = %info.return_type,
);
span.with_subscriber(|(id, d)| d.enter(id));
self.fields.insert(info.resolve_id.current, span);

View File

@ -153,7 +153,7 @@ pub use subscription::SubscriptionType;
pub use async_graphql_parser as parser;
pub use base::{InputValueType, OutputValueType, Type};
pub use context::{
Context, ContextBase, Data, QueryEnv, QueryPathNode, QueryPathSegment, Variables,
Context, ContextBase, Data, QueryEnv, QueryPathNode, QueryPathSegment, ResolveId, Variables,
};
pub use error::{
Error, ErrorExtensions, FieldError, FieldResult, InputValueError, InputValueResult,

View File

@ -17,7 +17,7 @@ pub async fn resolve_list<'a, T: OutputValueType + Send + Sync + 'a>(
resolve_id: ctx_idx.resolve_id,
path_node: ctx_idx.path_node.as_ref().unwrap(),
parent_type: &Vec::<T>::type_name(),
return_type: &T::type_name(),
return_type: &T::qualified_type_name(),
schema_env: ctx.schema_env,
query_env: ctx.query_env,
};

View File

@ -519,17 +519,21 @@ where
&schema.env,
None,
&env.operation.node.selection_set,
ResolveId::root(),
&resolve_id,
);
// TODO: Invoke extensions
env.extensions.lock().execution_start();
let mut streams = Vec::new();
if let Err(e) = collect_subscription_streams(&ctx, &schema.subscription, &mut streams) {
env.extensions.lock().execution_end();
yield Response::from(e);
return;
}
env.extensions.lock().execution_end();
let mut stream = stream::select_all(streams);
while let Some(data) = stream.next().await {
let is_err = data.is_err();