diff --git a/async-graphql-actix-web/src/lib.rs b/async-graphql-actix-web/src/lib.rs index 0ae2c2b1..042c6fcb 100644 --- a/async-graphql-actix-web/src/lib.rs +++ b/async-graphql-actix-web/src/lib.rs @@ -4,12 +4,18 @@ mod subscription; +use actix_web::body::BodyStream; use actix_web::dev::{Payload, PayloadStream}; -use actix_web::{http, web, Error, FromRequest, HttpRequest}; +use actix_web::http::StatusCode; +use actix_web::{http, web, Error, FromRequest, HttpRequest, HttpResponse, Responder}; use async_graphql::http::StreamBody; -use async_graphql::{IntoQueryBuilder, IntoQueryBuilderOpts, ParseRequestError, QueryBuilder}; +use async_graphql::{ + IntoQueryBuilder, IntoQueryBuilderOpts, ParseRequestError, QueryBuilder, QueryResponse, +}; +use bytes::{buf::BufExt, Buf, Bytes}; use futures::channel::mpsc; -use futures::{Future, SinkExt, StreamExt, TryFutureExt}; +use futures::future::Ready; +use futures::{Future, SinkExt, Stream, StreamExt, TryFutureExt}; use std::pin::Pin; pub use subscription::WSSubscription; @@ -65,3 +71,66 @@ impl FromRequest for GQLRequest { }) } } + +/// Responder for GraphQL response +pub struct GQLResponse(async_graphql::Result); + +impl From> for GQLResponse { + fn from(res: async_graphql::Result) -> Self { + GQLResponse(res) + } +} + +impl Responder for GQLResponse { + type Error = Error; + type Future = Ready>; + + fn respond_to(self, _req: &HttpRequest) -> Self::Future { + let res = HttpResponse::build(StatusCode::OK) + .content_type("application/json") + .body(serde_json::to_string(&async_graphql::http::GQLResponse(self.0)).unwrap()); + futures::future::ok(res) + } +} + +/// Responder for GraphQL response stream +pub struct GQLResponseStream>>(S); + +impl> + 'static> From + for GQLResponseStream +{ + fn from(stream: S) -> Self { + GQLResponseStream(stream) + } +} + +impl> + Unpin + 'static> Responder + for GQLResponseStream +{ + type Error = Error; + type Future = Ready>; + + fn respond_to(self, _req: &HttpRequest) -> Self::Future { + let body = BodyStream::new( + self.0 + .map(|res| serde_json::to_vec(&async_graphql::http::GQLResponse(res)).unwrap()) + .map(|data| { + Ok::<_, std::convert::Infallible>( + Bytes::from(format!( + "\r\n---\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n", + data.len() + )) + .chain(Bytes::from(data)) + .to_bytes(), + ) + }) + .chain(futures::stream::once(futures::future::ok( + Bytes::from_static(b"\r\n-----\r\n"), + ))), + ); + let res = HttpResponse::build(StatusCode::OK) + .content_type("multipart/mixed; boundary=\"-\"") + .body(body); + futures::future::ok(res) + } +} diff --git a/src/query.rs b/src/query.rs index eac14673..26959e00 100644 --- a/src/query.rs +++ b/src/query.rs @@ -156,7 +156,7 @@ impl QueryBuilder { /// Execute the query, returns a stream, the first result being the query result, /// followed by the incremental result. Only when there are `@defer` and `@stream` directives /// in the query will there be subsequent incremental results. - pub async fn execute_stream( + pub fn execute_stream( self, schema: &Schema, ) -> impl Stream> @@ -330,7 +330,7 @@ impl QueryBuilder { Mutation: ObjectType + Send + Sync + 'static, Subscription: SubscriptionType + Send + Sync + 'static, { - let mut stream = self.execute_stream(schema).await; + let mut stream = self.execute_stream(schema); let mut resp = stream.next().await.unwrap()?; while let Some(resp_part) = stream.next().await.transpose()? { resp.merge(resp_part); diff --git a/src/schema.rs b/src/schema.rs index f2d58c85..c3537ac6 100644 --- a/src/schema.rs +++ b/src/schema.rs @@ -286,11 +286,8 @@ where /// Execute the query without create the `QueryBuilder`, returns a stream, the first result being the query result, /// followed by the incremental result. Only when there are `@defer` and `@stream` directives /// in the query will there be subsequent incremental results. - pub async fn execute_stream( - &self, - query_source: &str, - ) -> impl Stream> { - QueryBuilder::new(query_source).execute_stream(self).await + pub fn execute_stream(&self, query_source: &str) -> impl Stream> { + QueryBuilder::new(query_source).execute_stream(self) } /// Create subscription stream, typically called inside the `SubscriptionTransport::handle_request` method diff --git a/tests/defer.rs b/tests/defer.rs index 6618076c..193befed 100644 --- a/tests/defer.rs +++ b/tests/defer.rs @@ -63,7 +63,7 @@ pub async fn test_defer() { }) ); - let mut stream = schema.execute_stream(&query).await; + let mut stream = schema.execute_stream(&query); assert_eq!( stream.next().await.unwrap().unwrap().data, serde_json::json!({