diff --git a/integrations/warp/Cargo.toml b/integrations/warp/Cargo.toml index b4137ba1..eb9ed958 100644 --- a/integrations/warp/Cargo.toml +++ b/integrations/warp/Cargo.toml @@ -4,7 +4,6 @@ version = "2.0.3" authors = ["sunli ", "Koxiaet"] edition = "2018" description = "async-graphql for warp" -publish = true license = "MIT/Apache-2.0" documentation = "https://docs.rs/async-graphql/" homepage = "https://github.com/async-graphql/async-graphql" @@ -14,13 +13,10 @@ categories = ["network-programming", "asynchronous"] [dependencies] async-graphql = { path = "../..", version = "=2.0.3" } -warp = { version = "0.2", default-features = false, features = ["websocket"] } -futures = "0.3.0" -bytes = "0.5.4" -serde_json = "1.0.48" -hyper = "0.13.6" -serde_urlencoded = "0.6.1" -anyhow = "1.0" + +warp = { version = "0.2.5", default-features = false, features = ["websocket"] } +futures-util = { version = "0.3.6", default-features = false } +serde_json = "1.0.59" [dev-dependencies] -tokio = { version = "0.2", features = ["macros"] } +tokio = { version = "0.2", default-features = false, features = ["macros"] } diff --git a/integrations/warp/src/batch_request.rs b/integrations/warp/src/batch_request.rs index 49ae56b2..2a89b240 100644 --- a/integrations/warp/src/batch_request.rs +++ b/integrations/warp/src/batch_request.rs @@ -1,25 +1,22 @@ -use crate::BadRequest; -use async_graphql::http::MultipartOptions; -use async_graphql::{ObjectType, Schema, SubscriptionType}; -use futures::TryStreamExt; +use std::convert::TryInto; use std::io; use std::io::ErrorKind; -use std::sync::Arc; + +use async_graphql::http::MultipartOptions; +use async_graphql::{BatchRequest, ObjectType, Schema, SubscriptionType}; +use futures_util::TryStreamExt; use warp::reply::Response as WarpResponse; use warp::{Buf, Filter, Rejection, Reply}; +use crate::BadRequest; + /// GraphQL batch request filter /// /// It outputs a tuple containing the `async_graphql::Schema` and `async_graphql::BatchRequest`. pub fn graphql_batch( schema: Schema, -) -> impl Filter< - Extract = (( - Schema, - async_graphql::BatchRequest, - ),), - Error = Rejection, -> + Clone +) -> impl Filter, BatchRequest),), Error = Rejection> + + Clone where Query: ObjectType + Send + Sync + 'static, Mutation: ObjectType + Send + Sync + 'static, @@ -28,48 +25,40 @@ where graphql_batch_opts(schema, Default::default()) } -/// Similar to graphql_batch, but you can set the options `async_graphql::MultipartOptions`. +/// Similar to graphql_batch, but you can set the options with :`async_graphql::MultipartOptions`. pub fn graphql_batch_opts( schema: Schema, opts: MultipartOptions, -) -> impl Filter< - Extract = (( - Schema, - async_graphql::BatchRequest, - ),), - Error = Rejection, -> + Clone +) -> impl Filter, BatchRequest),), Error = Rejection> + + Clone where Query: ObjectType + Send + Sync + 'static, Mutation: ObjectType + Send + Sync + 'static, Subscription: SubscriptionType + Send + Sync + 'static, { - let opts = Arc::new(opts); warp::any() - .and(warp::header::optional::("content-type")) - .and(warp::body::stream()) - .and(warp::any().map(move || opts.clone())) - .and(warp::any().map(move || schema.clone())) - .and_then( - |content_type, body, opts: Arc, schema| async move { - let request = async_graphql::http::receive_batch_body( + .and(warp::get().and(warp::query()).map(BatchRequest::Single)) + .or(warp::any() + .and(warp::header::optional::("content-type")) + .and(warp::body::stream()) + .and_then(move |content_type, body| async move { + async_graphql::http::receive_batch_body( content_type, - futures::TryStreamExt::map_err(body, |err| { - io::Error::new(ErrorKind::Other, err) - }) - .map_ok(|mut buf| Buf::to_bytes(&mut buf)) - .into_async_read(), - MultipartOptions::clone(&opts), + TryStreamExt::map_err(body, |e| io::Error::new(ErrorKind::Other, e)) + .map_ok(|mut buf| Buf::to_bytes(&mut buf)) + .into_async_read(), + opts, ) .await - .map_err(|err| warp::reject::custom(BadRequest(err.into())))?; - Ok::<_, Rejection>((schema, request)) - }, - ) + .map_err(|e| warp::reject::custom(BadRequest(e))) + })) + .unify() + .map(move |res| (schema.clone(), res)) } /// Reply for `async_graphql::BatchRequest`. -pub struct BatchResponse(async_graphql::BatchResponse); +#[derive(Debug)] +pub struct BatchResponse(pub async_graphql::BatchResponse); impl From for BatchResponse { fn from(resp: async_graphql::BatchResponse) -> Self { @@ -77,16 +66,6 @@ impl From for BatchResponse { } } -fn add_cache_control(http_resp: &mut WarpResponse, resp: &async_graphql::BatchResponse) { - if resp.is_ok() { - if let Some(cache_control) = resp.cache_control().value() { - if let Ok(value) = cache_control.parse() { - http_resp.headers_mut().insert("cache-control", value); - } - } - } -} - impl Reply for BatchResponse { fn into_response(self) -> WarpResponse { let mut resp = warp::reply::with_header( @@ -95,7 +74,14 @@ impl Reply for BatchResponse { "application/json", ) .into_response(); - add_cache_control(&mut resp, &self.0); + + if self.0.is_ok() { + if let Some(cache_control) = self.0.cache_control().value() { + resp.headers_mut() + .insert("cache-control", cache_control.try_into().unwrap()); + } + } + resp } } diff --git a/integrations/warp/src/error.rs b/integrations/warp/src/error.rs index 6b540c7f..bca9d4c0 100644 --- a/integrations/warp/src/error.rs +++ b/integrations/warp/src/error.rs @@ -1,14 +1,60 @@ +use std::error::Error; +use std::fmt::{self, Display, Formatter}; + +use async_graphql::ParseRequestError; +use warp::http::{Response, StatusCode}; +use warp::hyper::Body; use warp::reject::Reject; +use warp::Reply; -/// Bad request error +/// Bad request error. /// -/// It's a wrapper of `async_graphql::ParseRequestError`. -pub struct BadRequest(pub anyhow::Error); +/// It's a wrapper of `async_graphql::ParseRequestError`. It is also a `Reply` - by default it just +/// returns a response containing the error message in plain text. +#[derive(Debug)] +pub struct BadRequest(pub ParseRequestError); -impl std::fmt::Debug for BadRequest { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", self.0) +impl BadRequest { + /// Get the appropriate status code of the error. + #[must_use] + pub fn status(&self) -> StatusCode { + match self.0 { + ParseRequestError::PayloadTooLarge => StatusCode::PAYLOAD_TOO_LARGE, + _ => StatusCode::BAD_REQUEST, + } + } +} + +impl Display for BadRequest { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + self.0.fmt(f) + } +} + +impl Error for BadRequest { + fn source(&self) -> Option<&(dyn Error + 'static)> { + Some(&self.0) } } impl Reject for BadRequest {} + +impl Reply for BadRequest { + fn into_response(self) -> Response { + Response::builder() + .status(self.status()) + .body(Body::from(self.0.to_string())) + .unwrap() + } +} + +impl From for BadRequest { + fn from(e: ParseRequestError) -> Self { + Self(e) + } +} +impl From for ParseRequestError { + fn from(e: BadRequest) -> Self { + e.0 + } +} diff --git a/integrations/warp/src/request.rs b/integrations/warp/src/request.rs index cc360fc2..090be577 100644 --- a/integrations/warp/src/request.rs +++ b/integrations/warp/src/request.rs @@ -1,13 +1,9 @@ -use crate::BadRequest; use async_graphql::http::MultipartOptions; -use async_graphql::{ObjectType, Schema, SubscriptionType}; -use futures::TryStreamExt; -use std::io; -use std::io::ErrorKind; -use std::sync::Arc; -use warp::http::Method; +use async_graphql::{BatchRequest, ObjectType, Request, Schema, SubscriptionType}; use warp::reply::Response as WarpResponse; -use warp::{Buf, Filter, Rejection, Reply}; +use warp::{Filter, Rejection, Reply}; + +use crate::{graphql_batch_opts, BadRequest, BatchResponse}; /// GraphQL request filter /// @@ -38,10 +34,10 @@ use warp::{Buf, Filter, Rejection, Reply}; /// #[tokio::main] /// async fn main() { /// let schema = Schema::new(QueryRoot, EmptyMutation, EmptySubscription); -/// let filter = async_graphql_warp::graphql(schema). -/// and_then(|(schema, request): (MySchema, async_graphql::Request)| async move { -/// Ok::<_, Infallible>(async_graphql_warp::Response::from(schema.execute(request).await)) -/// }); +/// let filter = async_graphql_warp::graphql(schema) +/// .and_then(|(schema, request): (MySchema, async_graphql::Request)| async move { +/// Ok::<_, Infallible>(async_graphql_warp::Response::from(schema.execute(request).await)) +/// }); /// warp::serve(filter).run(([0, 0, 0, 0], 8000)).await; /// } /// ``` @@ -66,55 +62,25 @@ where pub fn graphql_opts( schema: Schema, opts: MultipartOptions, -) -> impl Filter< - Extract = (( - Schema, - async_graphql::Request, - ),), - Error = Rejection, -> + Clone +) -> impl Filter, Request),), Error = Rejection> + Clone where Query: ObjectType + Send + Sync + 'static, Mutation: ObjectType + Send + Sync + 'static, Subscription: SubscriptionType + Send + Sync + 'static, { - let opts = Arc::new(opts); - warp::any() - .and(warp::method()) - .and(warp::query::raw().or(warp::any().map(String::new)).unify()) - .and(warp::header::optional::("content-type")) - .and(warp::body::stream()) - .and(warp::any().map(move || opts.clone())) - .and(warp::any().map(move || schema.clone())) - .and_then( - |method, - query: String, - content_type, - body, - opts: Arc, - schema| async move { - if method == Method::GET { - let request: async_graphql::Request = serde_urlencoded::from_str(&query) - .map_err(|err| warp::reject::custom(BadRequest(err.into())))?; - Ok::<_, Rejection>((schema, request)) - } else { - let request = async_graphql::http::receive_body( - content_type, - futures::TryStreamExt::map_err(body, |err| io::Error::new(ErrorKind::Other, err)) - .map_ok(|mut buf| Buf::to_bytes(&mut buf)) - .into_async_read(), - MultipartOptions::clone(&opts), - ) - .await - .map_err(|err| warp::reject::custom(BadRequest(err.into())))?; - Ok::<_, Rejection>((schema, request)) - } - }, - ) + graphql_batch_opts(schema, opts).and_then(|(schema, batch): (_, BatchRequest)| async move { + >::Ok(( + schema, + batch + .into_single() + .map_err(|e| warp::reject::custom(BadRequest(e)))?, + )) + }) } /// Reply for `async_graphql::Request`. -pub struct Response(async_graphql::Response); +#[derive(Debug)] +pub struct Response(pub async_graphql::Response); impl From for Response { fn from(resp: async_graphql::Response) -> Self { @@ -122,25 +88,8 @@ impl From for Response { } } -fn add_cache_control(http_resp: &mut WarpResponse, resp: &async_graphql::Response) { - if resp.is_ok() { - if let Some(cache_control) = resp.cache_control.value() { - if let Ok(value) = cache_control.parse() { - http_resp.headers_mut().insert("cache-control", value); - } - } - } -} - impl Reply for Response { fn into_response(self) -> WarpResponse { - let mut resp = warp::reply::with_header( - warp::reply::json(&self.0), - "content-type", - "application/json", - ) - .into_response(); - add_cache_control(&mut resp, &self.0); - resp + BatchResponse(self.0.into()).into_response() } } diff --git a/integrations/warp/src/subscription.rs b/integrations/warp/src/subscription.rs index ce13c904..479a14fa 100644 --- a/integrations/warp/src/subscription.rs +++ b/integrations/warp/src/subscription.rs @@ -1,5 +1,5 @@ use async_graphql::{Data, ObjectType, Result, Schema, SubscriptionType}; -use futures::{future, StreamExt}; +use futures_util::{future, StreamExt}; use warp::filters::ws; use warp::{Filter, Rejection, Reply}; @@ -60,31 +60,28 @@ where Subscription: SubscriptionType + Send + Sync + 'static, F: FnOnce(serde_json::Value) -> Result + Send + Sync + Clone + 'static, { - warp::any() - .and(warp::ws()) - .and(warp::any().map(move || schema.clone())) - .and(warp::any().map(move || initializer.clone())) - .map( - |ws: ws::Ws, schema: Schema, initializer: Option| { - ws.on_upgrade(move |websocket| { - let (ws_sender, ws_receiver) = websocket.split(); + warp::ws().map(move |ws: ws::Ws| { + let schema = schema.clone(); + let initializer = initializer.clone(); - async move { - let _ = async_graphql::http::WebSocket::with_data( - schema, - ws_receiver - .take_while(|msg| future::ready(msg.is_ok())) - .map(Result::unwrap) - .map(ws::Message::into_bytes), - initializer, - ) - .map(ws::Message::text) - .map(Ok) - .forward(ws_sender) - .await; - } - }) - }, - ) - .map(|reply| warp::reply::with_header(reply, "Sec-WebSocket-Protocol", "graphql-ws")) + let reply = ws.on_upgrade(move |websocket| { + let (ws_sender, ws_receiver) = websocket.split(); + + async move { + let _ = async_graphql::http::WebSocket::with_data( + schema, + ws_receiver + .take_while(|msg| future::ready(msg.is_ok())) + .map(Result::unwrap) + .map(ws::Message::into_bytes), + initializer, + ) + .map(ws::Message::text) + .map(Ok) + .forward(ws_sender) + .await; + } + }); + warp::reply::with_header(reply, "Sec-WebSocket-Protocol", "graphql-ws") + }) }