From 7fb02d587ed295009509b61227bb535b3884a5c4 Mon Sep 17 00:00:00 2001 From: Sunli Date: Sat, 20 Mar 2021 19:42:00 +0800 Subject: [PATCH] Add `extension::OpenTelemetry`. --- CHANGELOG.md | 4 + Cargo.toml | 2 + src/extensions/mod.rs | 4 + src/extensions/opentelemetry.rs | 214 ++++++++++++++++++++++++++++++++ src/extensions/tracing.rs | 163 ++++++++++-------------- 5 files changed, 291 insertions(+), 96 deletions(-) create mode 100644 src/extensions/opentelemetry.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 9241308e..1051c81d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [Unreleased] + +- Add `extension::OpenTelemetry` and delete `extension::Tracing`. + ## [2.6.2] - 2021-03-20 - Add `SchemaBuilder::enable_subscription_in_federation` method. [#449](https://github.com/async-graphql/async-graphql/issues/449) diff --git a/Cargo.toml b/Cargo.toml index 7e14ff96..dee4df7c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,7 @@ default = [ "multipart", "string_number", "tracing", + "opentelemetry", "unblock", "url", "uuid", @@ -63,6 +64,7 @@ chrono = { version = "0.4.15", optional = true } chrono-tz = { version = "0.5.1", optional = true } log = { version = "0.4.11", optional = true } tracing = { version = "0.1.21", optional = true } +opentelemetry = { version = "0.12.0", optional = true } url = { version = "2.1.1", optional = true } uuid = { version = "0.8.1", optional = true, features = ["v4", "serde"] } diff --git a/src/extensions/mod.rs b/src/extensions/mod.rs index 6886b3fb..b23d06cb 100644 --- a/src/extensions/mod.rs +++ b/src/extensions/mod.rs @@ -7,6 +7,8 @@ pub mod apollo_persisted_queries; mod apollo_tracing; #[cfg(feature = "log")] mod logger; +#[cfg(feature = "opentelemetry")] +mod opentelemetry; #[cfg(feature = "tracing")] mod tracing; @@ -23,6 +25,8 @@ pub use self::analyzer::Analyzer; pub use self::apollo_tracing::ApolloTracing; #[cfg(feature = "log")] pub use self::logger::Logger; +#[cfg(feature = "opentelemetry")] +pub use self::opentelemetry::OpenTelemetry; #[cfg(feature = "tracing")] pub use self::tracing::{Tracing, TracingConfig}; diff --git a/src/extensions/opentelemetry.rs b/src/extensions/opentelemetry.rs new file mode 100644 index 00000000..5541615d --- /dev/null +++ b/src/extensions/opentelemetry.rs @@ -0,0 +1,214 @@ +use std::collections::HashMap; + +use async_graphql_parser::types::ExecutableDocument; +use async_graphql_value::Variables; +use opentelemetry::trace::{SpanKind, TraceContextExt, Tracer}; +use opentelemetry::{Context as OpenTelemetryContext, Key, KeyValue}; + +use crate::extensions::{Extension, ExtensionContext, ExtensionFactory, ResolveInfo}; +use crate::{ServerError, ValidationResult}; + +const REQUEST_CTX: usize = 0; +const PARSE_CTX: usize = 1; +const VALIDATION_CTX: usize = 2; +const EXECUTE_CTX: usize = 3; + +#[inline] +fn resolve_ctx_id(resolver_id: usize) -> usize { + resolver_id + 10 +} + +const KEY_SOURCE: Key = Key::from_static_str("graphql.source"); +const KEY_VARIABLES: Key = Key::from_static_str("graphql.variables"); +const KEY_PARENT_TYPE: Key = Key::from_static_str("graphql.parentType"); +const KEY_RETURN_TYPE: Key = Key::from_static_str("graphql.returnType"); +const KEY_RESOLVE_ID: Key = Key::from_static_str("graphql.resolveId"); +const KEY_ERROR: Key = Key::from_static_str("graphql.error"); +const KEY_COMPLEXITY: Key = Key::from_static_str("graphql.complexity"); +const KEY_DEPTH: Key = Key::from_static_str("graphql.depth"); + +/// OpenTelemetry extension +#[derive(Default)] +#[cfg_attr(feature = "nightly", doc(cfg(feature = "opentelemetry")))] +pub struct OpenTelemetry { + tracer: T, + #[allow(dead_code)] + uninstall: U, +} + +impl OpenTelemetry { + /// Use `tracer` to create an OpenTelemetry extension. + pub fn new(tracer: T) -> OpenTelemetry + where + T: Tracer + Send + Sync + Clone, + { + Self { + tracer, + uninstall: (), + } + } + + /// Attach an Uninstall instance to this extension. + pub fn with_uninstall(self, uninstall: U) -> OpenTelemetry + where + U: Send + Sync + 'static, + { + OpenTelemetry { + tracer: self.tracer, + uninstall, + } + } +} + +impl ExtensionFactory + for OpenTelemetry +{ + fn create(&self) -> Box { + Box::new(OpenTelemetryExtension { + tracer: self.tracer.clone(), + contexts: Default::default(), + }) + } +} + +struct OpenTelemetryExtension { + tracer: T, + contexts: HashMap, +} + +impl OpenTelemetryExtension { + fn enter_context(&mut self, id: usize, cx: OpenTelemetryContext) { + let _ = cx.clone().attach(); + self.contexts.insert(id, cx); + } + + fn exit_context(&mut self, id: usize) -> Option { + if let Some(cx) = self.contexts.remove(&id) { + let _ = cx.clone().attach(); + Some(cx) + } else { + None + } + } +} + +impl Extension for OpenTelemetryExtension { + fn parse_start( + &mut self, + _ctx: &ExtensionContext<'_>, + query_source: &str, + variables: &Variables, + ) { + let request_span = self + .tracer + .span_builder("request") + .with_kind(SpanKind::Server) + .start(&self.tracer); + let request_cx = OpenTelemetryContext::current_with_span(request_span); + self.enter_context(REQUEST_CTX, request_cx.clone()); + + let mut attributes = Vec::with_capacity(2); + attributes.push(KeyValue::new(KEY_SOURCE, query_source.to_string())); + attributes.push(KeyValue::new( + KEY_VARIABLES, + serde_json::to_string(variables).unwrap(), + )); + let parse_span = self + .tracer + .span_builder("parse") + .with_kind(SpanKind::Server) + .with_attributes(attributes) + .with_parent_context(request_cx) + .start(&self.tracer); + let parse_cx = OpenTelemetryContext::current_with_span(parse_span); + self.enter_context(PARSE_CTX, parse_cx); + } + + fn parse_end(&mut self, _ctx: &ExtensionContext<'_>, _document: &ExecutableDocument) { + self.exit_context(PARSE_CTX); + } + + fn validation_start(&mut self, _ctx: &ExtensionContext<'_>) { + if let Some(parent_cx) = self.contexts.get(&REQUEST_CTX).cloned() { + let span = self + .tracer + .span_builder("validation") + .with_kind(SpanKind::Server) + .with_parent_context(parent_cx) + .start(&self.tracer); + let validation_cx = OpenTelemetryContext::current_with_span(span); + self.enter_context(VALIDATION_CTX, validation_cx); + } + } + + fn validation_end(&mut self, _ctx: &ExtensionContext<'_>, result: &ValidationResult) { + if let Some(validation_cx) = self.exit_context(VALIDATION_CTX) { + let span = validation_cx.span(); + span.set_attribute(KeyValue::new(KEY_COMPLEXITY, result.complexity as i64)); + span.set_attribute(KeyValue::new(KEY_DEPTH, result.depth as i64)); + } + } + + fn execution_start(&mut self, _ctx: &ExtensionContext<'_>) { + let span = match self.contexts.get(&REQUEST_CTX).cloned() { + Some(parent_cx) => self + .tracer + .span_builder("execute") + .with_kind(SpanKind::Server) + .with_parent_context(parent_cx) + .start(&self.tracer), + None => self + .tracer + .span_builder("execute") + .with_kind(SpanKind::Server) + .start(&self.tracer), + }; + let execute_cx = OpenTelemetryContext::current_with_span(span); + self.enter_context(EXECUTE_CTX, execute_cx); + } + + fn execution_end(&mut self, _ctx: &ExtensionContext<'_>) { + self.exit_context(EXECUTE_CTX); + self.exit_context(REQUEST_CTX); + } + + fn resolve_start(&mut self, _ctx: &ExtensionContext<'_>, info: &ResolveInfo<'_>) { + let parent_cx = match info.resolve_id.parent { + Some(parent_id) if parent_id > 0 => self.contexts.get(&resolve_ctx_id(parent_id)), + _ => self.contexts.get(&EXECUTE_CTX), + } + .cloned(); + + if let Some(parent_cx) = parent_cx { + let mut attributes = Vec::with_capacity(3); + attributes.push(KeyValue::new( + KEY_RESOLVE_ID, + info.resolve_id.current as i64, + )); + attributes.push(KeyValue::new(KEY_PARENT_TYPE, info.parent_type.to_string())); + attributes.push(KeyValue::new(KEY_RETURN_TYPE, info.return_type.to_string())); + let span = self + .tracer + .span_builder(&info.path_node.to_string()) + .with_kind(SpanKind::Server) + .with_parent_context(parent_cx) + .with_attributes(attributes) + .start(&self.tracer); + let resolve_cx = OpenTelemetryContext::current_with_span(span); + self.enter_context(resolve_ctx_id(info.resolve_id.current), resolve_cx); + } + } + + fn resolve_end(&mut self, _ctx: &ExtensionContext<'_>, info: &ResolveInfo<'_>) { + self.exit_context(resolve_ctx_id(info.resolve_id.current)); + } + + fn error(&mut self, _ctx: &ExtensionContext<'_>, err: &ServerError) { + if let Some(parent_cx) = self.contexts.get(&EXECUTE_CTX).cloned() { + parent_cx.span().add_event( + "error".to_string(), + vec![KeyValue::new(KEY_ERROR, err.to_string())], + ); + } + } +} diff --git a/src/extensions/tracing.rs b/src/extensions/tracing.rs index 874e09de..518e568f 100644 --- a/src/extensions/tracing.rs +++ b/src/extensions/tracing.rs @@ -1,4 +1,4 @@ -use std::collections::BTreeMap; +use std::collections::HashMap; use tracing::{span, Level, Span}; @@ -22,6 +22,16 @@ impl TracingConfig { } } +const REQUEST_CTX: usize = 0; +const PARSE_CTX: usize = 1; +const VALIDATION_CTX: usize = 2; +const EXECUTE_CTX: usize = 3; + +#[inline] +fn resolve_span_id(resolver_id: usize) -> usize { + resolver_id + 10 +} + /// Tracing extension /// /// # References @@ -68,11 +78,21 @@ impl ExtensionFactory for Tracing { #[derive(Default)] struct TracingExtension { - root: Option, - parse: Option, - validation: Option, - execute: Option, - fields: BTreeMap, + spans: HashMap, +} + +impl TracingExtension { + fn enter_span(&mut self, id: usize, span: Span) -> &Span { + let _ = span.enter(); + self.spans.insert(id, span); + self.spans.get(&id).unwrap() + } + + fn exit_span(&mut self, id: usize) { + if let Some(span) = self.spans.remove(&id) { + let _ = span.enter(); + } + } } impl Extension for TracingExtension { @@ -80,109 +100,91 @@ impl Extension for TracingExtension { &mut self, ctx: &ExtensionContext<'_>, query_source: &str, - _variables: &Variables, + variables: &Variables, ) { - let parent_span = ctx + let request_span = ctx .data_opt::() - .and_then(|cfg| cfg.parent.as_ref()); - - let root_span = match parent_span { - Some(parent) => span!( - target: "async_graphql::graphql", - parent: parent, - Level::INFO, - "query", - source = %query_source - ), - None => span!( - target: "async_graphql::graphql", - parent: None, - Level::INFO, - "query", - source = %query_source - ), - }; + .and_then(|cfg| cfg.parent.as_ref()) + .cloned() + .unwrap_or_else(|| { + span!( + target: "async_graphql::graphql", + parent: None, + Level::INFO, + "request", + ) + }); + let variables = serde_json::to_string(&variables).unwrap(); let parse_span = span!( target: "async_graphql::graphql", - parent: &root_span, + parent: &request_span, Level::INFO, - "parse" + "parse", + source = query_source, + variables = %variables, ); - enter_span(&root_span); - self.root.replace(root_span); - - enter_span(&parse_span); - self.parse.replace(parse_span); + self.enter_span(REQUEST_CTX, request_span); + self.enter_span(PARSE_CTX, parse_span); } fn parse_end(&mut self, _ctx: &ExtensionContext<'_>, _document: &ExecutableDocument) { - if let Some(span) = self.parse.take() { - exit_span(span); - } + self.exit_span(PARSE_CTX); } fn validation_start(&mut self, _ctx: &ExtensionContext<'_>) { - if let Some(parent) = &self.root { - let validation_span = span!( + if let Some(parent) = self.spans.get(&REQUEST_CTX) { + let span = span!( target: "async_graphql::graphql", parent: parent, Level::INFO, "validation" ); - enter_span(&validation_span); - self.validation.replace(validation_span); + self.enter_span(VALIDATION_CTX, span); } } fn validation_end(&mut self, _ctx: &ExtensionContext<'_>, _result: &ValidationResult) { - if let Some(span) = self.validation.take() { - exit_span(span); - } + self.exit_span(VALIDATION_CTX); } fn execution_start(&mut self, _ctx: &ExtensionContext<'_>) { - let execute_span = if let Some(parent) = &self.root { - span!( + let span = match self.spans.get(&REQUEST_CTX) { + Some(parent) => span!( target: "async_graphql::graphql", parent: parent, Level::INFO, "execute" - ) - } else { - // For every step of the subscription stream. - span!( + ), + None => span!( target: "async_graphql::graphql", parent: None, Level::INFO, "execute" - ) + ), }; - enter_span(&execute_span); - self.execute.replace(execute_span); + self.enter_span(EXECUTE_CTX, span); } - fn execution_end(&mut self, _ctx: &ExtensionContext<'_>) { - if let Some(span) = self.execute.take() { - exit_span(span); - } - if let Some(span) = self.root.take() { - exit_span(span); + fn execution_end(&mut self, ctx: &ExtensionContext<'_>) { + self.exit_span(EXECUTE_CTX); + if ctx.data_opt::().is_some() { + self.exit_span(REQUEST_CTX); } } fn resolve_start(&mut self, _ctx: &ExtensionContext<'_>, info: &ResolveInfo<'_>) { - let parent_span = match info.resolve_id.parent { - Some(parent_id) if parent_id > 0 => self.fields.get(&parent_id), - _ => self.execute.as_ref(), + let parent = match info.resolve_id.parent { + Some(parent_id) if parent_id > 0 => self.spans.get(&resolve_span_id(parent_id)), + _ => self.spans.get(&EXECUTE_CTX), }; - if let Some(parent_span) = parent_span { + if let Some(parent) = parent { let span = span!( target: "async_graphql::graphql", - parent: parent_span, + parent: parent, Level::INFO, "field", id = %info.resolve_id.current, @@ -190,46 +192,15 @@ impl Extension for TracingExtension { parent_type = %info.parent_type, return_type = %info.return_type, ); - enter_span(&span); - self.fields.insert(info.resolve_id.current, span); + self.enter_span(resolve_span_id(info.resolve_id.current), span); } } fn resolve_end(&mut self, _ctx: &ExtensionContext<'_>, info: &ResolveInfo<'_>) { - if let Some(span) = self.fields.remove(&info.resolve_id.current) { - exit_span(span); - } + self.exit_span(resolve_span_id(info.resolve_id.current)); } fn error(&mut self, _ctx: &ExtensionContext<'_>, err: &ServerError) { tracing::error!(target: "async_graphql::graphql", error = %err.message); - - for (_, span) in std::mem::take(&mut self.fields) { - exit_span(span); - } - self.fields.clear(); - - if let Some(span) = self.execute.take() { - exit_span(span); - } - if let Some(span) = self.validation.take() { - exit_span(span); - } - if let Some(span) = self.parse.take() { - exit_span(span); - } - if let Some(span) = self.root.take() { - exit_span(span); - } } } - -#[inline] -fn enter_span(span: &Span) { - let _enter = span.enter(); -} - -#[inline] -fn exit_span(span: Span) { - let _enter = span.enter(); -}