Add @defer support for actix-web

This commit is contained in:
sunli 2020-05-20 20:10:40 +08:00
parent 87c6d31907
commit a745667922
4 changed files with 77 additions and 11 deletions

View File

@ -4,12 +4,18 @@
mod subscription;
use actix_web::body::BodyStream;
use actix_web::dev::{Payload, PayloadStream};
use actix_web::{http, web, Error, FromRequest, HttpRequest};
use actix_web::http::StatusCode;
use actix_web::{http, web, Error, FromRequest, HttpRequest, HttpResponse, Responder};
use async_graphql::http::StreamBody;
use async_graphql::{IntoQueryBuilder, IntoQueryBuilderOpts, ParseRequestError, QueryBuilder};
use async_graphql::{
IntoQueryBuilder, IntoQueryBuilderOpts, ParseRequestError, QueryBuilder, QueryResponse,
};
use bytes::{buf::BufExt, Buf, Bytes};
use futures::channel::mpsc;
use futures::{Future, SinkExt, StreamExt, TryFutureExt};
use futures::future::Ready;
use futures::{Future, SinkExt, Stream, StreamExt, TryFutureExt};
use std::pin::Pin;
pub use subscription::WSSubscription;
@ -65,3 +71,66 @@ impl FromRequest for GQLRequest {
})
}
}
/// Responder for GraphQL response
pub struct GQLResponse(async_graphql::Result<QueryResponse>);
impl From<async_graphql::Result<QueryResponse>> for GQLResponse {
fn from(res: async_graphql::Result<QueryResponse>) -> Self {
GQLResponse(res)
}
}
impl Responder for GQLResponse {
type Error = Error;
type Future = Ready<Result<HttpResponse, Error>>;
fn respond_to(self, _req: &HttpRequest) -> Self::Future {
let res = HttpResponse::build(StatusCode::OK)
.content_type("application/json")
.body(serde_json::to_string(&async_graphql::http::GQLResponse(self.0)).unwrap());
futures::future::ok(res)
}
}
/// Responder for GraphQL response stream
pub struct GQLResponseStream<S: Stream<Item = async_graphql::Result<QueryResponse>>>(S);
impl<S: Stream<Item = async_graphql::Result<QueryResponse>> + 'static> From<S>
for GQLResponseStream<S>
{
fn from(stream: S) -> Self {
GQLResponseStream(stream)
}
}
impl<S: Stream<Item = async_graphql::Result<QueryResponse>> + Unpin + 'static> Responder
for GQLResponseStream<S>
{
type Error = Error;
type Future = Ready<Result<HttpResponse, Error>>;
fn respond_to(self, _req: &HttpRequest) -> Self::Future {
let body = BodyStream::new(
self.0
.map(|res| serde_json::to_vec(&async_graphql::http::GQLResponse(res)).unwrap())
.map(|data| {
Ok::<_, std::convert::Infallible>(
Bytes::from(format!(
"\r\n---\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n",
data.len()
))
.chain(Bytes::from(data))
.to_bytes(),
)
})
.chain(futures::stream::once(futures::future::ok(
Bytes::from_static(b"\r\n-----\r\n"),
))),
);
let res = HttpResponse::build(StatusCode::OK)
.content_type("multipart/mixed; boundary=\"-\"")
.body(body);
futures::future::ok(res)
}
}

View File

@ -156,7 +156,7 @@ impl QueryBuilder {
/// Execute the query, returns a stream, the first result being the query result,
/// followed by the incremental result. Only when there are `@defer` and `@stream` directives
/// in the query will there be subsequent incremental results.
pub async fn execute_stream<Query, Mutation, Subscription>(
pub fn execute_stream<Query, Mutation, Subscription>(
self,
schema: &Schema<Query, Mutation, Subscription>,
) -> impl Stream<Item = Result<QueryResponse>>
@ -330,7 +330,7 @@ impl QueryBuilder {
Mutation: ObjectType + Send + Sync + 'static,
Subscription: SubscriptionType + Send + Sync + 'static,
{
let mut stream = self.execute_stream(schema).await;
let mut stream = self.execute_stream(schema);
let mut resp = stream.next().await.unwrap()?;
while let Some(resp_part) = stream.next().await.transpose()? {
resp.merge(resp_part);

View File

@ -286,11 +286,8 @@ where
/// Execute the query without create the `QueryBuilder`, returns a stream, the first result being the query result,
/// followed by the incremental result. Only when there are `@defer` and `@stream` directives
/// in the query will there be subsequent incremental results.
pub async fn execute_stream(
&self,
query_source: &str,
) -> impl Stream<Item = Result<QueryResponse>> {
QueryBuilder::new(query_source).execute_stream(self).await
pub fn execute_stream(&self, query_source: &str) -> impl Stream<Item = Result<QueryResponse>> {
QueryBuilder::new(query_source).execute_stream(self)
}
/// Create subscription stream, typically called inside the `SubscriptionTransport::handle_request` method

View File

@ -63,7 +63,7 @@ pub async fn test_defer() {
})
);
let mut stream = schema.execute_stream(&query).await;
let mut stream = schema.execute_stream(&query);
assert_eq!(
stream.next().await.unwrap().unwrap().data,
serde_json::json!({