Removes code about streaming requests.

This commit is contained in:
Sunli 2020-07-31 10:10:03 +08:00
parent d06073fc77
commit 2cf350a5c8
15 changed files with 16 additions and 713 deletions

View File

@ -5,20 +5,17 @@
mod subscription;
use actix_web::body::BodyStream;
use actix_web::dev::{HttpResponseBuilder, Payload, PayloadStream};
use actix_web::http::StatusCode;
use actix_web::{http, web, Error, FromRequest, HttpRequest, HttpResponse, Responder};
use async_graphql::http::{multipart_stream, StreamBody};
use async_graphql::http::StreamBody;
use async_graphql::{
IntoQueryBuilder, IntoQueryBuilderOpts, ParseRequestError, QueryBuilder, QueryResponse,
StreamResponse,
};
use futures::channel::mpsc;
use futures::future::Ready;
use futures::{Future, SinkExt, StreamExt, TryFutureExt};
use http::Method;
use std::convert::Infallible;
use std::pin::Pin;
pub use subscription::WSSubscription;
@ -112,33 +109,6 @@ impl Responder for GQLResponse {
}
}
/// Responder for GraphQL response stream
pub struct GQLResponseStream(StreamResponse);
impl From<StreamResponse> for GQLResponseStream {
fn from(resp: StreamResponse) -> Self {
GQLResponseStream(resp)
}
}
impl Responder for GQLResponseStream {
type Error = Error;
type Future = Ready<Result<HttpResponse, Error>>;
fn respond_to(self, req: &HttpRequest) -> Self::Future {
match self.0 {
StreamResponse::Single(resp) => GQLResponse(resp).respond_to(req),
StreamResponse::Stream(stream) => {
let body =
BodyStream::new(multipart_stream(stream).map(Result::<_, Infallible>::Ok));
let mut res = HttpResponse::build(StatusCode::OK);
res.content_type("multipart/mixed; boundary=\"-\"");
futures::future::ok(res.body(body))
}
}
}
}
fn add_cache_control(
builder: &mut HttpResponseBuilder,
resp: &async_graphql::Result<QueryResponse>,

View File

@ -274,7 +274,6 @@ pub fn generate(object_args: &args::Object, item_impl: &mut ItemImpl) -> Result<
}),
&field.selection_set,
&resolve_id,
None,
);
#crate_name::OutputValueType::resolve(&msg, &ctx_selection_set, &*field).await
}

View File

