Add StreamResponse type

This commit is contained in:
Sunli 2020-05-21 11:36:44 +08:00
parent 4868cf242b
commit 01489e20a5
8 changed files with 116 additions and 69 deletions

View File

@ -8,14 +8,15 @@ use actix_web::body::BodyStream;
use actix_web::dev::{Payload, PayloadStream};
use actix_web::http::StatusCode;
use actix_web::{http, web, Error, FromRequest, HttpRequest, HttpResponse, Responder};
use async_graphql::http::StreamBody;
use async_graphql::http::{multipart_stream, StreamBody};
use async_graphql::{
IntoQueryBuilder, IntoQueryBuilderOpts, ParseRequestError, QueryBuilder, QueryResponse,
StreamResponse,
};
use bytes::{buf::BufExt, Buf, Bytes};
use futures::channel::mpsc;
use futures::future::Ready;
use futures::{Future, SinkExt, Stream, StreamExt, TryFutureExt};
use futures::{Future, SinkExt, StreamExt, TryFutureExt};
use std::convert::Infallible;
use std::pin::Pin;
pub use subscription::WSSubscription;
@ -94,43 +95,30 @@ impl Responder for GQLResponse {
}
/// Responder for GraphQL response stream
pub struct GQLResponseStream<S: Stream<Item = async_graphql::Result<QueryResponse>>>(S);
pub struct GQLResponseStream(StreamResponse);
impl<S: Stream<Item = async_graphql::Result<QueryResponse>> + 'static> From<S>
for GQLResponseStream<S>
{
fn from(stream: S) -> Self {
GQLResponseStream(stream)
impl From<StreamResponse> for GQLResponseStream {
fn from(resp: StreamResponse) -> Self {
GQLResponseStream(resp)
}
}
impl<S: Stream<Item = async_graphql::Result<QueryResponse>> + Unpin + 'static> Responder
for GQLResponseStream<S>
{
impl Responder for GQLResponseStream {
type Error = Error;
type Future = Ready<Result<HttpResponse, Error>>;
fn respond_to(self, _req: &HttpRequest) -> Self::Future {
let body = BodyStream::new(
self.0
.map(|res| serde_json::to_vec(&async_graphql::http::GQLResponse(res)).unwrap())
.map(|data| {
Ok::<_, std::convert::Infallible>(
Bytes::from(format!(
"\r\n---\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n",
data.len()
))
.chain(Bytes::from(data))
.to_bytes(),
)
})
.chain(futures::stream::once(futures::future::ok(
Bytes::from_static(b"\r\n-----\r\n"),
))),
);
let res = HttpResponse::build(StatusCode::OK)
.content_type("multipart/mixed; boundary=\"-\"")
.body(body);
futures::future::ok(res)
fn respond_to(self, req: &HttpRequest) -> Self::Future {
match self.0 {
StreamResponse::Single(resp) => GQLResponse(resp).respond_to(req),
StreamResponse::Stream(stream) => {
let body = BodyStream::new(
multipart_stream(stream).map(|item| Result::<_, Infallible>::Ok(item)),
);
let res = HttpResponse::build(StatusCode::OK)
.content_type("multipart/mixed; boundary=\"-\"")
.body(body);
futures::future::ok(res)
}
}
}
}

View File

@ -24,6 +24,7 @@ Comparing Features of Other Rust GraphQL Implementations
| Field guard | 👍 | ⛔️ |
| Multipart request(upload file) | 👍 | ⛔️ |
| Subscription | 👍 | ⛔️ |
| @defer/@stream | 👍 | ⛔️ |
| Opentracing | 👍 | ⛔️ |
| Apollo Federation | 👍 | ⛔️ |
| Apollo Tracing | 👍 | ⛔️ |

View File

@ -2,12 +2,14 @@
mod graphiql_source;
mod into_query_builder;
mod multipart_stream;
mod playground_source;
mod stream_body;
use itertools::Itertools;
pub use graphiql_source::graphiql_source;
pub use multipart_stream::multipart_stream;
pub use playground_source::playground_source;
pub use stream_body::StreamBody;

View File

@ -0,0 +1,22 @@
use crate::http::GQLResponse;
use crate::{QueryResponse, Result};
use bytes::{buf::BufExt, Buf, Bytes};
use futures::{Stream, StreamExt};
/// Create a multipart response data stream.
pub fn multipart_stream(
s: impl Stream<Item = Result<QueryResponse>> + Unpin + 'static,
) -> impl Stream<Item = Bytes> {
s.map(|res| serde_json::to_vec(&GQLResponse(res)).unwrap())
.map(|data| {
Bytes::from(format!(
"\r\n---\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n",
data.len()
))
.chain(Bytes::from(data))
.to_bytes()
})
.chain(futures::stream::once(async move {
Bytes::from_static(b"\r\n-----\r\n")
}))
}

View File

@ -142,7 +142,9 @@ pub use error::{
};
pub use look_ahead::Lookahead;
pub use parser::{Pos, Positioned, Value};
pub use query::{IntoQueryBuilder, IntoQueryBuilderOpts, QueryBuilder, QueryResponse};
pub use query::{
IntoQueryBuilder, IntoQueryBuilderOpts, QueryBuilder, QueryResponse, StreamResponse,
};
pub use registry::CacheControl;
pub use scalars::{Any, Json, ID};
pub use schema::{Schema, SchemaBuilder, SchemaEnv};

View File

@ -13,6 +13,7 @@ use futures::{Stream, StreamExt};
use itertools::Itertools;
use std::any::Any;
use std::fs::File;
use std::pin::Pin;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
@ -97,6 +98,25 @@ impl QueryResponse {
}
}
/// Response for `Schema::execute_stream` and `QueryBuilder::execute_stream`
pub enum StreamResponse {
/// There is no `@defer` or `@stream` directive in the query, this is the final result.
Single(Result<QueryResponse>),
/// Streaming responses.
Stream(Pin<Box<dyn Stream<Item = Result<QueryResponse>> + Send + 'static>>),
}
impl StreamResponse {
/// Convert to a stream.
pub fn into_stream(self) -> impl Stream<Item = Result<QueryResponse>> + Send + 'static {
match self {
StreamResponse::Single(resp) => Box::pin(futures::stream::once(async move { resp })),
StreamResponse::Stream(stream) => stream,
}
}
}
/// Query builder
pub struct QueryBuilder {
pub(crate) query_source: String,
@ -158,43 +178,50 @@ impl QueryBuilder {
/// Execute the query, returns a stream, the first result being the query result,
/// followed by the incremental result. Only when there are `@defer` and `@stream` directives
/// in the query will there be subsequent incremental results.
pub fn execute_stream<Query, Mutation, Subscription>(
pub async fn execute_stream<Query, Mutation, Subscription>(
self,
schema: &Schema<Query, Mutation, Subscription>,
) -> impl Stream<Item = Result<QueryResponse>>
) -> StreamResponse
where
Query: ObjectType + Send + Sync + 'static,
Mutation: ObjectType + Send + Sync + 'static,
Subscription: SubscriptionType + Send + Sync + 'static,
{
let schema = schema.clone();
let stream = async_stream::try_stream! {
let (first_resp, defer_list) = self.execute_first(&schema).await?;
yield first_resp;
let mut current_defer_list = Vec::new();
for fut in defer_list.futures.into_inner() {
current_defer_list.push((defer_list.path_prefix.clone(), fut));
match self.execute_first(&schema).await {
Ok((first_resp, defer_list)) if defer_list.futures.lock().is_empty() => {
return StreamResponse::Single(Ok(first_resp));
}
Err(err) => StreamResponse::Single(Err(err)),
Ok((first_resp, defer_list)) => {
let stream = async_stream::try_stream! {
yield first_resp;
loop {
let mut next_defer_list = Vec::new();
for (path_prefix, defer) in current_defer_list {
let (res, mut defer_list) = defer.await?;
let mut current_defer_list = Vec::new();
for fut in defer_list.futures.into_inner() {
let mut next_path_prefix = path_prefix.clone();
next_path_prefix.extend(defer_list.path_prefix.clone());
next_defer_list.push((next_path_prefix, fut));
current_defer_list.push((defer_list.path_prefix.clone(), fut));
}
yield res.apply_path_prefix(path_prefix);
}
if next_defer_list.is_empty() {
break;
}
current_defer_list = next_defer_list;
loop {
let mut next_defer_list = Vec::new();
for (path_prefix, defer) in current_defer_list {
let (res, mut defer_list) = defer.await?;
for fut in defer_list.futures.into_inner() {
let mut next_path_prefix = path_prefix.clone();
next_path_prefix.extend(defer_list.path_prefix.clone());
next_defer_list.push((next_path_prefix, fut));
}
yield res.apply_path_prefix(path_prefix);
}
if next_defer_list.is_empty() {
break;
}
current_defer_list = next_defer_list;
}
};
StreamResponse::Stream(Box::pin(stream))
}
};
Box::pin(stream)
}
}
async fn execute_first<'a, Query, Mutation, Subscription>(
@ -332,11 +359,16 @@ impl QueryBuilder {
Mutation: ObjectType + Send + Sync + 'static,
Subscription: SubscriptionType + Send + Sync + 'static,
{
let mut stream = self.execute_stream(schema);
let mut resp = stream.next().await.unwrap()?;
while let Some(resp_part) = stream.next().await.transpose()? {
resp.merge(resp_part);
let resp = self.execute_stream(schema).await;
match resp {
StreamResponse::Single(res) => res,
StreamResponse::Stream(mut stream) => {
let mut resp = stream.next().await.unwrap()?;
while let Some(resp_part) = stream.next().await.transpose()? {
resp.merge(resp_part);
}
Ok(resp)
}
}
Ok(resp)
}
}

View File

@ -2,7 +2,7 @@ use crate::context::Data;
use crate::extensions::{BoxExtension, Extension};
use crate::model::__DirectiveLocation;
use crate::parser::parse_query;
use crate::query::QueryBuilder;
use crate::query::{QueryBuilder, StreamResponse};
use crate::registry::{MetaDirective, MetaInputValue, Registry};
use crate::subscription::{create_connection, create_subscription_stream, SubscriptionTransport};
use crate::types::QueryRoot;
@ -293,8 +293,8 @@ where
/// Execute the query without create the `QueryBuilder`, returns a stream, the first result being the query result,
/// followed by the incremental result. Only when there are `@defer` and `@stream` directives
/// in the query will there be subsequent incremental results.
pub fn execute_stream(&self, query_source: &str) -> impl Stream<Item = Result<QueryResponse>> {
QueryBuilder::new(query_source).execute_stream(self)
pub async fn execute_stream(&self, query_source: &str) -> StreamResponse {
QueryBuilder::new(query_source).execute_stream(self).await
}
/// Create subscription stream, typically called inside the `SubscriptionTransport::handle_request` method

View File

@ -63,7 +63,7 @@ pub async fn test_defer() {
})
);
let mut stream = schema.execute_stream(&query);
let mut stream = schema.execute_stream(&query).await.into_stream();
assert_eq!(
stream.next().await.unwrap().unwrap().data,
serde_json::json!({
@ -133,7 +133,7 @@ pub async fn test_stream() {
})
);
let mut stream = schema.execute_stream(&query);
let mut stream = schema.execute_stream(&query).await.into_stream();
assert_eq!(
stream.next().await.unwrap().unwrap().data,
serde_json::json!({