Improve Warp integration

This contains a breaking change; BadRequest now contains a
ParseRequestError instead of an anyhow::Error, so it's implementation is
what the documentation says. I'm not sure whether it's worth bumping
the major version number though.
This commit is contained in:
Koxiaet 2020-10-15 14:16:40 +01:00
parent 985ee939d4
commit 1a1e2b376f
5 changed files with 137 additions and 163 deletions

View File

@ -4,7 +4,6 @@ version = "2.0.3"
authors = ["sunli <scott_s829@163.com>", "Koxiaet"] authors = ["sunli <scott_s829@163.com>", "Koxiaet"]
edition = "2018" edition = "2018"
description = "async-graphql for warp" description = "async-graphql for warp"
publish = true
license = "MIT/Apache-2.0" license = "MIT/Apache-2.0"
documentation = "https://docs.rs/async-graphql/" documentation = "https://docs.rs/async-graphql/"
homepage = "https://github.com/async-graphql/async-graphql" homepage = "https://github.com/async-graphql/async-graphql"
@ -14,13 +13,10 @@ categories = ["network-programming", "asynchronous"]
[dependencies] [dependencies]
async-graphql = { path = "../..", version = "=2.0.3" } async-graphql = { path = "../..", version = "=2.0.3" }
warp = { version = "0.2", default-features = false, features = ["websocket"] }
futures = "0.3.0" warp = { version = "0.2.5", default-features = false, features = ["websocket"] }
bytes = "0.5.4" futures-util = { version = "0.3.6", default-features = false }
serde_json = "1.0.48" serde_json = "1.0.59"
hyper = "0.13.6"
serde_urlencoded = "0.6.1"
anyhow = "1.0"
[dev-dependencies] [dev-dependencies]
tokio = { version = "0.2", features = ["macros"] } tokio = { version = "0.2", default-features = false, features = ["macros"] }

View File