@ -5,15 +5,12 @@
#![allow(clippy::needless_doctest_main)]
#![forbid(unsafe_code)]
use async_graphql::http::{multipart_stream, GQLRequest, GQLResponse, StreamBody};
use async_graphql::http::{GQLRequest, GQLResponse};
use async_graphql::{
IntoQueryBuilder, IntoQueryBuilderOpts, ObjectType, QueryBuilder, QueryResponse, Schema,
StreamResponse, SubscriptionType,
SubscriptionType,
};
use async_trait::async_trait;
use futures::channel::mpsc;
use futures::io::BufReader;
use futures::{SinkExt, StreamExt};
use std::str::FromStr;
use tide::{
http::{headers, Method},
@ -127,9 +124,6 @@ impl<State: Clone + Send + Sync + 'static> RequestExt<State> for Request<State>
pub trait ResponseExt: Sized {
/// Set body as the result of a GraphQL query.
fn body_graphql(self, res: async_graphql::Result<QueryResponse>) -> tide::Result<Self>;
/// Set body as the result of a GraphQL streaming query.
fn body_graphql_stream(self, res: StreamResponse) -> tide::Result<Self>;
}
impl ResponseExt for Response {
@ -138,31 +132,6 @@ impl ResponseExt for Response {
resp.set_body(Body::from_json(&GQLResponse(res))?);
Ok(resp)
}
fn body_graphql_stream(mut self, res: StreamResponse) -> tide::Result<Self> {
match res {
StreamResponse::Single(res) => self.body_graphql(res),
StreamResponse::Stream(stream) => {
// Body::from_reader required Sync, however StreamResponse does not have Sync.
// I created an issue and got a reply that this might be fixed in the future.
// https://github.com/http-rs/http-types/pull/144
// Now I can only use forwarding to solve the problem.
let mut stream =
Box::pin(multipart_stream(stream).map(Result::Ok::<_, std::io::Error>));
let (mut tx, rx) = mpsc::channel(0);
async_std::task::spawn(async move {
while let Some(item) = stream.next().await {
if tx.send(item).await.is_err() {
return;
}
}
});
self.set_body(Body::from_reader(BufReader::new(StreamBody::new(rx)), None));
self.insert_header(tide::http::headers::CONTENT_TYPE, "multipart/mixed");
Ok(self)
}
}
}
}
fn add_cache_control(

View File

@ -5,17 +5,15 @@
#![allow(clippy::needless_doctest_main)]
#![forbid(unsafe_code)]
use async_graphql::http::{multipart_stream, GQLRequest, StreamBody};
use async_graphql::http::{GQLRequest, StreamBody};
use async_graphql::{
Data, FieldResult, IntoQueryBuilder, IntoQueryBuilderOpts, ObjectType, QueryBuilder,
QueryResponse, Schema, StreamResponse, SubscriptionType, WebSocketTransport,
QueryResponse, Schema, SubscriptionType, WebSocketTransport,
};
use bytes::Bytes;
use futures::select;
use futures::{SinkExt, StreamExt};
use hyper::header::HeaderValue;
use hyper::{Body, Method};
use std::convert::Infallible;
use hyper::Method;
use std::sync::Arc;
use warp::filters::ws::Message;
use warp::filters::BoxedFilter;
@ -312,30 +310,3 @@ impl Reply for GQLResponse {
resp
}
}
/// GraphQL streaming reply
pub struct GQLResponseStream(StreamResponse);
impl From<StreamResponse> for GQLResponseStream {
fn from(resp: StreamResponse) -> Self {
GQLResponseStream(resp)
}
}
impl Reply for GQLResponseStream {
fn into_response(self) -> Response {
match self.0 {
StreamResponse::Single(resp) => GQLResponse(resp).into_response(),
StreamResponse::Stream(stream) => {
let mut resp = Response::new(Body::wrap_stream(
multipart_stream(stream).map(Result::<_, Infallible>::Ok),
));
resp.headers_mut().insert(
"content-type",
HeaderValue::from_static("multipart/mixed; boundary=\"-\""),
);
resp
}
}
}
}

View File

@ -24,7 +24,6 @@ Comparing Features of Other Rust GraphQL Implementations
| Field guard | 👍 | ⛔️ |
| Multipart request(upload file) | 👍 | ⛔️ |
| Subscription | 👍 | ⛔️ |
| @defer/@stream | 👍 | ⛔️ |
| Opentracing | 👍 | ⛔️ |
| Apollo Federation | 👍 | ⛔️ |
| Apollo Tracing | 👍 | ⛔️ |

View File

@ -2,14 +2,11 @@ use crate::extensions::Extensions;
use crate::parser::query::{Directive, Field, SelectionSet};
use crate::schema::SchemaEnv;
use crate::{
FieldResult, InputValueType, Lookahead, Pos, Positioned, QueryError, QueryResponse, Result,
Type, Value,
FieldResult, InputValueType, Lookahead, Pos, Positioned, QueryError, Result, Type, Value,
};
use async_graphql_parser::query::Document;
use async_graphql_parser::UploadValue;
use fnv::FnvHashMap;
use futures::Future;
use parking_lot::Mutex;
use serde::ser::SerializeSeq;
use serde::Serializer;
use std::any::{Any, TypeId};
@ -17,7 +14,6 @@ use std::collections::BTreeMap;
use std::fmt::{Display, Formatter};
use std::fs::File;
use std::ops::{Deref, DerefMut};
use std::pin::Pin;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
@ -256,25 +252,6 @@ impl std::fmt::Display for ResolveId {
}
}
#[doc(hidden)]
pub type BoxDeferFuture =
Pin<Box<dyn Future<Output = Result<(QueryResponse, DeferList)>> + Send + 'static>>;
#[doc(hidden)]
pub struct DeferList {
pub path_prefix: Vec<serde_json::Value>,
pub futures: Mutex<Vec<BoxDeferFuture>>,
}
impl DeferList {
pub(crate) fn append<F>(&self, fut: F)
where
F: Future<Output = Result<(QueryResponse, DeferList)>> + Send + 'static,
{
self.futures.lock().push(Box::pin(fut));
}
}
/// Query context
#[derive(Clone)]
pub struct ContextBase<'a, T> {
@ -286,7 +263,6 @@ pub struct ContextBase<'a, T> {
pub item: T,
pub(crate) schema_env: &'a SchemaEnv,
pub(crate) query_env: &'a QueryEnv,
pub(crate) defer_list: Option<&'a DeferList>,
}
impl<'a, T> Deref for ContextBase<'a, T> {
@ -340,7 +316,6 @@ impl QueryEnv {
path_node: Option<QueryPathNode<'a>>,
item: T,
inc_resolve_id: &'a AtomicUsize,
defer_list: Option<&'a DeferList>,
) -> ContextBase<'a, T> {
ContextBase {
path_node,
@ -349,7 +324,6 @@ impl QueryEnv {
item,
schema_env,
query_env: self,
defer_list,
}
}
}
@ -387,7 +361,6 @@ impl<'a, T> ContextBase<'a, T> {
inc_resolve_id: self.inc_resolve_id,
schema_env: self.schema_env,
query_env: self.query_env,
defer_list: self.defer_list,
}
}
@ -403,7 +376,6 @@ impl<'a, T> ContextBase<'a, T> {
inc_resolve_id: &self.inc_resolve_id,
schema_env: self.schema_env,
query_env: self.query_env,
defer_list: self.defer_list,
}
}
@ -561,7 +533,6 @@ impl<'a> ContextBase<'a, &'a Positioned<SelectionSet>> {
inc_resolve_id: self.inc_resolve_id,
schema_env: self.schema_env,
query_env: self.query_env,
defer_list: self.defer_list,
}
}
}

