diff --git a/integrations/actix-web/src/batch_request.rs b/integrations/actix-web/src/batch_request.rs new file mode 100644 index 00000000..dbba8260 --- /dev/null +++ b/integrations/actix-web/src/batch_request.rs @@ -0,0 +1,100 @@ +use actix_web::dev::{HttpResponseBuilder, Payload, PayloadStream}; +use actix_web::http::StatusCode; +use actix_web::{http, web, Error, FromRequest, HttpRequest, HttpResponse, Responder}; +use async_graphql::http::MultipartOptions; +use async_graphql::ParseRequestError; +use futures::channel::mpsc; +use futures::future::Ready; +use futures::io::ErrorKind; +use futures::{Future, SinkExt, StreamExt, TryFutureExt, TryStreamExt}; +use std::io; +use std::pin::Pin; + +/// Extractor for GraphQL batch request +/// +/// It's a wrapper of `async_graphql::Request`, you can use `Request::into_inner` unwrap it to `async_graphql::Request`. +/// `async_graphql::http::MultipartOptions` allows to configure extraction process. +pub struct BatchRequest(async_graphql::BatchRequest); + +impl BatchRequest { + /// Unwraps the value to `async_graphql::Request`. + pub fn into_inner(self) -> async_graphql::BatchRequest { + self.0 + } +} + +impl FromRequest for BatchRequest { + type Error = Error; + type Future = Pin>>>; + type Config = MultipartOptions; + + fn from_request(req: &HttpRequest, payload: &mut Payload) -> Self::Future { + let config = req.app_data::().cloned().unwrap_or_default(); + + let content_type = req + .headers() + .get(http::header::CONTENT_TYPE) + .and_then(|value| value.to_str().ok()) + .map(|value| value.to_string()); + + let (mut tx, rx) = mpsc::channel(16); + + // Because Payload is !Send, so forward it to mpsc::Sender + let mut payload = web::Payload(payload.take()); + actix_rt::spawn(async move { + while let Some(item) = payload.next().await { + if tx.send(item).await.is_err() { + return; + } + } + }); + + Box::pin(async move { + Ok(BatchRequest( + async_graphql::http::receive_batch_body( + content_type, + rx.map_err(|err| io::Error::new(ErrorKind::Other, err)) + .into_async_read(), + config, + ) + .map_err(|err| match err { + ParseRequestError::PayloadTooLarge => { + actix_web::error::ErrorPayloadTooLarge(err) + } + _ => actix_web::error::ErrorBadRequest(err), + }) + .await?, + )) + }) + } +} + +/// Responder for GraphQL batch response +pub struct BatchResponse(async_graphql::BatchResponse); + +impl From for BatchResponse { + fn from(resp: async_graphql::BatchResponse) -> Self { + BatchResponse(resp) + } +} + +impl Responder for BatchResponse { + type Error = Error; + type Future = Ready>; + + fn respond_to(self, _req: &HttpRequest) -> Self::Future { + let mut res = HttpResponse::build(StatusCode::OK); + res.content_type("application/json"); + add_cache_control(&mut res, &self.0); + let res = res.body(serde_json::to_string(&self.0).unwrap()); + futures::future::ok(res) + } +} + +fn add_cache_control(builder: &mut HttpResponseBuilder, resp: &async_graphql::BatchResponse) { + if resp.is_ok() { + if let Some(cache_control) = resp.cache_control().value() { + builder.header("cache-control", cache_control); + } + } +} diff --git a/integrations/actix-web/src/lib.rs b/integrations/actix-web/src/lib.rs index 086901f6..4a918274 100644 --- a/integrations/actix-web/src/lib.rs +++ b/integrations/actix-web/src/lib.rs @@ -1,117 +1,11 @@ //! Async-graphql integration with Actix-web -#![warn(missing_docs)] #![forbid(unsafe_code)] +mod batch_request; +mod request; mod subscription; -use actix_web::dev::{HttpResponseBuilder, Payload, PayloadStream}; -use actix_web::http::StatusCode; -use actix_web::{http, web, Error, FromRequest, HttpRequest, HttpResponse, Responder}; -use async_graphql::http::MultipartOptions; -use async_graphql::ParseRequestError; -use futures::channel::mpsc; -use futures::future::Ready; -use futures::io::ErrorKind; -use futures::{Future, SinkExt, StreamExt, TryFutureExt, TryStreamExt}; -use http::Method; -use std::io; -use std::pin::Pin; +pub use batch_request::{BatchRequest, BatchResponse}; +pub use request::{Request, Response}; pub use subscription::WSSubscription; - -/// Extractor for GraphQL request -/// -/// It's a wrapper of `async_graphql::Request`, you can use `Request::into_inner` unwrap it to `async_graphql::Request`. -/// `async_graphql::http::MultipartOptions` allows to configure extraction process. -pub struct Request(async_graphql::Request); - -impl Request { - /// Unwraps the value to `async_graphql::Request`. - pub fn into_inner(self) -> async_graphql::Request { - self.0 - } -} - -impl FromRequest for Request { - type Error = Error; - type Future = Pin>>>; - type Config = MultipartOptions; - - fn from_request(req: &HttpRequest, payload: &mut Payload) -> Self::Future { - let config = req.app_data::().cloned().unwrap_or_default(); - - if req.method() == Method::GET { - let res = web::Query::::from_query(req.query_string()); - Box::pin(async move { - let gql_request = res?; - Ok(Request(gql_request.into_inner())) - }) - } else { - let content_type = req - .headers() - .get(http::header::CONTENT_TYPE) - .and_then(|value| value.to_str().ok()) - .map(|value| value.to_string()); - - let (mut tx, rx) = mpsc::channel(16); - - // Because Payload is !Send, so forward it to mpsc::Sender - let mut payload = web::Payload(payload.take()); - actix_rt::spawn(async move { - while let Some(item) = payload.next().await { - if tx.send(item).await.is_err() { - return; - } - } - }); - - Box::pin(async move { - Ok(Request( - async_graphql::http::receive_body( - content_type, - rx.map_err(|err| io::Error::new(ErrorKind::Other, err)) - .into_async_read(), - config, - ) - .map_err(|err| match err { - ParseRequestError::PayloadTooLarge => { - actix_web::error::ErrorPayloadTooLarge(err) - } - _ => actix_web::error::ErrorBadRequest(err), - }) - .await?, - )) - }) - } - } -} - -/// Responder for GraphQL response -pub struct Response(async_graphql::Response); - -impl From for Response { - fn from(resp: async_graphql::Response) -> Self { - Response(resp) - } -} - -impl Responder for Response { - type Error = Error; - type Future = Ready>; - - fn respond_to(self, _req: &HttpRequest) -> Self::Future { - let mut res = HttpResponse::build(StatusCode::OK); - res.content_type("application/json"); - add_cache_control(&mut res, &self.0); - let res = res.body(serde_json::to_string(&self.0).unwrap()); - futures::future::ok(res) - } -} - -fn add_cache_control(builder: &mut HttpResponseBuilder, resp: &async_graphql::Response) { - if resp.is_ok() { - if let Some(cache_control) = resp.cache_control.value() { - builder.header("cache-control", cache_control); - } - } -} diff --git a/integrations/actix-web/src/request.rs b/integrations/actix-web/src/request.rs new file mode 100644 index 00000000..e2b2ce2a --- /dev/null +++ b/integrations/actix-web/src/request.rs @@ -0,0 +1,109 @@ +use actix_web::dev::{HttpResponseBuilder, Payload, PayloadStream}; +use actix_web::http::StatusCode; +use actix_web::{http, web, Error, FromRequest, HttpRequest, HttpResponse, Responder}; +use async_graphql::http::MultipartOptions; +use async_graphql::ParseRequestError; +use futures::channel::mpsc; +use futures::future::Ready; +use futures::io::ErrorKind; +use futures::{Future, SinkExt, StreamExt, TryFutureExt, TryStreamExt}; +use http::Method; +use std::io; +use std::pin::Pin; + +/// Extractor for GraphQL request +/// +/// It's a wrapper of `async_graphql::Request`, you can use `Request::into_inner` unwrap it to `async_graphql::Request`. +/// `async_graphql::http::MultipartOptions` allows to configure extraction process. +pub struct Request(async_graphql::Request); + +impl Request { + /// Unwraps the value to `async_graphql::Request`. + pub fn into_inner(self) -> async_graphql::Request { + self.0 + } +} + +impl FromRequest for Request { + type Error = Error; + type Future = Pin>>>; + type Config = MultipartOptions; + + fn from_request(req: &HttpRequest, payload: &mut Payload) -> Self::Future { + let config = req.app_data::().cloned().unwrap_or_default(); + + if req.method() == Method::GET { + let res = web::Query::::from_query(req.query_string()); + Box::pin(async move { + let gql_request = res?; + Ok(Request(gql_request.into_inner())) + }) + } else { + let content_type = req + .headers() + .get(http::header::CONTENT_TYPE) + .and_then(|value| value.to_str().ok()) + .map(|value| value.to_string()); + + let (mut tx, rx) = mpsc::channel(16); + + // Because Payload is !Send, so forward it to mpsc::Sender + let mut payload = web::Payload(payload.take()); + actix_rt::spawn(async move { + while let Some(item) = payload.next().await { + if tx.send(item).await.is_err() { + return; + } + } + }); + + Box::pin(async move { + Ok(Request( + async_graphql::http::receive_body( + content_type, + rx.map_err(|err| io::Error::new(ErrorKind::Other, err)) + .into_async_read(), + config, + ) + .map_err(|err| match err { + ParseRequestError::PayloadTooLarge => { + actix_web::error::ErrorPayloadTooLarge(err) + } + _ => actix_web::error::ErrorBadRequest(err), + }) + .await?, + )) + }) + } + } +} + +/// Responder for GraphQL response +pub struct Response(async_graphql::Response); + +impl From for Response { + fn from(resp: async_graphql::Response) -> Self { + Response(resp) + } +} + +impl Responder for Response { + type Error = Error; + type Future = Ready>; + + fn respond_to(self, _req: &HttpRequest) -> Self::Future { + let mut res = HttpResponse::build(StatusCode::OK); + res.content_type("application/json"); + add_cache_control(&mut res, &self.0); + let res = res.body(serde_json::to_string(&self.0).unwrap()); + futures::future::ok(res) + } +} + +fn add_cache_control(builder: &mut HttpResponseBuilder, resp: &async_graphql::Response) { + if resp.is_ok() { + if let Some(cache_control) = resp.cache_control.value() { + builder.header("cache-control", cache_control); + } + } +} diff --git a/integrations/warp/src/batch_request.rs b/integrations/warp/src/batch_request.rs new file mode 100644 index 00000000..49ae56b2 --- /dev/null +++ b/integrations/warp/src/batch_request.rs @@ -0,0 +1,101 @@ +use crate::BadRequest; +use async_graphql::http::MultipartOptions; +use async_graphql::{ObjectType, Schema, SubscriptionType}; +use futures::TryStreamExt; +use std::io; +use std::io::ErrorKind; +use std::sync::Arc; +use warp::reply::Response as WarpResponse; +use warp::{Buf, Filter, Rejection, Reply}; + +/// GraphQL batch request filter +/// +/// It outputs a tuple containing the `async_graphql::Schema` and `async_graphql::BatchRequest`. +pub fn graphql_batch( + schema: Schema, +) -> impl Filter< + Extract = (( + Schema, + async_graphql::BatchRequest, + ),), + Error = Rejection, +> + Clone +where + Query: ObjectType + Send + Sync + 'static, + Mutation: ObjectType + Send + Sync + 'static, + Subscription: SubscriptionType + Send + Sync + 'static, +{ + graphql_batch_opts(schema, Default::default()) +} + +/// Similar to graphql_batch, but you can set the options `async_graphql::MultipartOptions`. +pub fn graphql_batch_opts( + schema: Schema, + opts: MultipartOptions, +) -> impl Filter< + Extract = (( + Schema, + async_graphql::BatchRequest, + ),), + Error = Rejection, +> + Clone +where + Query: ObjectType + Send + Sync + 'static, + Mutation: ObjectType + Send + Sync + 'static, + Subscription: SubscriptionType + Send + Sync + 'static, +{ + let opts = Arc::new(opts); + warp::any() + .and(warp::header::optional::("content-type")) + .and(warp::body::stream()) + .and(warp::any().map(move || opts.clone())) + .and(warp::any().map(move || schema.clone())) + .and_then( + |content_type, body, opts: Arc, schema| async move { + let request = async_graphql::http::receive_batch_body( + content_type, + futures::TryStreamExt::map_err(body, |err| { + io::Error::new(ErrorKind::Other, err) + }) + .map_ok(|mut buf| Buf::to_bytes(&mut buf)) + .into_async_read(), + MultipartOptions::clone(&opts), + ) + .await + .map_err(|err| warp::reject::custom(BadRequest(err.into())))?; + Ok::<_, Rejection>((schema, request)) + }, + ) +} + +/// Reply for `async_graphql::BatchRequest`. +pub struct BatchResponse(async_graphql::BatchResponse); + +impl From for BatchResponse { + fn from(resp: async_graphql::BatchResponse) -> Self { + BatchResponse(resp) + } +} + +fn add_cache_control(http_resp: &mut WarpResponse, resp: &async_graphql::BatchResponse) { + if resp.is_ok() { + if let Some(cache_control) = resp.cache_control().value() { + if let Ok(value) = cache_control.parse() { + http_resp.headers_mut().insert("cache-control", value); + } + } + } +} + +impl Reply for BatchResponse { + fn into_response(self) -> WarpResponse { + let mut resp = warp::reply::with_header( + warp::reply::json(&self.0), + "content-type", + "application/json", + ) + .into_response(); + add_cache_control(&mut resp, &self.0); + resp + } +} diff --git a/integrations/warp/src/error.rs b/integrations/warp/src/error.rs new file mode 100644 index 00000000..6b540c7f --- /dev/null +++ b/integrations/warp/src/error.rs @@ -0,0 +1,14 @@ +use warp::reject::Reject; + +/// Bad request error +/// +/// It's a wrapper of `async_graphql::ParseRequestError`. +pub struct BadRequest(pub anyhow::Error); + +impl std::fmt::Debug for BadRequest { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0) + } +} + +impl Reject for BadRequest {} diff --git a/integrations/warp/src/lib.rs b/integrations/warp/src/lib.rs index 21f3e169..22d9ea18 100644 --- a/integrations/warp/src/lib.rs +++ b/integrations/warp/src/lib.rs @@ -1,258 +1,15 @@ //! Async-graphql integration with Warp -#![warn(missing_docs)] #![allow(clippy::type_complexity)] #![allow(clippy::needless_doctest_main)] #![forbid(unsafe_code)] -use async_graphql::http::MultipartOptions; -use async_graphql::{ - resolver_utils::ObjectType, Data, FieldResult, Request, Schema, SubscriptionType, -}; -use futures::{future, StreamExt, TryStreamExt}; -use hyper::Method; -use std::io::{self, ErrorKind}; -use std::sync::Arc; -use warp::filters::ws; -use warp::reject::Reject; -use warp::reply::Response as WarpResponse; -use warp::{Buf, Filter, Rejection, Reply}; +mod batch_request; +mod error; +mod request; +mod subscription; -/// Bad request error -/// -/// It's a wrapper of `async_graphql::ParseRequestError`. -pub struct BadRequest(pub anyhow::Error); - -impl std::fmt::Debug for BadRequest { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", self.0) - } -} - -impl Reject for BadRequest {} - -/// GraphQL request filter -/// -/// It outputs a tuple containing the `async_graphql::Schema` and `async_graphql::Request`. -/// -/// # Examples -/// -/// *[Full Example]()* -/// -/// ```no_run -/// -/// use async_graphql::*; -/// use async_graphql_warp::*; -/// use warp::Filter; -/// use std::convert::Infallible; -/// -/// struct QueryRoot; -/// -/// #[Object] -/// impl QueryRoot { -/// #[field] -/// async fn value(&self, ctx: &Context<'_>) -> i32 { -/// unimplemented!() -/// } -/// } -/// -/// type MySchema = Schema; -/// -/// #[tokio::main] -/// async fn main() { -/// let schema = Schema::new(QueryRoot, EmptyMutation, EmptySubscription); -/// let filter = async_graphql_warp::graphql(schema). -/// and_then(|(schema, request): (MySchema, async_graphql::Request)| async move { -/// Ok::<_, Infallible>(async_graphql_warp::Response::from(schema.execute(request).await)) -/// }); -/// warp::serve(filter).run(([0, 0, 0, 0], 8000)).await; -/// } -/// ``` -pub fn graphql( - schema: Schema, -) -> impl Filter< - Extract = (( - Schema, - async_graphql::Request, - ),), - Error = Rejection, -> + Clone -where - Query: ObjectType + Send + Sync + 'static, - Mutation: ObjectType + Send + Sync + 'static, - Subscription: SubscriptionType + Send + Sync + 'static, -{ - graphql_opts(schema, Default::default()) -} - -/// Similar to graphql, but you can set the options `async_graphql::MultipartOptions`. -pub fn graphql_opts( - schema: Schema, - opts: MultipartOptions, -) -> impl Filter< - Extract = (( - Schema, - async_graphql::Request, - ),), - Error = Rejection, -> + Clone -where - Query: ObjectType + Send + Sync + 'static, - Mutation: ObjectType + Send + Sync + 'static, - Subscription: SubscriptionType + Send + Sync + 'static, -{ - let opts = Arc::new(opts); - warp::any() - .and(warp::method()) - .and(warp::query::raw().or(warp::any().map(String::new)).unify()) - .and(warp::header::optional::("content-type")) - .and(warp::body::stream()) - .and(warp::any().map(move || opts.clone())) - .and(warp::any().map(move || schema.clone())) - .and_then( - |method, - query: String, - content_type, - body, - opts: Arc, - schema| async move { - if method == Method::GET { - let request: Request = serde_urlencoded::from_str(&query) - .map_err(|err| warp::reject::custom(BadRequest(err.into())))?; - Ok::<_, Rejection>((schema, request)) - } else { - let request = async_graphql::http::receive_body( - content_type, - futures::TryStreamExt::map_err(body, |err| io::Error::new(ErrorKind::Other, err)) - .map_ok(|mut buf| Buf::to_bytes(&mut buf)) - .into_async_read(), - MultipartOptions::clone(&opts), - ) - .await - .map_err(|err| warp::reject::custom(BadRequest(err.into())))?; - Ok::<_, Rejection>((schema, request)) - } - }, - ) -} - -/// GraphQL subscription filter -/// -/// # Examples -/// -/// ```no_run -/// use async_graphql::*; -/// use async_graphql_warp::*; -/// use warp::Filter; -/// use futures::{Stream, StreamExt}; -/// use std::time::Duration; -/// -/// struct QueryRoot; -/// -/// #[Object] -/// impl QueryRoot {} -/// -/// struct SubscriptionRoot; -/// -/// #[Subscription] -/// impl SubscriptionRoot { -/// #[field] -/// async fn tick(&self) -> impl Stream { -/// tokio::time::interval(Duration::from_secs(1)).map(|n| format!("{}", n.elapsed().as_secs_f32())) -/// } -/// } -/// -/// #[tokio::main] -/// async fn main() { -/// let schema = Schema::new(QueryRoot, EmptyMutation, SubscriptionRoot); -/// let filter = async_graphql_warp::graphql_subscription(schema) -/// .or(warp::any().map(|| "Hello, World!")); -/// warp::serve(filter).run(([0, 0, 0, 0], 8000)).await; -/// } -/// ``` -pub fn graphql_subscription( - schema: Schema, -) -> impl Filter + Clone -where - Query: ObjectType + Sync + Send + 'static, - Mutation: ObjectType + Sync + Send + 'static, - Subscription: SubscriptionType + Send + Sync + 'static, -{ - graphql_subscription_with_data::<_, _, _, fn(serde_json::Value) -> FieldResult>( - schema, None, - ) -} - -/// GraphQL subscription filter -/// -/// Specifies that a function converts the init payload to data. -pub fn graphql_subscription_with_data( - schema: Schema, - initializer: Option, -) -> impl Filter + Clone -where - Query: ObjectType + Sync + Send + 'static, - Mutation: ObjectType + Sync + Send + 'static, - Subscription: SubscriptionType + Send + Sync + 'static, - F: FnOnce(serde_json::Value) -> FieldResult + Send + Sync + Clone + 'static, -{ - warp::any() - .and(warp::ws()) - .and(warp::any().map(move || schema.clone())) - .and(warp::any().map(move || initializer.clone())) - .map( - |ws: ws::Ws, schema: Schema, initializer: Option| { - ws.on_upgrade(move |websocket| { - let (ws_sender, ws_receiver) = websocket.split(); - - async move { - let _ = async_graphql::http::WebSocket::with_data( - schema, - ws_receiver - .take_while(|msg| future::ready(msg.is_ok())) - .map(Result::unwrap) - .map(ws::Message::into_bytes), - initializer, - ) - .map(ws::Message::text) - .map(Ok) - .forward(ws_sender) - .await; - } - }) - }, - ) - .map(|reply| warp::reply::with_header(reply, "Sec-WebSocket-Protocol", "graphql-ws")) -} - -/// GraphQL reply -pub struct Response(async_graphql::Response); - -impl From for Response { - fn from(resp: async_graphql::Response) -> Self { - Response(resp) - } -} - -fn add_cache_control(http_resp: &mut WarpResponse, resp: &async_graphql::Response) { - if resp.is_ok() { - if let Some(cache_control) = resp.cache_control.value() { - if let Ok(value) = cache_control.parse() { - http_resp.headers_mut().insert("cache-control", value); - } - } - } -} - -impl Reply for Response { - fn into_response(self) -> WarpResponse { - let mut resp = warp::reply::with_header( - warp::reply::json(&self.0), - "content-type", - "application/json", - ) - .into_response(); - add_cache_control(&mut resp, &self.0); - resp - } -} +pub use batch_request::{graphql_batch, graphql_batch_opts, BatchResponse}; +pub use error::BadRequest; +pub use request::{graphql, graphql_opts, Response}; +pub use subscription::{graphql_subscription, graphql_subscription_with_data}; diff --git a/integrations/warp/src/request.rs b/integrations/warp/src/request.rs new file mode 100644 index 00000000..ce38c851 --- /dev/null +++ b/integrations/warp/src/request.rs @@ -0,0 +1,147 @@ +use crate::BadRequest; +use async_graphql::http::MultipartOptions; +use async_graphql::{ObjectType, Schema, SubscriptionType}; +use futures::TryStreamExt; +use std::io; +use std::io::ErrorKind; +use std::sync::Arc; +use warp::http::Method; +use warp::reply::Response as WarpResponse; +use warp::{Buf, Filter, Rejection, Reply}; + +/// GraphQL request filter +/// +/// It outputs a tuple containing the `async_graphql::Schema` and `async_graphql::Request`. +/// +/// # Examples +/// +/// *[Full Example]()* +/// +/// ```no_run +/// +/// use async_graphql::*; +/// use async_graphql_warp::*; +/// use warp::Filter; +/// use std::convert::Infallible; +/// +/// struct QueryRoot; +/// +/// #[Object] +/// impl QueryRoot { +/// #[field] +/// async fn value(&self, ctx: &Context<'_>) -> i32 { +/// unimplemented!() +/// } +/// } +/// +/// type MySchema = Schema; +/// +/// #[tokio::main] +/// async fn main() { +/// let schema = Schema::new(QueryRoot, EmptyMutation, EmptySubscription); +/// let filter = async_graphql_warp::graphql(schema). +/// and_then(|(schema, request): (MySchema, async_graphql::Request)| async move { +/// Ok::<_, Infallible>(async_graphql_warp::Response::from(schema.execute(request).await)) +/// }); +/// warp::serve(filter).run(([0, 0, 0, 0], 8000)).await; +/// } +/// ``` +pub fn graphql( + schema: Schema, +) -> impl Filter< + Extract = (( + Schema, + async_graphql::Request, + ),), + Error = Rejection, +> + Clone +where + Query: ObjectType + Send + Sync + 'static, + Mutation: ObjectType + Send + Sync + 'static, + Subscription: SubscriptionType + Send + Sync + 'static, +{ + graphql_opts(schema, Default::default()) +} + +/// Similar to graphql, but you can set the options `async_graphql::MultipartOptions`. +pub fn graphql_opts( + schema: Schema, + opts: MultipartOptions, +) -> impl Filter< + Extract = (( + Schema, + async_graphql::Request, + ),), + Error = Rejection, +> + Clone +where + Query: ObjectType + Send + Sync + 'static, + Mutation: ObjectType + Send + Sync + 'static, + Subscription: SubscriptionType + Send + Sync + 'static, +{ + let opts = Arc::new(opts); + warp::any() + .and(warp::method()) + .and(warp::query::raw().or(warp::any().map(String::new)).unify()) + .and(warp::header::optional::("content-type")) + .and(warp::body::stream()) + .and(warp::any().map(move || opts.clone())) + .and(warp::any().map(move || schema.clone())) + .and_then( + |method, + query: String, + content_type, + body, + opts: Arc, + schema| async move { + if method == Method::GET { + let request: async_graphql::Request = serde_urlencoded::from_str(&query) + .map_err(|err| warp::reject::custom(BadRequest(err.into())))?; + Ok::<_, Rejection>((schema, request)) + } else { + let request = async_graphql::http::receive_body( + content_type, + futures::TryStreamExt::map_err(body, |err| io::Error::new(ErrorKind::Other, err)) + .map_ok(|mut buf| Buf::to_bytes(&mut buf)) + .into_async_read(), + MultipartOptions::clone(&opts), + ) + .await + .map_err(|err| warp::reject::custom(BadRequest(err.into())))?; + Ok::<_, Rejection>((schema, request)) + } + }, + ) +} + +/// Reply for `async_graphql::Request`. +pub struct Response(async_graphql::Response); + +impl From for Response { + fn from(resp: async_graphql::Response) -> Self { + Response(resp) + } +} + +fn add_cache_control(http_resp: &mut WarpResponse, resp: &async_graphql::Response) { + if resp.is_ok() { + if let Some(cache_control) = resp.cache_control.value() { + if let Ok(value) = cache_control.parse() { + http_resp.headers_mut().insert("cache-control", value); + } + } + } +} + +impl Reply for Response { + fn into_response(self) -> WarpResponse { + let mut resp = warp::reply::with_header( + warp::reply::json(&self.0), + "content-type", + "application/json", + ) + .into_response(); + add_cache_control(&mut resp, &self.0); + resp + } +} diff --git a/integrations/warp/src/subscription.rs b/integrations/warp/src/subscription.rs new file mode 100644 index 00000000..1cf4fce8 --- /dev/null +++ b/integrations/warp/src/subscription.rs @@ -0,0 +1,93 @@ +use async_graphql::{resolver_utils::ObjectType, Data, FieldResult, Schema, SubscriptionType}; +use futures::{future, StreamExt}; +use warp::filters::ws; +use warp::{Filter, Rejection, Reply}; + +/// GraphQL subscription filter +/// +/// # Examples +/// +/// ```no_run +/// use async_graphql::*; +/// use async_graphql_warp::*; +/// use warp::Filter; +/// use futures::{Stream, StreamExt}; +/// use std::time::Duration; +/// +/// struct QueryRoot; +/// +/// #[Object] +/// impl QueryRoot {} +/// +/// struct SubscriptionRoot; +/// +/// #[Subscription] +/// impl SubscriptionRoot { +/// #[field] +/// async fn tick(&self) -> impl Stream { +/// tokio::time::interval(Duration::from_secs(1)).map(|n| format!("{}", n.elapsed().as_secs_f32())) +/// } +/// } +/// +/// #[tokio::main] +/// async fn main() { +/// let schema = Schema::new(QueryRoot, EmptyMutation, SubscriptionRoot); +/// let filter = async_graphql_warp::graphql_subscription(schema) +/// .or(warp::any().map(|| "Hello, World!")); +/// warp::serve(filter).run(([0, 0, 0, 0], 8000)).await; +/// } +/// ``` +pub fn graphql_subscription( + schema: Schema, +) -> impl Filter + Clone +where + Query: ObjectType + Sync + Send + 'static, + Mutation: ObjectType + Sync + Send + 'static, + Subscription: SubscriptionType + Send + Sync + 'static, +{ + graphql_subscription_with_data::<_, _, _, fn(serde_json::Value) -> FieldResult>( + schema, None, + ) +} + +/// GraphQL subscription filter +/// +/// Specifies that a function converts the init payload to data. +pub fn graphql_subscription_with_data( + schema: Schema, + initializer: Option, +) -> impl Filter + Clone +where + Query: ObjectType + Sync + Send + 'static, + Mutation: ObjectType + Sync + Send + 'static, + Subscription: SubscriptionType + Send + Sync + 'static, + F: FnOnce(serde_json::Value) -> FieldResult + Send + Sync + Clone + 'static, +{ + warp::any() + .and(warp::ws()) + .and(warp::any().map(move || schema.clone())) + .and(warp::any().map(move || initializer.clone())) + .map( + |ws: ws::Ws, schema: Schema, initializer: Option| { + ws.on_upgrade(move |websocket| { + let (ws_sender, ws_receiver) = websocket.split(); + + async move { + let _ = async_graphql::http::WebSocket::with_data( + schema, + ws_receiver + .take_while(|msg| future::ready(msg.is_ok())) + .map(Result::unwrap) + .map(ws::Message::into_bytes), + initializer, + ) + .map(ws::Message::text) + .map(Ok) + .forward(ws_sender) + .await; + } + }) + }, + ) + .map(|reply| warp::reply::with_header(reply, "Sec-WebSocket-Protocol", "graphql-ws")) +} diff --git a/src/types/maybe_undefined.rs b/src/types/maybe_undefined.rs index b3e4a9d2..68c2ad18 100644 --- a/src/types/maybe_undefined.rs +++ b/src/types/maybe_undefined.rs @@ -1,5 +1,5 @@ use crate::{registry, InputValueResult, InputValueType, Type, Value}; -use serde::{Deserialize, Serialize}; +use serde::Serialize; use std::borrow::Cow; /// Similar to `Option`, but it has three states, `undefined`, `null` and `x`.