From 3d7594bac14fe5c0ef832bdd31c05cbeecbd65c7 Mon Sep 17 00:00:00 2001 From: Sunli Date: Fri, 31 Jul 2020 10:10:03 +0800 Subject: [PATCH] Removes code about streaming requests. --- async-graphql-actix-web/src/lib.rs | 32 +--- async-graphql-derive/src/subscription.rs | 1 - async-graphql-tide/src/lib.rs | 35 +---- async-graphql-warp/src/lib.rs | 35 +---- feature-comparison.md | 1 - src/context.rs | 31 +--- src/http/mod.rs | 10 -- src/lib.rs | 8 +- src/query.rs | 184 +---------------------- src/schema.rs | 24 +-- src/subscription/ws_transport.rs | 2 - src/types/deferred.rs | 99 ------------ src/types/mod.rs | 4 - src/types/streamed.rs | 111 -------------- tests/defer.rs | 152 ------------------- 15 files changed, 16 insertions(+), 713 deletions(-) delete mode 100644 src/types/deferred.rs delete mode 100644 src/types/streamed.rs delete mode 100644 tests/defer.rs diff --git a/async-graphql-actix-web/src/lib.rs b/async-graphql-actix-web/src/lib.rs index e0f17662..41d911ec 100644 --- a/async-graphql-actix-web/src/lib.rs +++ b/async-graphql-actix-web/src/lib.rs @@ -5,20 +5,17 @@ mod subscription; -use actix_web::body::BodyStream; use actix_web::dev::{HttpResponseBuilder, Payload, PayloadStream}; use actix_web::http::StatusCode; use actix_web::{http, web, Error, FromRequest, HttpRequest, HttpResponse, Responder}; -use async_graphql::http::{multipart_stream, StreamBody}; +use async_graphql::http::StreamBody; use async_graphql::{ IntoQueryBuilder, IntoQueryBuilderOpts, ParseRequestError, QueryBuilder, QueryResponse, - StreamResponse, }; use futures::channel::mpsc; use futures::future::Ready; use futures::{Future, SinkExt, StreamExt, TryFutureExt}; use http::Method; -use std::convert::Infallible; use std::pin::Pin; pub use subscription::WSSubscription; @@ -112,33 +109,6 @@ impl Responder for GQLResponse { } } -/// Responder for GraphQL response stream -pub struct GQLResponseStream(StreamResponse); - -impl From for GQLResponseStream { - fn from(resp: StreamResponse) -> Self { - GQLResponseStream(resp) - } -} - -impl Responder for GQLResponseStream { - type Error = Error; - type Future = Ready>; - - fn respond_to(self, req: &HttpRequest) -> Self::Future { - match self.0 { - StreamResponse::Single(resp) => GQLResponse(resp).respond_to(req), - StreamResponse::Stream(stream) => { - let body = - BodyStream::new(multipart_stream(stream).map(Result::<_, Infallible>::Ok)); - let mut res = HttpResponse::build(StatusCode::OK); - res.content_type("multipart/mixed; boundary=\"-\""); - futures::future::ok(res.body(body)) - } - } - } -} - fn add_cache_control( builder: &mut HttpResponseBuilder, resp: &async_graphql::Result, diff --git a/async-graphql-derive/src/subscription.rs b/async-graphql-derive/src/subscription.rs index 478cebcb..c7dc8878 100644 --- a/async-graphql-derive/src/subscription.rs +++ b/async-graphql-derive/src/subscription.rs @@ -274,7 +274,6 @@ pub fn generate(object_args: &args::Object, item_impl: &mut ItemImpl) -> Result< }), &field.selection_set, &resolve_id, - None, ); #crate_name::OutputValueType::resolve(&msg, &ctx_selection_set, &*field).await } diff --git a/async-graphql-tide/src/lib.rs b/async-graphql-tide/src/lib.rs index d3f7c49e..bb0d7d23 100644 --- a/async-graphql-tide/src/lib.rs +++ b/async-graphql-tide/src/lib.rs @@ -5,15 +5,12 @@ #![allow(clippy::needless_doctest_main)] #![forbid(unsafe_code)] -use async_graphql::http::{multipart_stream, GQLRequest, GQLResponse, StreamBody}; +use async_graphql::http::{GQLRequest, GQLResponse}; use async_graphql::{ IntoQueryBuilder, IntoQueryBuilderOpts, ObjectType, QueryBuilder, QueryResponse, Schema, - StreamResponse, SubscriptionType, + SubscriptionType, }; use async_trait::async_trait; -use futures::channel::mpsc; -use futures::io::BufReader; -use futures::{SinkExt, StreamExt}; use std::str::FromStr; use tide::{ http::{headers, Method}, @@ -127,9 +124,6 @@ impl RequestExt for Request pub trait ResponseExt: Sized { /// Set body as the result of a GraphQL query. fn body_graphql(self, res: async_graphql::Result) -> tide::Result; - - /// Set body as the result of a GraphQL streaming query. - fn body_graphql_stream(self, res: StreamResponse) -> tide::Result; } impl ResponseExt for Response { @@ -138,31 +132,6 @@ impl ResponseExt for Response { resp.set_body(Body::from_json(&GQLResponse(res))?); Ok(resp) } - - fn body_graphql_stream(mut self, res: StreamResponse) -> tide::Result { - match res { - StreamResponse::Single(res) => self.body_graphql(res), - StreamResponse::Stream(stream) => { - // Body::from_reader required Sync, however StreamResponse does not have Sync. - // I created an issue and got a reply that this might be fixed in the future. - // https://github.com/http-rs/http-types/pull/144 - // Now I can only use forwarding to solve the problem. - let mut stream = - Box::pin(multipart_stream(stream).map(Result::Ok::<_, std::io::Error>)); - let (mut tx, rx) = mpsc::channel(0); - async_std::task::spawn(async move { - while let Some(item) = stream.next().await { - if tx.send(item).await.is_err() { - return; - } - } - }); - self.set_body(Body::from_reader(BufReader::new(StreamBody::new(rx)), None)); - self.insert_header(tide::http::headers::CONTENT_TYPE, "multipart/mixed"); - Ok(self) - } - } - } } fn add_cache_control( diff --git a/async-graphql-warp/src/lib.rs b/async-graphql-warp/src/lib.rs index f6075867..e52310c5 100644 --- a/async-graphql-warp/src/lib.rs +++ b/async-graphql-warp/src/lib.rs @@ -5,17 +5,15 @@ #![allow(clippy::needless_doctest_main)] #![forbid(unsafe_code)] -use async_graphql::http::{multipart_stream, GQLRequest, StreamBody}; +use async_graphql::http::{GQLRequest, StreamBody}; use async_graphql::{ Data, FieldResult, IntoQueryBuilder, IntoQueryBuilderOpts, ObjectType, QueryBuilder, - QueryResponse, Schema, StreamResponse, SubscriptionType, WebSocketTransport, + QueryResponse, Schema, SubscriptionType, WebSocketTransport, }; use bytes::Bytes; use futures::select; use futures::{SinkExt, StreamExt}; -use hyper::header::HeaderValue; -use hyper::{Body, Method}; -use std::convert::Infallible; +use hyper::Method; use std::sync::Arc; use warp::filters::ws::Message; use warp::filters::BoxedFilter; @@ -312,30 +310,3 @@ impl Reply for GQLResponse { resp } } - -/// GraphQL streaming reply -pub struct GQLResponseStream(StreamResponse); - -impl From for GQLResponseStream { - fn from(resp: StreamResponse) -> Self { - GQLResponseStream(resp) - } -} - -impl Reply for GQLResponseStream { - fn into_response(self) -> Response { - match self.0 { - StreamResponse::Single(resp) => GQLResponse(resp).into_response(), - StreamResponse::Stream(stream) => { - let mut resp = Response::new(Body::wrap_stream( - multipart_stream(stream).map(Result::<_, Infallible>::Ok), - )); - resp.headers_mut().insert( - "content-type", - HeaderValue::from_static("multipart/mixed; boundary=\"-\""), - ); - resp - } - } - } -} diff --git a/feature-comparison.md b/feature-comparison.md index 87bf2412..7d1719dc 100644 --- a/feature-comparison.md +++ b/feature-comparison.md @@ -24,7 +24,6 @@ Comparing Features of Other Rust GraphQL Implementations | Field guard | 👍 | ⛔️ | | Multipart request(upload file) | 👍 | ⛔️ | | Subscription | 👍 | ⛔️ | -| @defer/@stream | 👍 | ⛔️ | | Opentracing | 👍 | ⛔️ | | Apollo Federation | 👍 | ⛔️ | | Apollo Tracing | 👍 | ⛔️ | diff --git a/src/context.rs b/src/context.rs index 54fabf9f..136a3e2c 100644 --- a/src/context.rs +++ b/src/context.rs @@ -2,14 +2,11 @@ use crate::extensions::Extensions; use crate::parser::query::{Directive, Field, SelectionSet}; use crate::schema::SchemaEnv; use crate::{ - FieldResult, InputValueType, Lookahead, Pos, Positioned, QueryError, QueryResponse, Result, - Type, Value, + FieldResult, InputValueType, Lookahead, Pos, Positioned, QueryError, Result, Type, Value, }; use async_graphql_parser::query::Document; use async_graphql_parser::UploadValue; use fnv::FnvHashMap; -use futures::Future; -use parking_lot::Mutex; use serde::ser::SerializeSeq; use serde::Serializer; use std::any::{Any, TypeId}; @@ -17,7 +14,6 @@ use std::collections::BTreeMap; use std::fmt::{Display, Formatter}; use std::fs::File; use std::ops::{Deref, DerefMut}; -use std::pin::Pin; use std::sync::atomic::AtomicUsize; use std::sync::Arc; @@ -256,25 +252,6 @@ impl std::fmt::Display for ResolveId { } } -#[doc(hidden)] -pub type BoxDeferFuture = - Pin> + Send + 'static>>; - -#[doc(hidden)] -pub struct DeferList { - pub path_prefix: Vec, - pub futures: Mutex>, -} - -impl DeferList { - pub(crate) fn append(&self, fut: F) - where - F: Future> + Send + 'static, - { - self.futures.lock().push(Box::pin(fut)); - } -} - /// Query context #[derive(Clone)] pub struct ContextBase<'a, T> { @@ -286,7 +263,6 @@ pub struct ContextBase<'a, T> { pub item: T, pub(crate) schema_env: &'a SchemaEnv, pub(crate) query_env: &'a QueryEnv, - pub(crate) defer_list: Option<&'a DeferList>, } impl<'a, T> Deref for ContextBase<'a, T> { @@ -340,7 +316,6 @@ impl QueryEnv { path_node: Option>, item: T, inc_resolve_id: &'a AtomicUsize, - defer_list: Option<&'a DeferList>, ) -> ContextBase<'a, T> { ContextBase { path_node, @@ -349,7 +324,6 @@ impl QueryEnv { item, schema_env, query_env: self, - defer_list, } } } @@ -387,7 +361,6 @@ impl<'a, T> ContextBase<'a, T> { inc_resolve_id: self.inc_resolve_id, schema_env: self.schema_env, query_env: self.query_env, - defer_list: self.defer_list, } } @@ -403,7 +376,6 @@ impl<'a, T> ContextBase<'a, T> { inc_resolve_id: &self.inc_resolve_id, schema_env: self.schema_env, query_env: self.query_env, - defer_list: self.defer_list, } } @@ -561,7 +533,6 @@ impl<'a> ContextBase<'a, &'a Positioned> { inc_resolve_id: self.inc_resolve_id, schema_env: self.schema_env, query_env: self.query_env, - defer_list: self.defer_list, } } } diff --git a/src/http/mod.rs b/src/http/mod.rs index bba0b8b8..c3bce133 100644 --- a/src/http/mod.rs +++ b/src/http/mod.rs @@ -61,14 +61,6 @@ impl Serialize for GQLResponse { match &self.0 { Ok(res) => { let mut map = serializer.serialize_map(None)?; - if let Some(label) = &res.label { - map.serialize_key("label")?; - map.serialize_value(label)?; - } - if let Some(path) = &res.path { - map.serialize_key("path")?; - map.serialize_value(path)?; - } map.serialize_key("data")?; map.serialize_value(&res.data)?; if res.extensions.is_some() { @@ -219,8 +211,6 @@ mod tests { #[test] fn test_response_data() { let resp = GQLResponse(Ok(QueryResponse { - label: None, - path: None, data: json!({"ok": true}), extensions: None, cache_control: Default::default(), diff --git a/src/lib.rs b/src/lib.rs index dcf60913..dcff7afc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -150,9 +150,7 @@ pub use error::{ }; pub use look_ahead::Lookahead; pub use parser::{Pos, Positioned, Value}; -pub use query::{ - IntoQueryBuilder, IntoQueryBuilderOpts, QueryBuilder, QueryResponse, StreamResponse, -}; +pub use query::{IntoQueryBuilder, IntoQueryBuilderOpts, QueryBuilder, QueryResponse}; pub use registry::CacheControl; pub use scalars::{Any, Json, OutputJson, ID}; pub use schema::{Schema, SchemaBuilder, SchemaEnv}; @@ -160,9 +158,7 @@ pub use serde_json::Number; pub use subscription::{ SimpleBroker, SubscriptionStreams, SubscriptionTransport, WebSocketTransport, }; -pub use types::{ - connection, Deferred, EmptyMutation, EmptySubscription, MaybeUndefined, Streamed, Upload, -}; +pub use types::{connection, EmptyMutation, EmptySubscription, MaybeUndefined, Upload}; pub use validation::ValidationMode; /// Result type diff --git a/src/query.rs b/src/query.rs index 91e12670..cd101392 100644 --- a/src/query.rs +++ b/src/query.rs @@ -1,4 +1,4 @@ -use crate::context::{Data, DeferList, ResolveId}; +use crate::context::{Data, ResolveId}; use crate::error::ParseRequestError; use crate::extensions::{BoxExtension, ErrorLogger, Extension}; use crate::mutation_resolver::do_mutation_resolve; @@ -8,12 +8,8 @@ use crate::{ SubscriptionType, Variables, }; use async_graphql_parser::query::OperationType; -use futures::{Stream, StreamExt}; -use itertools::Itertools; use std::any::Any; -use std::borrow::Cow; use std::fs::File; -use std::pin::Pin; use std::sync::atomic::AtomicUsize; use std::sync::Arc; @@ -43,14 +39,6 @@ pub trait IntoQueryBuilder: Sized { /// Query response #[derive(Debug)] pub struct QueryResponse { - /// Label for RelayModernQueryExecutor - /// - /// https://github.com/facebook/relay/blob/2859aa8df4df7d4d6d9eef4c9dc1134286773710/packages/relay-runtime/store/RelayModernQueryExecutor.js#L1267 - pub label: Option, - - /// Path for subsequent response - pub path: Option>, - /// Data of query result pub data: serde_json::Value, @@ -61,80 +49,6 @@ pub struct QueryResponse { pub cache_control: CacheControl, } -impl QueryResponse { - pub(crate) fn apply_path_prefix(mut self, mut prefix: Vec) -> Self { - if let Some(path) = &mut self.path { - prefix.extend(path.drain(..)); - *path = prefix; - } else { - self.path = Some(prefix); - } - - self.label = self.path.as_ref().map(|path| { - path.iter() - .map(|value| { - if let serde_json::Value::String(s) = value { - Cow::Borrowed(s.as_str()) - } else { - Cow::Owned(value.to_string()) - } - }) - .join("$") - }); - - self - } - - pub(crate) fn merge(&mut self, resp: QueryResponse) { - let mut p = &mut self.data; - for item in resp.path.unwrap_or_default() { - match item { - serde_json::Value::String(name) => { - if let serde_json::Value::Object(obj) = p { - if let Some(next) = obj.get_mut(&name) { - p = next; - continue; - } - } - return; - } - serde_json::Value::Number(idx) => { - if let serde_json::Value::Array(array) = p { - let idx = idx.as_i64().unwrap() as usize; - while array.len() <= idx { - array.push(serde_json::Value::Null); - } - p = array.get_mut(idx as usize).unwrap(); - continue; - } - return; - } - _ => {} - } - } - *p = resp.data; - } -} - -/// Response for `Schema::execute_stream` and `QueryBuilder::execute_stream` -pub enum StreamResponse { - /// There is no `@defer` or `@stream` directive in the query, this is the final result. - Single(Result), - - /// Streaming responses. - Stream(Pin> + Send + 'static>>), -} - -impl StreamResponse { - /// Convert to a stream. - pub fn into_stream(self) -> impl Stream> + Send + 'static { - match self { - StreamResponse::Single(resp) => Box::pin(futures::stream::once(async move { resp })), - StreamResponse::Stream(stream) => stream, - } - } -} - /// Query builder pub struct QueryBuilder { pub(crate) query_source: String, @@ -205,67 +119,11 @@ impl QueryBuilder { .set_upload(var_path, filename, content_type, content); } - /// Execute the query, returns a stream, the first result being the query result, - /// followed by the incremental result. Only when there are `@defer` and `@stream` directives - /// in the query will there be subsequent incremental results. - pub async fn execute_stream( + /// Execute the query, always return a complete result. + pub async fn execute( self, schema: &Schema, - ) -> StreamResponse - where - Query: ObjectType + Send + Sync + 'static, - Mutation: ObjectType + Send + Sync + 'static, - Subscription: SubscriptionType + Send + Sync + 'static, - { - let schema = schema.clone(); - match self.execute_first(&schema).await { - Ok((first_resp, defer_list)) if defer_list.futures.lock().is_empty() => { - StreamResponse::Single(Ok(first_resp)) - } - Err(err) => StreamResponse::Single(Err(err)), - Ok((first_resp, defer_list)) => { - let stream = async_stream::try_stream! { - yield first_resp; - - let mut current_defer_list = Vec::new(); - for fut in defer_list.futures.into_inner() { - current_defer_list.push((defer_list.path_prefix.clone(), fut)); - } - - loop { - let mut next_defer_list = Vec::new(); - for (path_prefix, defer) in current_defer_list { - let (res, mut defer_list) = defer.await?; - for fut in defer_list.futures.into_inner() { - let mut next_path_prefix = path_prefix.clone(); - next_path_prefix.extend(defer_list.path_prefix.clone()); - next_defer_list.push((next_path_prefix, fut)); - } - let mut new_res = res.apply_path_prefix(path_prefix); - new_res.label = new_res.path.as_ref().map(|path| path.iter().map(|value| { - if let serde_json::Value::String(s) = value { - s.to_string() - } else { - value.to_string() - } - }).join("$")); - yield new_res; - } - if next_defer_list.is_empty() { - break; - } - current_defer_list = next_defer_list; - } - }; - StreamResponse::Stream(Box::pin(stream)) - } - } - } - - async fn execute_first<'a, Query, Mutation, Subscription>( - self, - schema: &Schema, - ) -> Result<(QueryResponse, DeferList)> + ) -> Result where Query: ObjectType + Send + Sync + 'static, Mutation: ObjectType + Send + Sync + 'static, @@ -301,10 +159,6 @@ impl QueryBuilder { document, Arc::new(self.ctx_data.unwrap_or_default()), ); - let defer_list = DeferList { - path_prefix: Vec::new(), - futures: Default::default(), - }; let ctx = ContextBase { path_node: None, resolve_id: ResolveId::root(), @@ -312,7 +166,6 @@ impl QueryBuilder { item: &env.document.current_operation().selection_set, schema_env: &schema.env, query_env: &env, - defer_list: Some(&defer_list), }; env.extensions.lock().execution_start(); @@ -329,37 +182,12 @@ impl QueryBuilder { }; env.extensions.lock().execution_end(); - let res = QueryResponse { - label: None, - path: None, + let resp = QueryResponse { data, extensions: env.extensions.lock().result(), cache_control, }; - Ok((res, defer_list)) - } - - /// Execute the query, always return a complete result. - pub async fn execute( - self, - schema: &Schema, - ) -> Result - where - Query: ObjectType + Send + Sync + 'static, - Mutation: ObjectType + Send + Sync + 'static, - Subscription: SubscriptionType + Send + Sync + 'static, - { - let resp = self.execute_stream(schema).await; - match resp { - StreamResponse::Single(res) => res, - StreamResponse::Stream(mut stream) => { - let mut resp = stream.next().await.unwrap()?; - while let Some(resp_part) = stream.next().await.transpose()? { - resp.merge(resp_part); - } - Ok(resp) - } - } + Ok(resp) } /// Get query source diff --git a/src/schema.rs b/src/schema.rs index 789a6f87..b76b93f5 100644 --- a/src/schema.rs +++ b/src/schema.rs @@ -2,7 +2,7 @@ use crate::context::Data; use crate::extensions::{BoxExtension, ErrorLogger, Extension, Extensions}; use crate::model::__DirectiveLocation; use crate::parser::parse_query; -use crate::query::{QueryBuilder, StreamResponse}; +use crate::query::QueryBuilder; use crate::registry::{MetaDirective, MetaInputValue, Registry}; use crate::subscription::{create_connection, create_subscription_stream, SubscriptionTransport}; use crate::types::QueryRoot; @@ -241,20 +241,6 @@ where } }); - registry.add_directive(MetaDirective { - name: "defer", - description: None, - locations: vec![__DirectiveLocation::FIELD], - args: Default::default(), - }); - - registry.add_directive(MetaDirective { - name: "stream", - description: None, - locations: vec![__DirectiveLocation::FIELD], - args: Default::default(), - }); - // register scalars bool::create_type_info(&mut registry); i32::create_type_info(&mut registry); @@ -301,13 +287,6 @@ where QueryBuilder::new(query_source).execute(self).await } - /// Execute the query without create the `QueryBuilder`, returns a stream, the first result being the query result, - /// followed by the incremental result. Only when there are `@defer` and `@stream` directives - /// in the query will there be subsequent incremental results. - pub async fn execute_stream(&self, query_source: &str) -> StreamResponse { - QueryBuilder::new(query_source).execute_stream(self).await - } - pub(crate) fn prepare_query( &self, source: &str, @@ -400,7 +379,6 @@ where None, &env.document.current_operation().selection_set, &resolve_id, - None, ); let mut streams = Vec::new(); create_subscription_stream(self, env.clone(), &ctx, &mut streams) diff --git a/src/subscription/ws_transport.rs b/src/subscription/ws_transport.rs index f4dbcf45..ee6291f2 100644 --- a/src/subscription/ws_transport.rs +++ b/src/subscription/ws_transport.rs @@ -141,8 +141,6 @@ impl SubscriptionTransport for WebSocketTransport { id: Some(id.clone()), payload: Some( serde_json::to_value(GQLResponse(Ok(QueryResponse { - label: None, - path: None, data: value, extensions: None, cache_control: Default::default(), diff --git a/src/types/deferred.rs b/src/types/deferred.rs deleted file mode 100644 index 8d0a616b..00000000 --- a/src/types/deferred.rs +++ /dev/null @@ -1,99 +0,0 @@ -use crate::context::DeferList; -use crate::registry::Registry; -use crate::{ContextSelectionSet, OutputValueType, Positioned, QueryResponse, Result, Type}; -use async_graphql_parser::query::Field; -use itertools::Itertools; -use parking_lot::Mutex; -use std::borrow::Cow; -use std::sync::atomic::AtomicUsize; - -/// Deferred type -/// -/// Allows to defer the type of results returned, only takes effect when the @defer directive exists on the field. -pub struct Deferred(Mutex>); - -impl From for Deferred { - fn from(value: T) -> Self { - Self(Mutex::new(Some(value))) - } -} - -impl Type for Deferred { - fn type_name() -> Cow<'static, str> { - T::type_name() - } - - fn create_type_info(registry: &mut Registry) -> String { - T::create_type_info(registry) - } -} - -#[async_trait::async_trait] -impl OutputValueType for Deferred { - async fn resolve( - &self, - ctx: &ContextSelectionSet<'_>, - field: &Positioned, - ) -> Result { - let obj = self.0.lock().take(); - if let Some(obj) = obj { - if let Some(defer_list) = ctx.defer_list { - if ctx.is_defer(&field.directives) { - let schema_env = ctx.schema_env.clone(); - let query_env = ctx.query_env.clone(); - let mut field = field.clone(); - - // remove @defer directive - if let Some((idx, _)) = field - .node - .directives - .iter() - .find_position(|d| d.name.as_str() == "defer") - { - field.node.directives.remove(idx); - } - - let path_prefix = ctx - .path_node - .as_ref() - .map(|path| match serde_json::to_value(path) { - Ok(serde_json::Value::Array(values)) => values, - _ => Default::default(), - }) - .unwrap_or_default(); - - defer_list.append(async move { - let inc_resolve_id = AtomicUsize::default(); - let defer_list = DeferList { - path_prefix: path_prefix.clone(), - futures: Default::default(), - }; - let ctx = query_env.create_context( - &schema_env, - None, - &field.selection_set, - &inc_resolve_id, - Some(&defer_list), - ); - let data = obj.resolve(&ctx, &field).await?; - - Ok(( - QueryResponse { - label: None, - path: Some(path_prefix), - data, - extensions: None, - cache_control: Default::default(), - }, - defer_list, - )) - }); - return Ok(serde_json::Value::Null); - } - } - OutputValueType::resolve(&obj, ctx, field).await - } else { - Ok(serde_json::Value::Null) - } - } -} diff --git a/src/types/mod.rs b/src/types/mod.rs index 0e8811db..b35bd564 100644 --- a/src/types/mod.rs +++ b/src/types/mod.rs @@ -1,6 +1,5 @@ pub mod connection; -mod deferred; mod empty_mutation; mod empty_subscription; mod r#enum; @@ -8,14 +7,11 @@ mod list; mod maybe_undefined; mod optional; mod query_root; -mod streamed; mod upload; -pub use deferred::Deferred; pub use empty_mutation::EmptyMutation; pub use empty_subscription::EmptySubscription; pub use maybe_undefined::MaybeUndefined; pub use query_root::QueryRoot; pub use r#enum::{EnumItem, EnumType}; -pub use streamed::Streamed; pub use upload::Upload; diff --git a/src/types/streamed.rs b/src/types/streamed.rs deleted file mode 100644 index 80eebd71..00000000 --- a/src/types/streamed.rs +++ /dev/null @@ -1,111 +0,0 @@ -use crate::context::DeferList; -use crate::registry::Registry; -use crate::{ContextSelectionSet, OutputValueType, Positioned, QueryResponse, Result, Type}; -use async_graphql_parser::query::Field; -use itertools::Itertools; -use parking_lot::Mutex; -use std::borrow::Cow; -use std::sync::atomic::AtomicUsize; -use std::sync::Arc; - -/// Streamed type -/// -/// Similar to Deferred, but you can defer every item of the list type, only takes effect when the @stream directive exists on the field. -pub struct Streamed(Mutex>>); - -impl From> for Streamed { - fn from(value: Vec) -> Self { - Self(Mutex::new(Some(value))) - } -} - -impl Type for Streamed { - fn type_name() -> Cow<'static, str> { - Vec::::type_name() - } - - fn create_type_info(registry: &mut Registry) -> String { - Vec::::create_type_info(registry) - } -} - -#[async_trait::async_trait] -impl OutputValueType for Streamed { - async fn resolve( - &self, - ctx: &ContextSelectionSet<'_>, - field: &Positioned, - ) -> Result { - let list = self.0.lock().take(); - if let Some(list) = list { - if let Some(defer_list) = ctx.defer_list { - if ctx.is_stream(&field.directives) { - let mut field = field.clone(); - - // remove @stream directive - if let Some((idx, _)) = field - .node - .directives - .iter() - .find_position(|d| d.name.as_str() == "stream") - { - field.node.directives.remove(idx); - } - - let field = Arc::new(field); - - let path_prefix = ctx - .path_node - .as_ref() - .map(|path| match serde_json::to_value(path) { - Ok(serde_json::Value::Array(values)) => values, - _ => Default::default(), - }) - .unwrap_or_default(); - - for (idx, item) in list.into_iter().enumerate() { - let path_prefix = { - let mut path_prefix = path_prefix.clone(); - path_prefix.push(serde_json::Value::Number(idx.into())); - path_prefix - }; - let field = field.clone(); - let schema_env = ctx.schema_env.clone(); - let query_env = ctx.query_env.clone(); - - defer_list.append(async move { - let inc_resolve_id = AtomicUsize::default(); - let defer_list = DeferList { - path_prefix: path_prefix.clone(), - futures: Default::default(), - }; - let ctx = query_env.create_context( - &schema_env, - None, - &field.selection_set, - &inc_resolve_id, - Some(&defer_list), - ); - let data = item.resolve(&ctx, &field).await?; - - Ok(( - QueryResponse { - label: None, - path: Some(path_prefix), - data, - extensions: None, - cache_control: Default::default(), - }, - defer_list, - )) - }); - } - return Ok(serde_json::Value::Array(Vec::new())); - } - } - OutputValueType::resolve(&list, ctx, field).await - } else { - Ok(serde_json::Value::Null) - } - } -} diff --git a/tests/defer.rs b/tests/defer.rs deleted file mode 100644 index 1a5664ee..00000000 --- a/tests/defer.rs +++ /dev/null @@ -1,152 +0,0 @@ -use async_graphql::*; -use futures::StreamExt; - -#[async_std::test] -pub async fn test_defer() { - struct MyObj; - - #[Object] - impl MyObj { - async fn value(&self) -> i32 { - 20 - } - - async fn obj(&self) -> Deferred { - MyObj.into() - } - } - - struct Query; - - #[Object] - impl Query { - async fn value(&self) -> Deferred { - 10.into() - } - - async fn obj(&self) -> Deferred { - MyObj.into() - } - } - - let schema = Schema::new(Query, EmptyMutation, EmptySubscription); - let query = r#"{ - value @defer - }"#; - assert_eq!( - schema.execute(&query).await.unwrap().data, - serde_json::json!({ - "value": 10, - }) - ); - - let query = r#"{ - value @defer - obj @defer { - value - obj @defer { - value - } - } - }"#; - assert_eq!( - schema.execute(&query).await.unwrap().data, - serde_json::json!({ - "value": 10, - "obj": { - "value": 20, - "obj": { - "value": 20 - } - } - }) - ); - - let mut stream = schema.execute_stream(&query).await.into_stream(); - assert_eq!( - stream.next().await.unwrap().unwrap().data, - serde_json::json!({ - "value": null, - "obj": null, - }) - ); - - let next_resp = stream.next().await.unwrap().unwrap(); - assert_eq!(next_resp.path, Some(vec![serde_json::json!("value")])); - assert_eq!(next_resp.data, serde_json::json!(10)); - - let next_resp = stream.next().await.unwrap().unwrap(); - assert_eq!(next_resp.path, Some(vec![serde_json::json!("obj")])); - assert_eq!( - next_resp.data, - serde_json::json!({"value": 20, "obj": null}) - ); - - let next_resp = stream.next().await.unwrap().unwrap(); - assert_eq!( - next_resp.path, - Some(vec![serde_json::json!("obj"), serde_json::json!("obj")]) - ); - assert_eq!(next_resp.data, serde_json::json!({"value": 20})); - - assert!(stream.next().await.is_none()); -} - -#[async_std::test] -pub async fn test_stream() { - #[SimpleObject] - struct MyObj { - value: i32, - } - - struct Query; - - #[Object] - impl Query { - async fn objs(&self) -> Streamed { - Streamed::from(vec![ - MyObj { value: 1 }, - MyObj { value: 2 }, - MyObj { value: 3 }, - MyObj { value: 4 }, - MyObj { value: 5 }, - ]) - } - } - - let schema = Schema::new(Query, EmptyMutation, EmptySubscription); - let query = r#"{ - objs @stream { value } - }"#; - assert_eq!( - schema.execute(&query).await.unwrap().data, - serde_json::json!({ - "objs": [ - { "value": 1 }, - { "value": 2 }, - { "value": 3 }, - { "value": 4 }, - { "value": 5 }, - ] - }) - ); - - let mut stream = schema.execute_stream(&query).await.into_stream(); - assert_eq!( - stream.next().await.unwrap().unwrap().data, - serde_json::json!({ - "objs": [], - }) - ); - - for i in 0..5 { - let next_resp = stream.next().await.unwrap().unwrap(); - assert_eq!( - next_resp.path, - Some(vec![serde_json::json!("objs"), i.into()]) - ); - assert_eq!(next_resp.data, serde_json::json!({ "value": i + 1 })); - } - - assert!(stream.next().await.is_none()); -}