@ -1,25 +1,22 @@
use crate::BadRequest; use std::convert::TryInto;
use async_graphql::http::MultipartOptions;
use async_graphql::{ObjectType, Schema, SubscriptionType};
use futures::TryStreamExt;
use std::io; use std::io;
use std::io::ErrorKind; use std::io::ErrorKind;
use std::sync::Arc;
use async_graphql::http::MultipartOptions;
use async_graphql::{BatchRequest, ObjectType, Schema, SubscriptionType};
use futures_util::TryStreamExt;
use warp::reply::Response as WarpResponse; use warp::reply::Response as WarpResponse;
use warp::{Buf, Filter, Rejection, Reply}; use warp::{Buf, Filter, Rejection, Reply};
use crate::BadRequest;
/// GraphQL batch request filter /// GraphQL batch request filter
/// ///
/// It outputs a tuple containing the `async_graphql::Schema` and `async_graphql::BatchRequest`. /// It outputs a tuple containing the `async_graphql::Schema` and `async_graphql::BatchRequest`.
pub fn graphql_batch<Query, Mutation, Subscription>( pub fn graphql_batch<Query, Mutation, Subscription>(
schema: Schema<Query, Mutation, Subscription>, schema: Schema<Query, Mutation, Subscription>,
) -> impl Filter< ) -> impl Filter<Extract = ((Schema<Query, Mutation, Subscription>, BatchRequest),), Error = Rejection>
Extract = (( + Clone
Schema<Query, Mutation, Subscription>,
async_graphql::BatchRequest,
),),
Error = Rejection,
> + Clone
where where
Query: ObjectType + Send + Sync + 'static, Query: ObjectType + Send + Sync + 'static,
Mutation: ObjectType + Send + Sync + 'static, Mutation: ObjectType + Send + Sync + 'static,
@ -28,48 +25,40 @@ where
graphql_batch_opts(schema, Default::default()) graphql_batch_opts(schema, Default::default())
} }
/// Similar to graphql_batch, but you can set the options `async_graphql::MultipartOptions`. /// Similar to graphql_batch, but you can set the options with :`async_graphql::MultipartOptions`.
pub fn graphql_batch_opts<Query, Mutation, Subscription>( pub fn graphql_batch_opts<Query, Mutation, Subscription>(
schema: Schema<Query, Mutation, Subscription>, schema: Schema<Query, Mutation, Subscription>,
opts: MultipartOptions, opts: MultipartOptions,
) -> impl Filter< ) -> impl Filter<Extract = ((Schema<Query, Mutation, Subscription>, BatchRequest),), Error = Rejection>
Extract = (( + Clone
Schema<Query, Mutation, Subscription>,
async_graphql::BatchRequest,
),),
Error = Rejection,
> + Clone
where where
Query: ObjectType + Send + Sync + 'static, Query: ObjectType + Send + Sync + 'static,
Mutation: ObjectType + Send + Sync + 'static, Mutation: ObjectType + Send + Sync + 'static,
Subscription: SubscriptionType + Send + Sync + 'static, Subscription: SubscriptionType + Send + Sync + 'static,
{ {
let opts = Arc::new(opts);
warp::any() warp::any()
.and(warp::header::optional::<String>("content-type")) .and(warp::get().and(warp::query()).map(BatchRequest::Single))
.and(warp::body::stream()) .or(warp::any()
.and(warp::any().map(move || opts.clone())) .and(warp::header::optional::<String>("content-type"))
.and(warp::any().map(move || schema.clone())) .and(warp::body::stream())
.and_then( .and_then(move |content_type, body| async move {
|content_type, body, opts: Arc<MultipartOptions>, schema| async move { async_graphql::http::receive_batch_body(
let request = async_graphql::http::receive_batch_body(
content_type, content_type,
futures::TryStreamExt::map_err(body, |err| { TryStreamExt::map_err(body, |e| io::Error::new(ErrorKind::Other, e))
io::Error::new(ErrorKind::Other, err) .map_ok(|mut buf| Buf::to_bytes(&mut buf))
}) .into_async_read(),
.map_ok(|mut buf| Buf::to_bytes(&mut buf)) opts,
.into_async_read(),
MultipartOptions::clone(&opts),
) )
.await .await
.map_err(|err| warp::reject::custom(BadRequest(err.into())))?; .map_err(|e| warp::reject::custom(BadRequest(e)))
Ok::<_, Rejection>((schema, request)) }))
}, .unify()
) .map(move |res| (schema.clone(), res))
} }
/// Reply for `async_graphql::BatchRequest`. /// Reply for `async_graphql::BatchRequest`.
pub struct BatchResponse(async_graphql::BatchResponse); #[derive(Debug)]
pub struct BatchResponse(pub async_graphql::BatchResponse);
impl From<async_graphql::BatchResponse> for BatchResponse { impl From<async_graphql::BatchResponse> for BatchResponse {
fn from(resp: async_graphql::BatchResponse) -> Self { fn from(resp: async_graphql::BatchResponse) -> Self {
@ -77,16 +66,6 @@ impl From<async_graphql::BatchResponse> for BatchResponse {
} }
} }
fn add_cache_control(http_resp: &mut WarpResponse, resp: &async_graphql::BatchResponse) {
if resp.is_ok() {
if let Some(cache_control) = resp.cache_control().value() {
if let Ok(value) = cache_control.parse() {
http_resp.headers_mut().insert("cache-control", value);
}
}
}
}
impl Reply for BatchResponse { impl Reply for BatchResponse {
fn into_response(self) -> WarpResponse { fn into_response(self) -> WarpResponse {
let mut resp = warp::reply::with_header( let mut resp = warp::reply::with_header(
@ -95,7 +74,14 @@ impl Reply for BatchResponse {
"application/json", "application/json",
) )
.into_response(); .into_response();
add_cache_control(&mut resp, &self.0);
if self.0.is_ok() {
if let Some(cache_control) = self.0.cache_control().value() {
resp.headers_mut()
.insert("cache-control", cache_control.try_into().unwrap());
}
}
resp resp
} }
} }

View File

@ -1,14 +1,60 @@
use std::error::Error;
use std::fmt::{self, Display, Formatter};
use async_graphql::ParseRequestError;
use warp::http::{Response, StatusCode};
use warp::hyper::Body;
use warp::reject::Reject; use warp::reject::Reject;
use warp::Reply;
/// Bad request error /// Bad request error.
/// ///
/// It's a wrapper of `async_graphql::ParseRequestError`. /// It's a wrapper of `async_graphql::ParseRequestError`. It is also a `Reply` - by default it just
pub struct BadRequest(pub anyhow::Error); /// returns a response containing the error message in plain text.
#[derive(Debug)]
pub struct BadRequest(pub ParseRequestError);
impl std::fmt::Debug for BadRequest { impl BadRequest {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { /// Get the appropriate status code of the error.
write!(f, "{}", self.0) #[must_use]
pub fn status(&self) -> StatusCode {
match self.0 {
ParseRequestError::PayloadTooLarge => StatusCode::PAYLOAD_TOO_LARGE,
_ => StatusCode::BAD_REQUEST,
}
}
}
impl Display for BadRequest {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
self.0.fmt(f)
}
}
impl Error for BadRequest {
fn source(&self) -> Option<&(dyn Error + 'static)> {
Some(&self.0)
} }
} }
impl Reject for BadRequest {} impl Reject for BadRequest {}
impl Reply for BadRequest {
fn into_response(self) -> Response<Body> {
Response::builder()
.status(self.status())
.body(Body::from(self.0.to_string()))
.unwrap()
}
}
impl From<ParseRequestError> for BadRequest {
fn from(e: ParseRequestError) -> Self {
Self(e)
}
}
impl From<BadRequest> for ParseRequestError {
fn from(e: BadRequest) -> Self {
e.0
}
}

View File

@ -1,13 +1,9 @@
use crate::BadRequest;
use async_graphql::http::MultipartOptions; use async_graphql::http::MultipartOptions;
use async_graphql::{ObjectType, Schema, SubscriptionType}; use async_graphql::{BatchRequest, ObjectType, Request, Schema, SubscriptionType};
use futures::TryStreamExt;
use std::io;
use std::io::ErrorKind;
use std::sync::Arc;
use warp::http::Method;
use warp::reply::Response as WarpResponse; use warp::reply::Response as WarpResponse;
use warp::{Buf, Filter, Rejection, Reply}; use warp::{Filter, Rejection, Reply};
use crate::{graphql_batch_opts, BadRequest, BatchResponse};
/// GraphQL request filter /// GraphQL request filter
/// ///
@ -38,10 +34,10 @@ use warp::{Buf, Filter, Rejection, Reply};
/// #[tokio::main] /// #[tokio::main]
/// async fn main() { /// async fn main() {
/// let schema = Schema::new(QueryRoot, EmptyMutation, EmptySubscription); /// let schema = Schema::new(QueryRoot, EmptyMutation, EmptySubscription);
/// let filter = async_graphql_warp::graphql(schema). /// let filter = async_graphql_warp::graphql(schema)
/// and_then(|(schema, request): (MySchema, async_graphql::Request)| async move { /// .and_then(|(schema, request): (MySchema, async_graphql::Request)| async move {
/// Ok::<_, Infallible>(async_graphql_warp::Response::from(schema.execute(request).await)) /// Ok::<_, Infallible>(async_graphql_warp::Response::from(schema.execute(request).await))
/// }); /// });
/// warp::serve(filter).run(([0, 0, 0, 0], 8000)).await; /// warp::serve(filter).run(([0, 0, 0, 0], 8000)).await;
/// } /// }
/// ``` /// ```
@ -66,55 +62,25 @@ where
pub fn graphql_opts<Query, Mutation, Subscription>( pub fn graphql_opts<Query, Mutation, Subscription>(
schema: Schema<Query, Mutation, Subscription>, schema: Schema<Query, Mutation, Subscription>,
opts: MultipartOptions, opts: MultipartOptions,
) -> impl Filter< ) -> impl Filter<Extract = ((Schema<Query, Mutation, Subscription>, Request),), Error = Rejection> + Clone
Extract = ((
Schema<Query, Mutation, Subscription>,
async_graphql::Request,
),),
Error = Rejection,
> + Clone
where where
Query: ObjectType + Send + Sync + 'static, Query: ObjectType + Send + Sync + 'static,
Mutation: ObjectType + Send + Sync + 'static, Mutation: ObjectType + Send + Sync + 'static,
Subscription: SubscriptionType + Send + Sync + 'static, Subscription: SubscriptionType + Send + Sync + 'static,
{ {
let opts = Arc::new(opts); graphql_batch_opts(schema, opts).and_then(|(schema, batch): (_, BatchRequest)| async move {
warp::any() <Result<_, Rejection>>::Ok((
.and(warp::method()) schema,
.and(warp::query::raw().or(warp::any().map(String::new)).unify()) batch
.and(warp::header::optional::<String>("content-type")) .into_single()
.and(warp::body::stream()) .map_err(|e| warp::reject::custom(BadRequest(e)))?,
.and(warp::any().map(move || opts.clone())) ))
.and(warp::any().map(move || schema.clone())) })
.and_then(
|method,
query: String,
content_type,
body,
opts: Arc<MultipartOptions>,
schema| async move {
if method == Method::GET {
let request: async_graphql::Request = serde_urlencoded::from_str(&query)
.map_err(|err| warp::reject::custom(BadRequest(err.into())))?;
Ok::<_, Rejection>((schema, request))
} else {
let request = async_graphql::http::receive_body(
content_type,
futures::TryStreamExt::map_err(body, |err| io::Error::new(ErrorKind::Other, err))
.map_ok(|mut buf| Buf::to_bytes(&mut buf))
.into_async_read(),
MultipartOptions::clone(&opts),
)
.await
.map_err(|err| warp::reject::custom(BadRequest(err.into())))?;
Ok::<_, Rejection>((schema, request))
}
},
)
} }
/// Reply for `async_graphql::Request`. /// Reply for `async_graphql::Request`.
pub struct Response(async_graphql::Response); #[derive(Debug)]
pub struct Response(pub async_graphql::Response);
impl From<async_graphql::Response> for Response { impl From<async_graphql::Response> for Response {
fn from(resp: async_graphql::Response) -> Self { fn from(resp: async_graphql::Response) -> Self {
@ -122,25 +88,8 @@ impl From<async_graphql::Response> for Response {
} }
} }
fn add_cache_control(http_resp: &mut WarpResponse, resp: &async_graphql::Response) {
if resp.is_ok() {
if let Some(cache_control) = resp.cache_control.value() {
if let Ok(value) = cache_control.parse() {
http_resp.headers_mut().insert("cache-control", value);
}
}
}
}
impl Reply for Response { impl Reply for Response {
fn into_response(self) -> WarpResponse { fn into_response(self) -> WarpResponse {
let mut resp = warp::reply::with_header( BatchResponse(self.0.into()).into_response()
warp::reply::json(&self.0),
"content-type",
"application/json",
)
.into_response();
add_cache_control(&mut resp, &self.0);
resp
} }
} }

View File

@ -1,5 +1,5 @@
use async_graphql::{Data, ObjectType, Result, Schema, SubscriptionType}; use async_graphql::{Data, ObjectType, Result, Schema, SubscriptionType};
use futures::{future, StreamExt}; use futures_util::{future, StreamExt};
use warp::filters::ws; use warp::filters::ws;
use warp::{Filter, Rejection, Reply}; use warp::{Filter, Rejection, Reply};
@ -60,31 +60,28 @@ where
Subscription: SubscriptionType + Send + Sync + 'static, Subscription: SubscriptionType + Send + Sync + 'static,
F: FnOnce(serde_json::Value) -> Result<Data> + Send + Sync + Clone + 'static, F: FnOnce(serde_json::Value) -> Result<Data> + Send + Sync + Clone + 'static,
{ {
warp::any() warp::ws().map(move |ws: ws::Ws| {
.and(warp::ws()) let schema = schema.clone();
.and(warp::any().map(move || schema.clone())) let initializer = initializer.clone();
.and(warp::any().map(move || initializer.clone()))
.map(
|ws: ws::Ws, schema: Schema<Query, Mutation, Subscription>, initializer: Option<F>| {
ws.on_upgrade(move |websocket| {
let (ws_sender, ws_receiver) = websocket.split();
async move { let reply = ws.on_upgrade(move |websocket| {
let _ = async_graphql::http::WebSocket::with_data( let (ws_sender, ws_receiver) = websocket.split();
schema,
ws_receiver async move {
.take_while(|msg| future::ready(msg.is_ok())) let _ = async_graphql::http::WebSocket::with_data(
.map(Result::unwrap) schema,
.map(ws::Message::into_bytes), ws_receiver
initializer, .take_while(|msg| future::ready(msg.is_ok()))
) .map(Result::unwrap)
.map(ws::Message::text) .map(ws::Message::into_bytes),
.map(Ok) initializer,
.forward(ws_sender) )
.await; .map(ws::Message::text)
} .map(Ok)
}) .forward(ws_sender)
}, .await;
) }
.map(|reply| warp::reply::with_header(reply, "Sec-WebSocket-Protocol", "graphql-ws")) });
warp::reply::with_header(reply, "Sec-WebSocket-Protocol", "graphql-ws")
})
} }