From 906bcce9323fa3d41a37cc188106fa00af453c6d Mon Sep 17 00:00:00 2001 From: Sunli Date: Thu, 21 May 2020 15:38:26 +0800 Subject: [PATCH] Remove Clone bound for Deferred and Streamed --- src/types/deferred.rs | 111 ++++++++++++++++++++------------------ src/types/streamed.rs | 123 ++++++++++++++++++++++-------------------- tests/defer.rs | 2 - 3 files changed, 122 insertions(+), 114 deletions(-) diff --git a/src/types/deferred.rs b/src/types/deferred.rs index 626058e8..4e18d28b 100644 --- a/src/types/deferred.rs +++ b/src/types/deferred.rs @@ -3,21 +3,22 @@ use crate::registry::Registry; use crate::{ContextSelectionSet, OutputValueType, Positioned, QueryResponse, Result, Type}; use async_graphql_parser::query::Field; use itertools::Itertools; +use parking_lot::Mutex; use std::borrow::Cow; use std::sync::atomic::AtomicUsize; /// Deferred type /// /// Allows to defer the type of results returned, only takes effect when the @defer directive exists on the field. -pub struct Deferred(T); +pub struct Deferred(Mutex>); -impl From for Deferred { +impl From for Deferred { fn from(value: T) -> Self { - Self(value) + Self(Mutex::new(Some(value))) } } -impl Type for Deferred { +impl Type for Deferred { fn type_name() -> Cow<'static, str> { T::type_name() } @@ -28,63 +29,67 @@ impl Type for Deferred { } #[async_trait::async_trait] -impl OutputValueType for Deferred { +impl OutputValueType for Deferred { async fn resolve( &self, ctx: &ContextSelectionSet<'_>, field: &Positioned, ) -> Result { - if let Some(defer_list) = ctx.defer_list { - if ctx.is_defer(&field.directives) { - let obj = self.0.clone(); - let schema_env = ctx.schema_env.clone(); - let query_env = ctx.query_env.clone(); - let mut field = field.clone(); + let obj = self.0.lock().take(); + if let Some(obj) = obj { + if let Some(defer_list) = ctx.defer_list { + if ctx.is_defer(&field.directives) { + let schema_env = ctx.schema_env.clone(); + let query_env = ctx.query_env.clone(); + let mut field = field.clone(); - // remove @defer directive - if let Some((idx, _)) = field - .node - .directives - .iter() - .find_position(|d| d.name.as_str() == "defer") - { - field.node.directives.remove(idx); + // remove @defer directive + if let Some((idx, _)) = field + .node + .directives + .iter() + .find_position(|d| d.name.as_str() == "defer") + { + field.node.directives.remove(idx); + } + + let path_prefix = ctx + .path_node + .as_ref() + .map(|path| path.to_json()) + .unwrap_or_default(); + + defer_list.append(async move { + let inc_resolve_id = AtomicUsize::default(); + let defer_list = DeferList { + path_prefix: path_prefix.clone(), + futures: Default::default(), + }; + let ctx = query_env.create_context( + &schema_env, + None, + &field.selection_set, + &inc_resolve_id, + Some(&defer_list), + ); + let data = obj.resolve(&ctx, &field).await?; + + Ok(( + QueryResponse { + path: Some(path_prefix), + data, + extensions: None, + cache_control: Default::default(), + }, + defer_list, + )) + }); + return Ok(serde_json::Value::Null); } - - let path_prefix = ctx - .path_node - .as_ref() - .map(|path| path.to_json()) - .unwrap_or_default(); - - defer_list.append(async move { - let inc_resolve_id = AtomicUsize::default(); - let defer_list = DeferList { - path_prefix: path_prefix.clone(), - futures: Default::default(), - }; - let ctx = query_env.create_context( - &schema_env, - None, - &field.selection_set, - &inc_resolve_id, - Some(&defer_list), - ); - let data = obj.resolve(&ctx, &field).await?; - - Ok(( - QueryResponse { - path: Some(path_prefix), - data, - extensions: None, - cache_control: Default::default(), - }, - defer_list, - )) - }); - return Ok(serde_json::Value::Null); } + OutputValueType::resolve(&obj, ctx, field).await + } else { + Ok(serde_json::Value::Null) } - OutputValueType::resolve(&self.0, ctx, field).await } } diff --git a/src/types/streamed.rs b/src/types/streamed.rs index 49f303f6..9e2d84ec 100644 --- a/src/types/streamed.rs +++ b/src/types/streamed.rs @@ -3,6 +3,7 @@ use crate::registry::Registry; use crate::{ContextSelectionSet, OutputValueType, Positioned, QueryResponse, Result, Type}; use async_graphql_parser::query::Field; use itertools::Itertools; +use parking_lot::Mutex; use std::borrow::Cow; use std::sync::atomic::AtomicUsize; use std::sync::Arc; @@ -10,15 +11,15 @@ use std::sync::Arc; /// Streamed type /// /// Similar to Deferred, but you can defer every item of the list type, only takes effect when the @stream directive exists on the field. -pub struct Streamed(Vec); +pub struct Streamed(Mutex>>); -impl From> for Streamed { +impl From> for Streamed { fn from(value: Vec) -> Self { - Self(value) + Self(Mutex::new(Some(value))) } } -impl Type for Streamed { +impl Type for Streamed { fn type_name() -> Cow<'static, str> { Vec::::type_name() } @@ -29,74 +30,78 @@ impl Type for Streamed { } #[async_trait::async_trait] -impl OutputValueType for Streamed { +impl OutputValueType for Streamed { async fn resolve( &self, ctx: &ContextSelectionSet<'_>, field: &Positioned, ) -> Result { - if let Some(defer_list) = ctx.defer_list { - if ctx.is_stream(&field.directives) { - let list = self.0.clone(); - let mut field = field.clone(); + let list = self.0.lock().take(); + if let Some(list) = list { + if let Some(defer_list) = ctx.defer_list { + if ctx.is_stream(&field.directives) { + let mut field = field.clone(); - // remove @stream directive - if let Some((idx, _)) = field - .node - .directives - .iter() - .find_position(|d| d.name.as_str() == "stream") - { - field.node.directives.remove(idx); - } + // remove @stream directive + if let Some((idx, _)) = field + .node + .directives + .iter() + .find_position(|d| d.name.as_str() == "stream") + { + field.node.directives.remove(idx); + } - let field = Arc::new(field); + let field = Arc::new(field); - let path_prefix = ctx - .path_node - .as_ref() - .map(|path| path.to_json()) - .unwrap_or_default(); + let path_prefix = ctx + .path_node + .as_ref() + .map(|path| path.to_json()) + .unwrap_or_default(); - for (idx, item) in list.into_iter().enumerate() { - let path_prefix = { - let mut path_prefix = path_prefix.clone(); - path_prefix.push(serde_json::Value::Number(idx.into())); - path_prefix - }; - let field = field.clone(); - let schema_env = ctx.schema_env.clone(); - let query_env = ctx.query_env.clone(); - - defer_list.append(async move { - let inc_resolve_id = AtomicUsize::default(); - let defer_list = DeferList { - path_prefix: path_prefix.clone(), - futures: Default::default(), + for (idx, item) in list.into_iter().enumerate() { + let path_prefix = { + let mut path_prefix = path_prefix.clone(); + path_prefix.push(serde_json::Value::Number(idx.into())); + path_prefix }; - let ctx = query_env.create_context( - &schema_env, - None, - &field.selection_set, - &inc_resolve_id, - Some(&defer_list), - ); - let data = item.resolve(&ctx, &field).await?; + let field = field.clone(); + let schema_env = ctx.schema_env.clone(); + let query_env = ctx.query_env.clone(); - Ok(( - QueryResponse { - path: Some(path_prefix), - data, - extensions: None, - cache_control: Default::default(), - }, - defer_list, - )) - }); + defer_list.append(async move { + let inc_resolve_id = AtomicUsize::default(); + let defer_list = DeferList { + path_prefix: path_prefix.clone(), + futures: Default::default(), + }; + let ctx = query_env.create_context( + &schema_env, + None, + &field.selection_set, + &inc_resolve_id, + Some(&defer_list), + ); + let data = item.resolve(&ctx, &field).await?; + + Ok(( + QueryResponse { + path: Some(path_prefix), + data, + extensions: None, + cache_control: Default::default(), + }, + defer_list, + )) + }); + } + return Ok(serde_json::Value::Array(Vec::new())); } - return Ok(serde_json::Value::Array(Vec::new())); } + OutputValueType::resolve(&list, ctx, field).await + } else { + Ok(serde_json::Value::Null) } - OutputValueType::resolve(&self.0, ctx, field).await } } diff --git a/tests/defer.rs b/tests/defer.rs index 0fb784c4..1a5664ee 100644 --- a/tests/defer.rs +++ b/tests/defer.rs @@ -3,7 +3,6 @@ use futures::StreamExt; #[async_std::test] pub async fn test_defer() { - #[derive(Clone)] struct MyObj; #[Object] @@ -96,7 +95,6 @@ pub async fn test_defer() { #[async_std::test] pub async fn test_stream() { #[SimpleObject] - #[derive(Clone)] struct MyObj { value: i32, }