View File

@ -61,14 +61,6 @@ impl Serialize for GQLResponse {
match &self.0 {
Ok(res) => {
let mut map = serializer.serialize_map(None)?;
if let Some(label) = &res.label {
map.serialize_key("label")?;
map.serialize_value(label)?;
}
if let Some(path) = &res.path {
map.serialize_key("path")?;
map.serialize_value(path)?;
}
map.serialize_key("data")?;
map.serialize_value(&res.data)?;
if res.extensions.is_some() {
@ -219,8 +211,6 @@ mod tests {
#[test]
fn test_response_data() {
let resp = GQLResponse(Ok(QueryResponse {
label: None,
path: None,
data: json!({"ok": true}),
extensions: None,
cache_control: Default::default(),

View File

@ -150,9 +150,7 @@ pub use error::{
};
pub use look_ahead::Lookahead;
pub use parser::{Pos, Positioned, Value};
pub use query::{
IntoQueryBuilder, IntoQueryBuilderOpts, QueryBuilder, QueryResponse, StreamResponse,
};
pub use query::{IntoQueryBuilder, IntoQueryBuilderOpts, QueryBuilder, QueryResponse};
pub use registry::CacheControl;
pub use scalars::{Any, Json, OutputJson, ID};
pub use schema::{Schema, SchemaBuilder, SchemaEnv};
@ -160,9 +158,7 @@ pub use serde_json::Number;
pub use subscription::{
SimpleBroker, SubscriptionStreams, SubscriptionTransport, WebSocketTransport,
};
pub use types::{
connection, Deferred, EmptyMutation, EmptySubscription, MaybeUndefined, Streamed, Upload,
};
pub use types::{connection, EmptyMutation, EmptySubscription, MaybeUndefined, Upload};
pub use validation::ValidationMode;
/// Result type

View File

@ -1,4 +1,4 @@
use crate::context::{Data, DeferList, ResolveId};
use crate::context::{Data, ResolveId};
use crate::error::ParseRequestError;
use crate::extensions::{BoxExtension, ErrorLogger, Extension};
use crate::mutation_resolver::do_mutation_resolve;
@ -8,12 +8,8 @@ use crate::{
SubscriptionType, Variables,
};
use async_graphql_parser::query::OperationType;
use futures::{Stream, StreamExt};
use itertools::Itertools;
use std::any::Any;
use std::borrow::Cow;
use std::fs::File;
use std::pin::Pin;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
@ -43,14 +39,6 @@ pub trait IntoQueryBuilder: Sized {
/// Query response
#[derive(Debug)]
pub struct QueryResponse {
/// Label for RelayModernQueryExecutor
///
/// https://github.com/facebook/relay/blob/2859aa8df4df7d4d6d9eef4c9dc1134286773710/packages/relay-runtime/store/RelayModernQueryExecutor.js#L1267
pub label: Option<String>,
/// Path for subsequent response
pub path: Option<Vec<serde_json::Value>>,
/// Data of query result
pub data: serde_json::Value,
@ -61,80 +49,6 @@ pub struct QueryResponse {
pub cache_control: CacheControl,
}
impl QueryResponse {
pub(crate) fn apply_path_prefix(mut self, mut prefix: Vec<serde_json::Value>) -> Self {
if let Some(path) = &mut self.path {
prefix.extend(path.drain(..));
*path = prefix;
} else {
self.path = Some(prefix);
}
self.label = self.path.as_ref().map(|path| {
path.iter()
.map(|value| {
if let serde_json::Value::String(s) = value {
Cow::Borrowed(s.as_str())
} else {
Cow::Owned(value.to_string())
}
})
.join("$")
});
self
}
pub(crate) fn merge(&mut self, resp: QueryResponse) {
let mut p = &mut self.data;
for item in resp.path.unwrap_or_default() {
match item {
serde_json::Value::String(name) => {
if let serde_json::Value::Object(obj) = p {
if let Some(next) = obj.get_mut(&name) {
p = next;
continue;
}
}
return;
}
serde_json::Value::Number(idx) => {
if let serde_json::Value::Array(array) = p {
let idx = idx.as_i64().unwrap() as usize;
while array.len() <= idx {
array.push(serde_json::Value::Null);
}
p = array.get_mut(idx as usize).unwrap();
continue;
}
return;
}
_ => {}
}
}
*p = resp.data;
}
}
/// Response for `Schema::execute_stream` and `QueryBuilder::execute_stream`
pub enum StreamResponse {
/// There is no `@defer` or `@stream` directive in the query, this is the final result.
Single(Result<QueryResponse>),
/// Streaming responses.
Stream(Pin<Box<dyn Stream<Item = Result<QueryResponse>> + Send + 'static>>),
}
impl StreamResponse {
/// Convert to a stream.
pub fn into_stream(self) -> impl Stream<Item = Result<QueryResponse>> + Send + 'static {
match self {
StreamResponse::Single(resp) => Box::pin(futures::stream::once(async move { resp })),
StreamResponse::Stream(stream) => stream,
}
}
}
/// Query builder
pub struct QueryBuilder {
pub(crate) query_source: String,
@ -205,67 +119,11 @@ impl QueryBuilder {
.set_upload(var_path, filename, content_type, content);
}
/// Execute the query, returns a stream, the first result being the query result,
/// followed by the incremental result. Only when there are `@defer` and `@stream` directives
/// in the query will there be subsequent incremental results.
pub async fn execute_stream<Query, Mutation, Subscription>(
/// Execute the query, always return a complete result.
pub async fn execute<Query, Mutation, Subscription>(
self,
schema: &Schema<Query, Mutation, Subscription>,
) -> StreamResponse
where
Query: ObjectType + Send + Sync + 'static,
Mutation: ObjectType + Send + Sync + 'static,
Subscription: SubscriptionType + Send + Sync + 'static,
{
let schema = schema.clone();
match self.execute_first(&schema).await {
Ok((first_resp, defer_list)) if defer_list.futures.lock().is_empty() => {
StreamResponse::Single(Ok(first_resp))
}
Err(err) => StreamResponse::Single(Err(err)),
Ok((first_resp, defer_list)) => {
let stream = async_stream::try_stream! {
yield first_resp;
let mut current_defer_list = Vec::new();
for fut in defer_list.futures.into_inner() {
current_defer_list.push((defer_list.path_prefix.clone(), fut));
}
loop {
let mut next_defer_list = Vec::new();
for (path_prefix, defer) in current_defer_list {
let (res, mut defer_list) = defer.await?;
for fut in defer_list.futures.into_inner() {
let mut next_path_prefix = path_prefix.clone();
next_path_prefix.extend(defer_list.path_prefix.clone());
next_defer_list.push((next_path_prefix, fut));
}
let mut new_res = res.apply_path_prefix(path_prefix);
new_res.label = new_res.path.as_ref().map(|path| path.iter().map(|value| {
if let serde_json::Value::String(s) = value {
s.to_string()
} else {
value.to_string()
}
}).join("$"));
yield new_res;
}
if next_defer_list.is_empty() {
break;
}
current_defer_list = next_defer_list;
}
};
StreamResponse::Stream(Box::pin(stream))
}
}
}
async fn execute_first<'a, Query, Mutation, Subscription>(
self,
schema: &Schema<Query, Mutation, Subscription>,
) -> Result<(QueryResponse, DeferList)>
) -> Result<QueryResponse>
where
Query: ObjectType + Send + Sync + 'static,
Mutation: ObjectType + Send + Sync + 'static,
@ -301,10 +159,6 @@ impl QueryBuilder {
document,
Arc::new(self.ctx_data.unwrap_or_default()),
);
let defer_list = DeferList {
path_prefix: Vec::new(),
futures: Default::default(),
};
let ctx = ContextBase {
path_node: None,
resolve_id: ResolveId::root(),
@ -312,7 +166,6 @@ impl QueryBuilder {
item: &env.document.current_operation().selection_set,
schema_env: &schema.env,
query_env: &env,
defer_list: Some(&defer_list),
};
env.extensions.lock().execution_start();
@ -329,37 +182,12 @@ impl QueryBuilder {
};
env.extensions.lock().execution_end();
let res = QueryResponse {
label: None,
path: None,
let resp = QueryResponse {
data,
extensions: env.extensions.lock().result(),
cache_control,
};
Ok((res, defer_list))
}
/// Execute the query, always return a complete result.
pub async fn execute<Query, Mutation, Subscription>(
self,
schema: &Schema<Query, Mutation, Subscription>,
) -> Result<QueryResponse>
where
Query: ObjectType + Send + Sync + 'static,
Mutation: ObjectType + Send + Sync + 'static,
Subscription: SubscriptionType + Send + Sync + 'static,
{
let resp = self.execute_stream(schema).await;
match resp {
StreamResponse::Single(res) => res,
StreamResponse::Stream(mut stream) => {
let mut resp = stream.next().await.unwrap()?;
while let Some(resp_part) = stream.next().await.transpose()? {
resp.merge(resp_part);
}
Ok(resp)
}
}
Ok(resp)
}
/// Get query source

View File

@ -2,7 +2,7 @@ use crate::context::Data;
use crate::extensions::{BoxExtension, ErrorLogger, Extension, Extensions};
use crate::model::__DirectiveLocation;
use crate::parser::parse_query;
use crate::query::{QueryBuilder, StreamResponse};
use crate::query::QueryBuilder;
use crate::registry::{MetaDirective, MetaInputValue, Registry};
use crate::subscription::{create_connection, create_subscription_stream, SubscriptionTransport};
use crate::types::QueryRoot;
@ -241,20 +241,6 @@ where
}
});
registry.add_directive(MetaDirective {
name: "defer",
description: None,
locations: vec![__DirectiveLocation::FIELD],
args: Default::default(),
});
registry.add_directive(MetaDirective {
name: "stream",
description: None,
locations: vec![__DirectiveLocation::FIELD],
args: Default::default(),
});
// register scalars
bool::create_type_info(&mut registry);
i32::create_type_info(&mut registry);
@ -301,13 +287,6 @@ where
QueryBuilder::new(query_source).execute(self).await
}
/// Execute the query without create the `QueryBuilder`, returns a stream, the first result being the query result,
/// followed by the incremental result. Only when there are `@defer` and `@stream` directives
/// in the query will there be subsequent incremental results.
pub async fn execute_stream(&self, query_source: &str) -> StreamResponse {
QueryBuilder::new(query_source).execute_stream(self).await
}
pub(crate) fn prepare_query(
&self,
source: &str,
@ -400,7 +379,6 @@ where
None,
&env.document.current_operation().selection_set,
&resolve_id,
None,
);
let mut streams = Vec::new();
create_subscription_stream(self, env.clone(), &ctx, &mut streams)

View File

@ -141,8 +141,6 @@ impl SubscriptionTransport for WebSocketTransport {
id: Some(id.clone()),
payload: Some(
serde_json::to_value(GQLResponse(Ok(QueryResponse {
label: None,
path: None,
data: value,
extensions: None,
cache_control: Default::default(),

View File

@ -1,99 +0,0 @@
use crate::context::DeferList;
use crate::registry::Registry;
use crate::{ContextSelectionSet, OutputValueType, Positioned, QueryResponse, Result, Type};
use async_graphql_parser::query::Field;
use itertools::Itertools;
use parking_lot::Mutex;
use std::borrow::Cow;
use std::sync::atomic::AtomicUsize;
/// Deferred type
///
/// Allows to defer the type of results returned, only takes effect when the @defer directive exists on the field.
pub struct Deferred<T: Type + Send + Sync + 'static>(Mutex<Option<T>>);
impl<T: Type + Send + Sync + 'static> From<T> for Deferred<T> {
fn from(value: T) -> Self {
Self(Mutex::new(Some(value)))
}
}
impl<T: Type + Send + Sync + 'static> Type for Deferred<T> {
fn type_name() -> Cow<'static, str> {
T::type_name()
}
fn create_type_info(registry: &mut Registry) -> String {
T::create_type_info(registry)
}
}
#[async_trait::async_trait]
impl<T: OutputValueType + Send + Sync + 'static> OutputValueType for Deferred<T> {
async fn resolve(
&self,
ctx: &ContextSelectionSet<'_>,
field: &Positioned<Field>,
) -> Result<serde_json::Value> {
let obj = self.0.lock().take();
if let Some(obj) = obj {
if let Some(defer_list) = ctx.defer_list {
if ctx.is_defer(&field.directives) {
let schema_env = ctx.schema_env.clone();
let query_env = ctx.query_env.clone();
let mut field = field.clone();
// remove @defer directive
if let Some((idx, _)) = field
.node
.directives
.iter()
.find_position(|d| d.name.as_str() == "defer")
{
field.node.directives.remove(idx);
}
let path_prefix = ctx
.path_node
.as_ref()
.map(|path| match serde_json::to_value(path) {
Ok(serde_json::Value::Array(values)) => values,
_ => Default::default(),
})
.unwrap_or_default();
defer_list.append(async move {
let inc_resolve_id = AtomicUsize::default();
let defer_list = DeferList {
path_prefix: path_prefix.clone(),
futures: Default::default(),
};
let ctx = query_env.create_context(
&schema_env,
None,
&field.selection_set,
&inc_resolve_id,
Some(&defer_list),
);
let data = obj.resolve(&ctx, &field).await?;
Ok((
QueryResponse {
label: None,
path: Some(path_prefix),
data,
extensions: None,
cache_control: Default::default(),
},
defer_list,
))
});
return Ok(serde_json::Value::Null);
}
}
OutputValueType::resolve(&obj, ctx, field).await
} else {
Ok(serde_json::Value::Null)
}
}
}

View File

@ -1,6 +1,5 @@
pub mod connection;
mod deferred;
mod empty_mutation;
mod empty_subscription;
mod r#enum;
@ -8,14 +7,11 @@ mod list;
mod maybe_undefined;
mod optional;
mod query_root;
mod streamed;
mod upload;
pub use deferred::Deferred;
pub use empty_mutation::EmptyMutation;
pub use empty_subscription::EmptySubscription;
pub use maybe_undefined::MaybeUndefined;
pub use query_root::QueryRoot;
pub use r#enum::{EnumItem, EnumType};
pub use streamed::Streamed;
pub use upload::Upload;

View File

@ -1,111 +0,0 @@
use crate::context::DeferList;
use crate::registry::Registry;
use crate::{ContextSelectionSet, OutputValueType, Positioned, QueryResponse, Result, Type};
use async_graphql_parser::query::Field;
use itertools::Itertools;
use parking_lot::Mutex;
use std::borrow::Cow;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
/// Streamed type
///
/// Similar to Deferred, but you can defer every item of the list type, only takes effect when the @stream directive exists on the field.
pub struct Streamed<T: Type + Send + Sync + 'static>(Mutex<Option<Vec<T>>>);
impl<T: Type + Send + Sync + 'static> From<Vec<T>> for Streamed<T> {
fn from(value: Vec<T>) -> Self {
Self(Mutex::new(Some(value)))
}
}
impl<T: Type + Send + Sync + 'static> Type for Streamed<T> {
fn type_name() -> Cow<'static, str> {
Vec::<T>::type_name()
}
fn create_type_info(registry: &mut Registry) -> String {
Vec::<T>::create_type_info(registry)
}
}
#[async_trait::async_trait]
impl<T: OutputValueType + Send + Sync + 'static> OutputValueType for Streamed<T> {
async fn resolve(
&self,
ctx: &ContextSelectionSet<'_>,
field: &Positioned<Field>,
) -> Result<serde_json::Value> {
let list = self.0.lock().take();
if let Some(list) = list {
if let Some(defer_list) = ctx.defer_list {
if ctx.is_stream(&field.directives) {
let mut field = field.clone();
// remove @stream directive
if let Some((idx, _)) = field
.node
.directives
.iter()
.find_position(|d| d.name.as_str() == "stream")
{
field.node.directives.remove(idx);
}
let field = Arc::new(field);
let path_prefix = ctx
.path_node
.as_ref()
.map(|path| match serde_json::to_value(path) {
Ok(serde_json::Value::Array(values)) => values,
_ => Default::default(),
})
.unwrap_or_default();
for (idx, item) in list.into_iter().enumerate() {
let path_prefix = {
let mut path_prefix = path_prefix.clone();
path_prefix.push(serde_json::Value::Number(idx.into()));
path_prefix
};
let field = field.clone();
let schema_env = ctx.schema_env.clone();
let query_env = ctx.query_env.clone();
defer_list.append(async move {
let inc_resolve_id = AtomicUsize::default();
let defer_list = DeferList {
path_prefix: path_prefix.clone(),
futures: Default::default(),
};
let ctx = query_env.create_context(
&schema_env,
None,
&field.selection_set,
&inc_resolve_id,
Some(&defer_list),
);
let data = item.resolve(&ctx, &field).await?;
Ok((
QueryResponse {
label: None,
path: Some(path_prefix),
data,
extensions: None,
cache_control: Default::default(),
},
defer_list,
))
});
}
return Ok(serde_json::Value::Array(Vec::new()));
}
}
OutputValueType::resolve(&list, ctx, field).await
} else {
Ok(serde_json::Value::Null)
}
}
}

View File

@ -1,152 +0,0 @@
use async_graphql::*;
use futures::StreamExt;
#[async_std::test]
pub async fn test_defer() {
struct MyObj;
#[Object]
impl MyObj {
async fn value(&self) -> i32 {
20
}
async fn obj(&self) -> Deferred<MyObj> {
MyObj.into()
}
}
struct Query;
#[Object]
impl Query {
async fn value(&self) -> Deferred<i32> {
10.into()
}
async fn obj(&self) -> Deferred<MyObj> {
MyObj.into()
}
}
let schema = Schema::new(Query, EmptyMutation, EmptySubscription);
let query = r#"{
value @defer
}"#;
assert_eq!(
schema.execute(&query).await.unwrap().data,
serde_json::json!({
"value": 10,
})
);
let query = r#"{
value @defer
obj @defer {
value
obj @defer {
value
}
}
}"#;
assert_eq!(
schema.execute(&query).await.unwrap().data,
serde_json::json!({
"value": 10,
"obj": {
"value": 20,
"obj": {
"value": 20
}
}
})
);
let mut stream = schema.execute_stream(&query).await.into_stream();
assert_eq!(
stream.next().await.unwrap().unwrap().data,
serde_json::json!({
"value": null,
"obj": null,
})
);
let next_resp = stream.next().await.unwrap().unwrap();
assert_eq!(next_resp.path, Some(vec![serde_json::json!("value")]));
assert_eq!(next_resp.data, serde_json::json!(10));
let next_resp = stream.next().await.unwrap().unwrap();
assert_eq!(next_resp.path, Some(vec![serde_json::json!("obj")]));
assert_eq!(
next_resp.data,
serde_json::json!({"value": 20, "obj": null})
);
let next_resp = stream.next().await.unwrap().unwrap();
assert_eq!(
next_resp.path,
Some(vec![serde_json::json!("obj"), serde_json::json!("obj")])
);
assert_eq!(next_resp.data, serde_json::json!({"value": 20}));
assert!(stream.next().await.is_none());
}
#[async_std::test]
pub async fn test_stream() {
#[SimpleObject]
struct MyObj {
value: i32,
}
struct Query;
#[Object]
impl Query {
async fn objs(&self) -> Streamed<MyObj> {
Streamed::from(vec![
MyObj { value: 1 },
MyObj { value: 2 },
MyObj { value: 3 },
MyObj { value: 4 },
MyObj { value: 5 },
])
}
}
let schema = Schema::new(Query, EmptyMutation, EmptySubscription);
let query = r#"{
objs @stream { value }
}"#;
assert_eq!(
schema.execute(&query).await.unwrap().data,
serde_json::json!({
"objs": [
{ "value": 1 },
{ "value": 2 },
{ "value": 3 },
{ "value": 4 },
{ "value": 5 },
]
})
);
let mut stream = schema.execute_stream(&query).await.into_stream();
assert_eq!(
stream.next().await.unwrap().unwrap().data,
serde_json::json!({
"objs": [],
})
);
for i in 0..5 {
let next_resp = stream.next().await.unwrap().unwrap();
assert_eq!(
next_resp.path,
Some(vec![serde_json::json!("objs"), i.into()])
);
assert_eq!(next_resp.data, serde_json::json!({ "value": i + 1 }));
}
assert!(stream.next().await.is_none());
}