Extension::Logger now provides more comprehensive error information.

Sunli 2020-06-13 22:14:47 +08:00
parent 05a1a0bfe6
commit 7630fe1f51
10 changed files with 189 additions and 164 deletions
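The diff below reworks the extension system so that Extension hooks take &mut self and the per-query Extensions list is shared behind a spin::Mutex; that is what lets the Logger keep the query source and variables around and print them in every error line. For orientation, a minimal sketch of enabling the Logger on a schema, assuming the factory-based SchemaBuilder::extension API this code compiles against (MyQuery is a placeholder root object, not part of this commit):

use async_graphql::extensions::Logger;
use async_graphql::{EmptyMutation, EmptySubscription, Schema};

// `MyQuery` stands in for an application-defined query root (not shown here).
fn build_schema() -> Schema<MyQuery, EmptyMutation, EmptySubscription> {
    // Extensions are registered as factories, matching the
    // `Box<dyn Fn() -> BoxExtension + Send + Sync>` seen in prepare_query below,
    // so a fresh Logger instance is created for every incoming query.
    Schema::build(MyQuery, EmptyMutation, EmptySubscription)
        .extension(|| Logger::default())
        .finish()
}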

View File

@ -46,6 +46,7 @@ indexmap = "1.3.2"
async-stream = "0.2.1"
multer = "1.2.0"
log = "0.4.8"
spin = "0.5.2"
bson = { version = "1.0.0", optional = true }
uuid = { version = "0.8.1", features = ["v4"] }
url = { version = "2.1.1", optional = true }
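The new spin dependency backs the spin::Mutex<Extensions> introduced in the context and schema changes below: extension hooks now need &mut self, but the extensions sit inside a shared QueryEnv, so each hook call briefly takes a spinlock instead of every extension carrying its own parking_lot::Mutex. A tiny, self-contained sketch of the spin 0.5 locking pattern:

fn main() {
    // spin::Mutex is a busy-waiting lock; its guard gives mutable access
    // to the protected value while held.
    let counter = spin::Mutex::new(0u32);
    {
        let mut guard = counter.lock(); // spins until the lock is free
        *guard += 1;
    } // guard dropped here, releasing the lock
    assert_eq!(*counter.lock(), 1);
}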

View File

@ -11,6 +11,7 @@ use futures::Future;
use parking_lot::Mutex;
use std::any::{Any, TypeId};
use std::collections::BTreeMap;
use std::fmt::{Display, Formatter};
use std::fs::File;
use std::ops::{Deref, DerefMut};
use std::pin::Pin;
@ -39,6 +40,12 @@ impl Deref for Variables {
}
}
impl Display for Variables {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}
impl DerefMut for Variables {
fn deref_mut(&mut self) -> &mut Self::Target {
if let Value::Object(obj) = &mut self.0 {
@ -280,7 +287,7 @@ impl<'a, T> Deref for ContextBase<'a, T> {
#[doc(hidden)]
pub struct QueryEnvInner {
pub extensions: Extensions,
pub extensions: spin::Mutex<Extensions>,
pub variables: Variables,
pub document: Document,
pub ctx_data: Arc<Data>,
@ -301,7 +308,7 @@ impl Deref for QueryEnv {
impl QueryEnv {
#[doc(hidden)]
pub fn new(
extensions: Extensions,
extensions: spin::Mutex<Extensions>,
variables: Variables,
document: Document,
ctx_data: Arc<Data>,

View File

@ -1,6 +1,6 @@
use crate::extensions::{Extension, ResolveInfo};
use crate::Variables;
use chrono::{DateTime, Utc};
use parking_lot::Mutex;
use serde::ser::SerializeMap;
use serde::{Serialize, Serializer};
use std::collections::BTreeMap;
@ -44,14 +44,18 @@ impl Serialize for ResolveStat {
}
}
struct Inner {
/// Apollo tracing extension for performance tracing
///
/// Apollo Tracing works by including data in the extensions field of the GraphQL response, which is reserved by the GraphQL spec for extra information that a server wants to return. That way, you have access to performance traces alongside the data returned by your query.
/// It's already supported by `Apollo Engine`, and we're excited to see what other kinds of integrations people can build on top of this format.
pub struct ApolloTracing {
start_time: DateTime<Utc>,
end_time: DateTime<Utc>,
pending_resolves: BTreeMap<usize, PendingResolve>,
resolves: Vec<ResolveStat>,
}
impl Default for Inner {
impl Default for ApolloTracing {
fn default() -> Self {
Self {
start_time: Utc::now(),
@ -62,31 +66,21 @@ impl Default for Inner {
}
}
/// Apollo tracing extension for performance tracing
///
/// Apollo Tracing works by including data in the extensions field of the GraphQL response, which is reserved by the GraphQL spec for extra information that a server wants to return. That way, you have access to performance traces alongside the data returned by your query.
/// It's already supported by `Apollo Engine`, and we're excited to see what other kinds of integrations people can build on top of this format.
#[derive(Default)]
pub struct ApolloTracing {
inner: Mutex<Inner>,
}
impl Extension for ApolloTracing {
fn name(&self) -> Option<&'static str> {
Some("tracing")
}
fn parse_start(&self, _query_source: &str) {
self.inner.lock().start_time = Utc::now();
fn parse_start(&mut self, _query_source: &str, _variables: &Variables) {
self.start_time = Utc::now();
}
fn execution_end(&self) {
self.inner.lock().end_time = Utc::now();
fn execution_end(&mut self) {
self.end_time = Utc::now();
}
fn resolve_start(&self, info: &ResolveInfo<'_>) {
let mut inner = self.inner.lock();
inner.pending_resolves.insert(
fn resolve_start(&mut self, info: &ResolveInfo<'_>) {
self.pending_resolves.insert(
info.resolve_id.current,
PendingResolve {
path: info.path_node.to_json().into(),
@ -98,13 +92,12 @@ impl Extension for ApolloTracing {
);
}
fn resolve_end(&self, info: &ResolveInfo<'_>) {
let mut inner = self.inner.lock();
if let Some(pending_resolve) = inner.pending_resolves.remove(&info.resolve_id.current) {
let start_offset = (pending_resolve.start_time - inner.start_time)
fn resolve_end(&mut self, info: &ResolveInfo<'_>) {
if let Some(pending_resolve) = self.pending_resolves.remove(&info.resolve_id.current) {
let start_offset = (pending_resolve.start_time - self.start_time)
.num_nanoseconds()
.unwrap();
inner.resolves.push(ResolveStat {
self.resolves.push(ResolveStat {
pending_resolve,
start_offset,
end_time: Utc::now(),
@ -112,18 +105,16 @@ impl Extension for ApolloTracing {
}
}
fn result(&self) -> Option<serde_json::Value> {
let mut inner = self.inner.lock();
inner
.resolves
fn result(&mut self) -> Option<serde_json::Value> {
self.resolves
.sort_by(|a, b| a.start_offset.cmp(&b.start_offset));
Some(serde_json::json!({
"version": 1,
"startTime": inner.start_time.to_rfc3339(),
"endTime": inner.end_time.to_rfc3339(),
"duration": (inner.end_time - inner.start_time).num_nanoseconds(),
"startTime": self.start_time.to_rfc3339(),
"endTime": self.end_time.to_rfc3339(),
"duration": (self.end_time - self.start_time).num_nanoseconds(),
"execution": {
"resolvers": inner.resolves
"resolvers": self.resolves
}
}))
}
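With the interior Mutex removed, ApolloTracing keeps its timing state in plain fields and mutates them through the new &mut self hooks; the payload it returns from result() is unchanged. Because name() returns Some("tracing"), the schema nests that payload under the tracing key of the response's extensions object. A sketch of the reported shape, with illustrative values (the per-resolver entries are elided):

fn main() {
    // Same top-level keys as the serde_json::json! call in result() above;
    // the timestamps and the duration are made up for illustration.
    let tracing = serde_json::json!({
        "version": 1,
        "startTime": "2020-06-13T14:14:47+00:00", // start_time.to_rfc3339()
        "endTime":   "2020-06-13T14:14:47+00:00", // end_time.to_rfc3339()
        "duration": 4_000_000,                    // nanoseconds between the two
        "execution": {
            // one entry per resolved field, sorted by start offset
            "resolvers": []
        }
    });
    println!("{}", tracing);
}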

View File

@ -1,27 +1,36 @@
use crate::extensions::{Extension, ResolveInfo};
use crate::Error;
use crate::{Error, Variables};
use async_graphql_parser::query::{Definition, Document, OperationDefinition, Selection};
use itertools::Itertools;
use std::sync::atomic::{AtomicBool, Ordering};
use std::borrow::Cow;
use uuid::Uuid;
/// Logger extension
pub struct Logger {
id: Uuid,
enabled: AtomicBool,
enabled: bool,
query: String,
variables: Variables,
}
impl Default for Logger {
fn default() -> Self {
Self {
id: Uuid::new_v4(),
enabled: AtomicBool::new(true),
enabled: true,
query: String::new(),
variables: Default::default(),
}
}
}
impl Extension for Logger {
fn parse_end(&self, query_source: &str, document: &Document) {
fn parse_start(&mut self, query_source: &str, variables: &Variables) {
self.query = query_source.replace(char::is_whitespace, "");
self.variables = variables.clone();
}
fn parse_end(&mut self, document: &Document) {
let mut is_schema = false;
for definition in document.definitions() {
@ -46,37 +55,49 @@ impl Extension for Logger {
}
if is_schema {
self.enabled.store(false, Ordering::Relaxed);
self.enabled = false;
return;
}
info!(target: "async-graphql", "query, id: {}, source: \"{}\"", self.id, query_source);
info!(target: "async-graphql", "[Query] id: \"{}\", query: \"{}\", variables: {}", self.id, &self.query, self.variables);
}
fn resolve_start(&self, info: &ResolveInfo<'_>) {
if !self.enabled.load(Ordering::Relaxed) {
fn resolve_start(&mut self, info: &ResolveInfo<'_>) {
if !self.enabled {
return;
}
trace!(target: "async-graphql", "resolve start, id: {}, path: \"{}\"", self.id, info.path_node);
trace!(target: "async-graphql", "[ResolveStart] id: \"{}\", path: \"{}\"", self.id, info.path_node);
}
fn resolve_end(&self, info: &ResolveInfo<'_>) {
if !self.enabled.load(Ordering::Relaxed) {
fn resolve_end(&mut self, info: &ResolveInfo<'_>) {
if !self.enabled {
return;
}
trace!(target: "async-graphql", "resolve end, id: {}, path: \"{}\"", self.id, info.path_node);
trace!(target: "async-graphql", "[ResolveEnd] id: \"{}\", path: \"{}\"", self.id, info.path_node);
}
fn error(&self, err: &Error) {
fn error(&mut self, err: &Error) {
match err {
Error::Parse(err) => {
error!(target: "async-graphql", "parse error, id: {}, [{}:{}] {}", self.id, err.pos.line, err.pos.column, err)
error!(target: "async-graphql", "[ParseError] id: \"{}\", pos: [{}:{}], query: \"{}\", variables: {}, {}", self.id, err.pos.line, err.pos.column, self.query, self.variables, err)
}
Error::Query { pos, path, err } => {
if let Some(path) = path {
error!(target: "async-graphql", "query error, id: {}, path: \"{}\", [{}:{}] {}", self.id, path, pos.line, pos.column, err)
let path = if let serde_json::Value::Array(values) = path {
values
.iter()
.filter_map(|value| match value {
serde_json::Value::String(s) => Some(Cow::Borrowed(s.as_str())),
serde_json::Value::Number(n) => Some(Cow::Owned(n.to_string())),
_ => None,
})
.join(".")
} else {
String::new()
};
error!(target: "async-graphql", "[QueryError] id: \"{}\", path: \"{}\", pos: [{}:{}], query: \"{}\", variables: {}, {}", self.id, path, pos.line, pos.column, self.query, self.variables, err)
} else {
error!(target: "async-graphql", "query error, id: {}, [{}:{}] {}", self.id, pos.line, pos.column, err)
error!(target: "async-graphql", "[QueryError] id: \"{}\", pos: [{}:{}], query: \"{}\", variables: {}, {}", self.id, pos.line, pos.column, self.query, self.variables, err)
}
}
Error::Rule { errors } => {
@ -86,7 +107,7 @@ impl Extension for Logger {
.iter()
.map(|pos| format!("{}:{}", pos.line, pos.column))
.join(", ");
error!(target: "async-graphql", "validation error, id: {}, [{}] {}", self.id, locations, error.message)
error!(target: "async-graphql", "[ValidationError] id: \"{}\", pos: [{}], query: \"{}\", variables: {}, {}", self.id, locations, self.query, self.variables, error.message)
}
}
}
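The Logger now captures a whitespace-stripped copy of the query and a clone of the variables in parse_start, and every error line carries both, along with the error path rendered as a dotted string. A small sketch of that path flattening, mirroring the filter_map/join logic in the hunk above, followed by a rough example of the log record it feeds into (the id, query and variables are illustrative):

use itertools::Itertools;
use std::borrow::Cow;

// Mirrors Logger::error: string and numeric path segments are kept and
// joined with '.', anything else is skipped.
fn flatten_path(path: &serde_json::Value) -> String {
    if let serde_json::Value::Array(values) = path {
        values
            .iter()
            .filter_map(|value| match value {
                serde_json::Value::String(s) => Some(Cow::Borrowed(s.as_str())),
                serde_json::Value::Number(n) => Some(Cow::Owned(n.to_string())),
                _ => None,
            })
            .join(".")
    } else {
        String::new()
    }
}

fn main() {
    let path = serde_json::json!(["users", 0, "name"]);
    assert_eq!(flatten_path(&path), "users.0.name");
    // The resulting error record then looks roughly like:
    // [QueryError] id: "5b6a...", path: "users.0.name", pos: [3:5],
    //     query: "{users{name}}", variables: {}, <error message>
}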

View File

@ -5,7 +5,7 @@ mod logger;
mod tracing;
use crate::context::{QueryPathNode, ResolveId};
use crate::Result;
use crate::{Result, Variables};
pub use self::apollo_tracing::ApolloTracing;
pub use self::logger::Logger;
@ -44,91 +44,95 @@ pub trait Extension: Sync + Send + 'static {
}
/// Called at the beginning of the parse.
fn parse_start(&self, query_source: &str) {}
fn parse_start(&mut self, query_source: &str, variables: &Variables) {}
/// Called at the end of the parse.
fn parse_end(&self, query_source: &str, document: &Document) {}
fn parse_end(&mut self, document: &Document) {}
/// Called at the beginning of the validation.
fn validation_start(&self) {}
fn validation_start(&mut self) {}
/// Called at the end of the validation.
fn validation_end(&self) {}
fn validation_end(&mut self) {}
/// Called at the beginning of the execution.
fn execution_start(&self) {}
fn execution_start(&mut self) {}
/// Called at the end of the execution.
fn execution_end(&self) {}
fn execution_end(&mut self) {}
/// Called at the beginning of resolving a field.
fn resolve_start(&self, info: &ResolveInfo<'_>) {}
fn resolve_start(&mut self, info: &ResolveInfo<'_>) {}
/// Called at the end of resolving a field.
fn resolve_end(&self, info: &ResolveInfo<'_>) {}
fn resolve_end(&mut self, info: &ResolveInfo<'_>) {}
/// Called when an error occurs.
fn error(&self, err: &Error) {}
fn error(&mut self, err: &Error) {}
/// Get the results
fn result(&self) -> Option<serde_json::Value> {
fn result(&mut self) -> Option<serde_json::Value> {
None
}
}
impl Extensions {
pub(crate) fn log_error<T>(&self, res: Result<T>) -> Result<T> {
if let Err(err) = &res {
self.error(err);
pub(crate) trait ErrorLogger {
fn log_error(self, extensions: &spin::Mutex<Extensions>) -> Self;
}
impl<T> ErrorLogger for Result<T> {
fn log_error(self, extensions: &spin::Mutex<Extensions>) -> Self {
if let Err(err) = &self {
extensions.lock().error(err);
}
res
self
}
}
impl Extension for Extensions {
fn parse_start(&self, query_source: &str) {
self.0.iter().for_each(|e| e.parse_start(query_source));
}
fn parse_end(&self, query_source: &str, document: &Document) {
fn parse_start(&mut self, query_source: &str, variables: &Variables) {
self.0
.iter()
.for_each(|e| e.parse_end(query_source, document));
.iter_mut()
.for_each(|e| e.parse_start(query_source, variables));
}
fn validation_start(&self) {
self.0.iter().for_each(|e| e.validation_start());
fn parse_end(&mut self, document: &Document) {
self.0.iter_mut().for_each(|e| e.parse_end(document));
}
fn validation_end(&self) {
self.0.iter().for_each(|e| e.validation_end());
fn validation_start(&mut self) {
self.0.iter_mut().for_each(|e| e.validation_start());
}
fn execution_start(&self) {
self.0.iter().for_each(|e| e.execution_start());
fn validation_end(&mut self) {
self.0.iter_mut().for_each(|e| e.validation_end());
}
fn execution_end(&self) {
self.0.iter().for_each(|e| e.execution_end());
fn execution_start(&mut self) {
self.0.iter_mut().for_each(|e| e.execution_start());
}
fn resolve_start(&self, info: &ResolveInfo<'_>) {
self.0.iter().for_each(|e| e.resolve_start(info));
fn execution_end(&mut self) {
self.0.iter_mut().for_each(|e| e.execution_end());
}
fn resolve_end(&self, resolve_id: &ResolveInfo<'_>) {
self.0.iter().for_each(|e| e.resolve_end(resolve_id));
fn resolve_start(&mut self, info: &ResolveInfo<'_>) {
self.0.iter_mut().for_each(|e| e.resolve_start(info));
}
fn error(&self, err: &Error) {
self.0.iter().for_each(|e| e.error(err));
fn resolve_end(&mut self, resolve_id: &ResolveInfo<'_>) {
self.0.iter_mut().for_each(|e| e.resolve_end(resolve_id));
}
fn result(&self) -> Option<Value> {
fn error(&mut self, err: &Error) {
self.0.iter_mut().for_each(|e| e.error(err));
}
fn result(&mut self) -> Option<Value> {
if !self.0.is_empty() {
let value = self
.0
.iter()
.iter_mut()
.filter_map(|e| {
if let Some(name) = e.name() {
e.result().map(|res| (name.to_string(), res))
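Every hook on the Extension trait now takes &mut self, and the Extensions collection forwards each call with iter_mut, while error reporting moves to the new ErrorLogger helper on Result. A minimal custom extension written against the trait as shown in this hunk (a sketch; only the hooks it needs are overridden, the rest keep their default no-op bodies):

use async_graphql::extensions::{Extension, ResolveInfo};
use async_graphql::Variables;

/// A sketch: counts how many fields were resolved while answering a query.
#[derive(Default)]
struct FieldCounter {
    resolved: usize,
}

impl Extension for FieldCounter {
    fn name(&self) -> Option<&'static str> {
        Some("fieldCounter")
    }

    fn parse_start(&mut self, _query_source: &str, _variables: &Variables) {
        // State lives directly on the struct now; no interior Mutex is needed.
        self.resolved = 0;
    }

    fn resolve_end(&mut self, _info: &ResolveInfo<'_>) {
        self.resolved += 1;
    }

    fn result(&mut self) -> Option<serde_json::Value> {
        // Reported under extensions.fieldCounter in the response.
        Some(serde_json::json!({ "resolvedFields": self.resolved }))
    }
}

It would be registered the same way as the built-in extensions, e.g. .extension(|| FieldCounter::default()).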

View File

@ -1,53 +1,38 @@
use crate::extensions::{Extension, ResolveInfo};
use crate::QueryPathSegment;
use parking_lot::Mutex;
use crate::{QueryPathSegment, Variables};
use std::collections::BTreeMap;
use tracing::{span, Id, Level};
#[derive(Default)]
struct Inner {
root_id: Option<Id>,
fields: BTreeMap<usize, Id>,
}
/// Tracing extension
///
/// # References
///
/// https://crates.io/crates/tracing
pub struct Tracing {
inner: Mutex<Inner>,
}
impl Default for Tracing {
fn default() -> Self {
Self {
inner: Default::default(),
}
}
root_id: Option<Id>,
fields: BTreeMap<usize, Id>,
}
impl Extension for Tracing {
fn parse_start(&self, query_source: &str) {
fn parse_start(&mut self, query_source: &str, _variables: &Variables) {
let root_span = span!(target: "async-graphql", parent:None, Level::INFO, "query", source = query_source);
if let Some(id) = root_span.id() {
tracing::dispatcher::get_default(|d| d.enter(&id));
self.inner.lock().root_id.replace(id);
self.root_id.replace(id);
}
}
fn execution_end(&self) {
if let Some(id) = self.inner.lock().root_id.take() {
fn execution_end(&mut self) {
if let Some(id) = self.root_id.take() {
tracing::dispatcher::get_default(|d| d.exit(&id));
}
}
fn resolve_start(&self, info: &ResolveInfo<'_>) {
let mut inner = self.inner.lock();
fn resolve_start(&mut self, info: &ResolveInfo<'_>) {
let parent_span = info
.resolve_id
.parent
.and_then(|id| inner.fields.get(&id))
.and_then(|id| self.fields.get(&id))
.cloned();
let span = match &info.path_node.segment {
QueryPathSegment::Index(idx) => span!(
@ -71,12 +56,12 @@ impl Extension for Tracing {
};
if let Some(id) = span.id() {
tracing::dispatcher::get_default(|d| d.enter(&id));
inner.fields.insert(info.resolve_id.current, id);
self.fields.insert(info.resolve_id.current, id);
}
}
fn resolve_end(&self, info: &ResolveInfo<'_>) {
if let Some(id) = self.inner.lock().fields.remove(&info.resolve_id.current) {
fn resolve_end(&mut self, info: &ResolveInfo<'_>) {
if let Some(id) = self.fields.remove(&info.resolve_id.current) {
tracing::dispatcher::get_default(|d| d.exit(&id));
}
}

View File

@ -1,4 +1,4 @@
use crate::extensions::{Extension, ResolveInfo};
use crate::extensions::{ErrorLogger, Extension, ResolveInfo};
use crate::parser::query::{Selection, TypeCondition};
use crate::{ContextSelectionSet, Error, ObjectType, QueryError, Result};
use std::future::Future;
@ -76,14 +76,22 @@ fn do_resolve<'a, T: ObjectType + Send + Sync>(
},
};
ctx_field.query_env.extensions.resolve_start(&resolve_info);
let value = ctx_field
ctx_field
.query_env
.extensions
.log_error(root.resolve_field(&ctx_field).await)?;
.lock()
.resolve_start(&resolve_info);
let value = root
.resolve_field(&ctx_field)
.await
.log_error(&ctx.query_env.extensions)?;
values.insert(field_name, value);
ctx_field.query_env.extensions.resolve_end(&resolve_info);
ctx_field
.query_env
.extensions
.lock()
.resolve_end(&resolve_info);
}
Selection::FragmentSpread(fragment_spread) => {
if ctx.is_skip(&fragment_spread.directives)? {

View File

@ -1,6 +1,6 @@
use crate::context::{Data, DeferList, ResolveId};
use crate::error::ParseRequestError;
use crate::extensions::{BoxExtension, Extension};
use crate::extensions::{BoxExtension, ErrorLogger, Extension};
use crate::mutation_resolver::do_mutation_resolve;
use crate::registry::CacheControl;
use crate::{
@ -272,12 +272,12 @@ impl QueryBuilder {
Subscription: SubscriptionType + Send + Sync + 'static,
{
let (mut document, cache_control, extensions) =
schema.prepare_query(&self.query_source, &self.extensions)?;
schema.prepare_query(&self.query_source, &self.variables, &self.extensions)?;
// execute
let inc_resolve_id = AtomicUsize::default();
if !document.retain_operation(self.operation_name.as_deref()) {
return extensions.log_error(if let Some(operation_name) = self.operation_name {
return if let Some(operation_name) = self.operation_name {
Err(Error::Query {
pos: Pos::default(),
path: None,
@ -291,7 +291,8 @@ impl QueryBuilder {
path: None,
err: QueryError::MissingOperation,
})
});
}
.log_error(&extensions);
}
let env = QueryEnv::new(
@ -314,8 +315,7 @@ impl QueryBuilder {
defer_list: Some(&defer_list),
};
env.extensions.execution_start();
env.extensions.lock().execution_start();
let data = match &env.document.current_operation().ty {
OperationType::Query => do_resolve(&ctx, &schema.query).await?,
OperationType::Mutation => do_mutation_resolve(&ctx, &schema.mutation).await?,
@ -328,13 +328,12 @@ impl QueryBuilder {
}
};
env.extensions.execution_end();
env.extensions.lock().execution_end();
let res = QueryResponse {
label: None,
path: None,
data,
extensions: env.extensions.result(),
extensions: env.extensions.lock().result(),
cache_control,
};
Ok((res, defer_list))
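One consequence of the spin::Mutex<Extensions> design visible in this hunk: each hook call locks only for the duration of its own statement, because the guard returned by lock() is a temporary that is dropped at the end of the statement. That keeps the spinlock from being held across the .await points between execution_start and execution_end. A small standalone illustration of that scoping (not async-graphql specific):

async fn run(extensions: &spin::Mutex<Vec<String>>, work: impl std::future::Future<Output = ()>) {
    // The MutexGuard is a temporary inside this statement, so the lock is
    // already released when execution reaches the `.await` below.
    extensions.lock().push("execution_start".into());

    work.await; // no lock held while the query executes

    extensions.lock().push("execution_end".into());
}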

View File

@ -1,5 +1,5 @@
use crate::base::BoxFieldFuture;
use crate::extensions::{Extension, ResolveInfo};
use crate::extensions::{ErrorLogger, Extension, ResolveInfo};
use crate::parser::query::{Selection, TypeCondition};
use crate::{ContextSelectionSet, Error, ObjectType, QueryError, Result};
use futures::{future, TryFutureExt};
@ -95,15 +95,23 @@ pub fn collect_fields<'a, T: ObjectType + Send + Sync>(
},
};
ctx_field.query_env.extensions.resolve_start(&resolve_info);
ctx_field
.query_env
.extensions
.lock()
.resolve_start(&resolve_info);
let res = ctx_field.query_env.extensions.log_error(
root.resolve_field(&ctx_field)
.map_ok(move |value| (field_name, value))
.await,
)?;
let res = root
.resolve_field(&ctx_field)
.map_ok(move |value| (field_name, value))
.await
.log_error(&ctx_field.query_env.extensions)?;
ctx_field.query_env.extensions.resolve_end(&resolve_info);
ctx_field
.query_env
.extensions
.lock()
.resolve_end(&resolve_info);
Ok(res)
}
}))

View File

@ -1,5 +1,5 @@
use crate::context::Data;
use crate::extensions::{BoxExtension, Extension, Extensions};
use crate::extensions::{BoxExtension, ErrorLogger, Extension, Extensions};
use crate::model::__DirectiveLocation;
use crate::parser::parse_query;
use crate::query::{QueryBuilder, StreamResponse};
@ -311,46 +311,46 @@ where
pub(crate) fn prepare_query(
&self,
source: &str,
variables: &Variables,
query_extensions: &[Box<dyn Fn() -> BoxExtension + Send + Sync>],
) -> Result<(Document, CacheControl, Extensions)> {
) -> Result<(Document, CacheControl, spin::Mutex<Extensions>)> {
// create extension instances
let extensions = Extensions(
let extensions = spin::Mutex::new(Extensions(
self.0
.extensions
.iter()
.chain(query_extensions)
.map(|factory| factory())
.collect_vec(),
);
));
extensions.parse_start(source);
let document = extensions.log_error(parse_query(source).map_err(Into::<Error>::into))?;
extensions.parse_end(source, &document);
extensions.lock().parse_start(source, &variables);
let document = parse_query(source)
.map_err(Into::<Error>::into)
.log_error(&extensions)?;
extensions.lock().parse_end(&document);
// check rules
extensions.validation_start();
extensions.lock().validation_start();
let CheckResult {
cache_control,
complexity,
depth,
} = extensions.log_error(check_rules(
&self.env.registry,
&document,
self.validation_mode,
))?;
extensions.validation_end();
} = check_rules(&self.env.registry, &document, self.validation_mode)
.log_error(&extensions)?;
extensions.lock().validation_end();
// check limit
if let Some(limit_complexity) = self.complexity {
if complexity > limit_complexity {
return extensions
.log_error(Err(QueryError::TooComplex.into_error(Pos::default())));
return Err(QueryError::TooComplex.into_error(Pos::default()))
.log_error(&extensions);
}
}
if let Some(limit_depth) = self.depth {
if depth > limit_depth {
return extensions.log_error(Err(QueryError::TooDeep.into_error(Pos::default())));
return Err(QueryError::TooDeep.into_error(Pos::default())).log_error(&extensions);
}
}
@ -365,21 +365,22 @@ where
variables: Variables,
ctx_data: Option<Arc<Data>>,
) -> Result<impl Stream<Item = Result<serde_json::Value>> + Send> {
let (mut document, _, extensions) = self.prepare_query(source, &Vec::new())?;
let (mut document, _, extensions) = self.prepare_query(source, &variables, &Vec::new())?;
if !document.retain_operation(operation_name) {
return extensions.log_error(if let Some(name) = operation_name {
return if let Some(name) = operation_name {
Err(QueryError::UnknownOperationNamed {
name: name.to_string(),
}
.into_error(Pos::default()))
} else {
Err(QueryError::MissingOperation.into_error(Pos::default()))
});
}
.log_error(&extensions);
}
if document.current_operation().ty != OperationType::Subscription {
return extensions.log_error(Err(QueryError::NotSupported.into_error(Pos::default())));
return Err(QueryError::NotSupported.into_error(Pos::default())).log_error(&extensions);
}
let resolve_id = AtomicUsize::default();
@ -397,9 +398,9 @@ where
None,
);
let mut streams = Vec::new();
ctx.query_env
.extensions
.log_error(create_subscription_stream(self, env.clone(), &ctx, &mut streams).await)?;
create_subscription_stream(self, env.clone(), &ctx, &mut streams)
.await
.log_error(&ctx.query_env.extensions)?;
Ok(futures::stream::select_all(streams))
}