Add BatchRequest support for actixweb and warp integrations.

This commit is contained in:
Sunli 2020-09-21 14:57:33 +08:00
parent 92c5db674a
commit e12ae8b236
9 changed files with 577 additions and 362 deletions

View File

@ -0,0 +1,100 @@
use actix_web::dev::{HttpResponseBuilder, Payload, PayloadStream};
use actix_web::http::StatusCode;
use actix_web::{http, web, Error, FromRequest, HttpRequest, HttpResponse, Responder};
use async_graphql::http::MultipartOptions;
use async_graphql::ParseRequestError;
use futures::channel::mpsc;
use futures::future::Ready;
use futures::io::ErrorKind;
use futures::{Future, SinkExt, StreamExt, TryFutureExt, TryStreamExt};
use std::io;
use std::pin::Pin;
/// Extractor for GraphQL batch request
///
/// It's a wrapper of `async_graphql::Request`, you can use `Request::into_inner` unwrap it to `async_graphql::Request`.
/// `async_graphql::http::MultipartOptions` allows to configure extraction process.
pub struct BatchRequest(async_graphql::BatchRequest);
impl BatchRequest {
/// Unwraps the value to `async_graphql::Request`.
pub fn into_inner(self) -> async_graphql::BatchRequest {
self.0
}
}
impl FromRequest for BatchRequest {
type Error = Error;
type Future = Pin<Box<dyn Future<Output = Result<BatchRequest, Error>>>>;
type Config = MultipartOptions;
fn from_request(req: &HttpRequest, payload: &mut Payload<PayloadStream>) -> Self::Future {
let config = req.app_data::<Self::Config>().cloned().unwrap_or_default();
let content_type = req
.headers()
.get(http::header::CONTENT_TYPE)
.and_then(|value| value.to_str().ok())
.map(|value| value.to_string());
let (mut tx, rx) = mpsc::channel(16);
// Because Payload is !Send, so forward it to mpsc::Sender
let mut payload = web::Payload(payload.take());
actix_rt::spawn(async move {
while let Some(item) = payload.next().await {
if tx.send(item).await.is_err() {
return;
}
}
});
Box::pin(async move {
Ok(BatchRequest(
async_graphql::http::receive_batch_body(
content_type,
rx.map_err(|err| io::Error::new(ErrorKind::Other, err))
.into_async_read(),
config,
)
.map_err(|err| match err {
ParseRequestError::PayloadTooLarge => {
actix_web::error::ErrorPayloadTooLarge(err)
}
_ => actix_web::error::ErrorBadRequest(err),
})
.await?,
))
})
}
}
/// Responder for GraphQL batch response
pub struct BatchResponse(async_graphql::BatchResponse);
impl From<async_graphql::BatchResponse> for BatchResponse {
fn from(resp: async_graphql::BatchResponse) -> Self {
BatchResponse(resp)
}
}
impl Responder for BatchResponse {
type Error = Error;
type Future = Ready<Result<HttpResponse, Error>>;
fn respond_to(self, _req: &HttpRequest) -> Self::Future {
let mut res = HttpResponse::build(StatusCode::OK);
res.content_type("application/json");
add_cache_control(&mut res, &self.0);
let res = res.body(serde_json::to_string(&self.0).unwrap());
futures::future::ok(res)
}
}
fn add_cache_control(builder: &mut HttpResponseBuilder, resp: &async_graphql::BatchResponse) {
if resp.is_ok() {
if let Some(cache_control) = resp.cache_control().value() {
builder.header("cache-control", cache_control);
}
}
}

View File

@ -1,117 +1,11 @@
//! Async-graphql integration with Actix-web
#![warn(missing_docs)]
#![forbid(unsafe_code)]
mod batch_request;
mod request;
mod subscription;
use actix_web::dev::{HttpResponseBuilder, Payload, PayloadStream};
use actix_web::http::StatusCode;
use actix_web::{http, web, Error, FromRequest, HttpRequest, HttpResponse, Responder};
use async_graphql::http::MultipartOptions;
use async_graphql::ParseRequestError;
use futures::channel::mpsc;
use futures::future::Ready;
use futures::io::ErrorKind;
use futures::{Future, SinkExt, StreamExt, TryFutureExt, TryStreamExt};
use http::Method;
use std::io;
use std::pin::Pin;
pub use batch_request::{BatchRequest, BatchResponse};
pub use request::{Request, Response};
pub use subscription::WSSubscription;
/// Extractor for GraphQL request
///
/// It's a wrapper of `async_graphql::Request`, you can use `Request::into_inner` unwrap it to `async_graphql::Request`.
/// `async_graphql::http::MultipartOptions` allows to configure extraction process.
pub struct Request(async_graphql::Request);
impl Request {
/// Unwraps the value to `async_graphql::Request`.
pub fn into_inner(self) -> async_graphql::Request {
self.0
}
}
impl FromRequest for Request {
type Error = Error;
type Future = Pin<Box<dyn Future<Output = Result<Request, Error>>>>;
type Config = MultipartOptions;
fn from_request(req: &HttpRequest, payload: &mut Payload<PayloadStream>) -> Self::Future {
let config = req.app_data::<Self::Config>().cloned().unwrap_or_default();
if req.method() == Method::GET {
let res = web::Query::<async_graphql::Request>::from_query(req.query_string());
Box::pin(async move {
let gql_request = res?;
Ok(Request(gql_request.into_inner()))
})
} else {
let content_type = req
.headers()
.get(http::header::CONTENT_TYPE)
.and_then(|value| value.to_str().ok())
.map(|value| value.to_string());
let (mut tx, rx) = mpsc::channel(16);
// Because Payload is !Send, so forward it to mpsc::Sender
let mut payload = web::Payload(payload.take());
actix_rt::spawn(async move {
while let Some(item) = payload.next().await {
if tx.send(item).await.is_err() {
return;
}
}
});
Box::pin(async move {
Ok(Request(
async_graphql::http::receive_body(
content_type,
rx.map_err(|err| io::Error::new(ErrorKind::Other, err))
.into_async_read(),
config,
)
.map_err(|err| match err {
ParseRequestError::PayloadTooLarge => {
actix_web::error::ErrorPayloadTooLarge(err)
}
_ => actix_web::error::ErrorBadRequest(err),
})
.await?,
))
})
}
}
}
/// Responder for GraphQL response
pub struct Response(async_graphql::Response);
impl From<async_graphql::Response> for Response {
fn from(resp: async_graphql::Response) -> Self {
Response(resp)
}
}
impl Responder for Response {
type Error = Error;
type Future = Ready<Result<HttpResponse, Error>>;
fn respond_to(self, _req: &HttpRequest) -> Self::Future {
let mut res = HttpResponse::build(StatusCode::OK);
res.content_type("application/json");
add_cache_control(&mut res, &self.0);
let res = res.body(serde_json::to_string(&self.0).unwrap());
futures::future::ok(res)
}
}
fn add_cache_control(builder: &mut HttpResponseBuilder, resp: &async_graphql::Response) {
if resp.is_ok() {
if let Some(cache_control) = resp.cache_control.value() {
builder.header("cache-control", cache_control);
}
}
}

View File

@ -0,0 +1,109 @@
use actix_web::dev::{HttpResponseBuilder, Payload, PayloadStream};
use actix_web::http::StatusCode;
use actix_web::{http, web, Error, FromRequest, HttpRequest, HttpResponse, Responder};
use async_graphql::http::MultipartOptions;
use async_graphql::ParseRequestError;
use futures::channel::mpsc;
use futures::future::Ready;
use futures::io::ErrorKind;
use futures::{Future, SinkExt, StreamExt, TryFutureExt, TryStreamExt};
use http::Method;
use std::io;
use std::pin::Pin;
/// Extractor for GraphQL request
///
/// It's a wrapper of `async_graphql::Request`, you can use `Request::into_inner` unwrap it to `async_graphql::Request`.
/// `async_graphql::http::MultipartOptions` allows to configure extraction process.
pub struct Request(async_graphql::Request);
impl Request {
/// Unwraps the value to `async_graphql::Request`.
pub fn into_inner(self) -> async_graphql::Request {
self.0
}
}
impl FromRequest for Request {
type Error = Error;
type Future = Pin<Box<dyn Future<Output = Result<Request, Error>>>>;
type Config = MultipartOptions;
fn from_request(req: &HttpRequest, payload: &mut Payload<PayloadStream>) -> Self::Future {
let config = req.app_data::<Self::Config>().cloned().unwrap_or_default();
if req.method() == Method::GET {
let res = web::Query::<async_graphql::Request>::from_query(req.query_string());
Box::pin(async move {
let gql_request = res?;
Ok(Request(gql_request.into_inner()))
})
} else {
let content_type = req
.headers()
.get(http::header::CONTENT_TYPE)
.and_then(|value| value.to_str().ok())
.map(|value| value.to_string());
let (mut tx, rx) = mpsc::channel(16);
// Because Payload is !Send, so forward it to mpsc::Sender
let mut payload = web::Payload(payload.take());
actix_rt::spawn(async move {
while let Some(item) = payload.next().await {
if tx.send(item).await.is_err() {
return;
}
}
});
Box::pin(async move {
Ok(Request(
async_graphql::http::receive_body(
content_type,
rx.map_err(|err| io::Error::new(ErrorKind::Other, err))
.into_async_read(),
config,
)
.map_err(|err| match err {
ParseRequestError::PayloadTooLarge => {
actix_web::error::ErrorPayloadTooLarge(err)
}
_ => actix_web::error::ErrorBadRequest(err),
})
.await?,
))
})
}
}
}
/// Responder for GraphQL response
pub struct Response(async_graphql::Response);
impl From<async_graphql::Response> for Response {
fn from(resp: async_graphql::Response) -> Self {
Response(resp)
}
}
impl Responder for Response {
type Error = Error;
type Future = Ready<Result<HttpResponse, Error>>;
fn respond_to(self, _req: &HttpRequest) -> Self::Future {
let mut res = HttpResponse::build(StatusCode::OK);
res.content_type("application/json");
add_cache_control(&mut res, &self.0);
let res = res.body(serde_json::to_string(&self.0).unwrap());
futures::future::ok(res)
}
}
fn add_cache_control(builder: &mut HttpResponseBuilder, resp: &async_graphql::Response) {
if resp.is_ok() {
if let Some(cache_control) = resp.cache_control.value() {
builder.header("cache-control", cache_control);
}
}
}

View File

@ -0,0 +1,101 @@
use crate::BadRequest;
use async_graphql::http::MultipartOptions;
use async_graphql::{ObjectType, Schema, SubscriptionType};
use futures::TryStreamExt;
use std::io;
use std::io::ErrorKind;
use std::sync::Arc;
use warp::reply::Response as WarpResponse;
use warp::{Buf, Filter, Rejection, Reply};
/// GraphQL batch request filter
///
/// It outputs a tuple containing the `async_graphql::Schema` and `async_graphql::BatchRequest`.
pub fn graphql_batch<Query, Mutation, Subscription>(
schema: Schema<Query, Mutation, Subscription>,
) -> impl Filter<
Extract = ((
Schema<Query, Mutation, Subscription>,
async_graphql::BatchRequest,
),),
Error = Rejection,
> + Clone
where
Query: ObjectType + Send + Sync + 'static,
Mutation: ObjectType + Send + Sync + 'static,
Subscription: SubscriptionType + Send + Sync + 'static,
{
graphql_batch_opts(schema, Default::default())
}
/// Similar to graphql_batch, but you can set the options `async_graphql::MultipartOptions`.
pub fn graphql_batch_opts<Query, Mutation, Subscription>(
schema: Schema<Query, Mutation, Subscription>,
opts: MultipartOptions,
) -> impl Filter<
Extract = ((
Schema<Query, Mutation, Subscription>,
async_graphql::BatchRequest,
),),
Error = Rejection,
> + Clone
where
Query: ObjectType + Send + Sync + 'static,
Mutation: ObjectType + Send + Sync + 'static,
Subscription: SubscriptionType + Send + Sync + 'static,
{
let opts = Arc::new(opts);
warp::any()
.and(warp::header::optional::<String>("content-type"))
.and(warp::body::stream())
.and(warp::any().map(move || opts.clone()))
.and(warp::any().map(move || schema.clone()))
.and_then(
|content_type, body, opts: Arc<MultipartOptions>, schema| async move {
let request = async_graphql::http::receive_batch_body(
content_type,
futures::TryStreamExt::map_err(body, |err| {
io::Error::new(ErrorKind::Other, err)
})
.map_ok(|mut buf| Buf::to_bytes(&mut buf))
.into_async_read(),
MultipartOptions::clone(&opts),
)
.await
.map_err(|err| warp::reject::custom(BadRequest(err.into())))?;
Ok::<_, Rejection>((schema, request))
},
)
}
/// Reply for `async_graphql::BatchRequest`.
pub struct BatchResponse(async_graphql::BatchResponse);
impl From<async_graphql::BatchResponse> for BatchResponse {
fn from(resp: async_graphql::BatchResponse) -> Self {
BatchResponse(resp)
}
}
fn add_cache_control(http_resp: &mut WarpResponse, resp: &async_graphql::BatchResponse) {
if resp.is_ok() {
if let Some(cache_control) = resp.cache_control().value() {
if let Ok(value) = cache_control.parse() {
http_resp.headers_mut().insert("cache-control", value);
}
}
}
}
impl Reply for BatchResponse {
fn into_response(self) -> WarpResponse {
let mut resp = warp::reply::with_header(
warp::reply::json(&self.0),
"content-type",
"application/json",
)
.into_response();
add_cache_control(&mut resp, &self.0);
resp
}
}

View File

@ -0,0 +1,14 @@
use warp::reject::Reject;
/// Bad request error
///
/// It's a wrapper of `async_graphql::ParseRequestError`.
pub struct BadRequest(pub anyhow::Error);
impl std::fmt::Debug for BadRequest {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}
impl Reject for BadRequest {}

View File

@ -1,258 +1,15 @@
//! Async-graphql integration with Warp
#![warn(missing_docs)]
#![allow(clippy::type_complexity)]
#![allow(clippy::needless_doctest_main)]
#![forbid(unsafe_code)]
use async_graphql::http::MultipartOptions;
use async_graphql::{
resolver_utils::ObjectType, Data, FieldResult, Request, Schema, SubscriptionType,
};
use futures::{future, StreamExt, TryStreamExt};
use hyper::Method;
use std::io::{self, ErrorKind};
use std::sync::Arc;
use warp::filters::ws;
use warp::reject::Reject;
use warp::reply::Response as WarpResponse;
use warp::{Buf, Filter, Rejection, Reply};
mod batch_request;
mod error;
mod request;
mod subscription;
/// Bad request error
///
/// It's a wrapper of `async_graphql::ParseRequestError`.
pub struct BadRequest(pub anyhow::Error);
impl std::fmt::Debug for BadRequest {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}
impl Reject for BadRequest {}
/// GraphQL request filter
///
/// It outputs a tuple containing the `async_graphql::Schema` and `async_graphql::Request`.
///
/// # Examples
///
/// *[Full Example](<https://github.com/async-graphql/examples/blob/master/warp/starwars/src/main.rs>)*
///
/// ```no_run
///
/// use async_graphql::*;
/// use async_graphql_warp::*;
/// use warp::Filter;
/// use std::convert::Infallible;
///
/// struct QueryRoot;
///
/// #[Object]
/// impl QueryRoot {
/// #[field]
/// async fn value(&self, ctx: &Context<'_>) -> i32 {
/// unimplemented!()
/// }
/// }
///
/// type MySchema = Schema<QueryRoot, EmptyMutation, EmptySubscription>;
///
/// #[tokio::main]
/// async fn main() {
/// let schema = Schema::new(QueryRoot, EmptyMutation, EmptySubscription);
/// let filter = async_graphql_warp::graphql(schema).
/// and_then(|(schema, request): (MySchema, async_graphql::Request)| async move {
/// Ok::<_, Infallible>(async_graphql_warp::Response::from(schema.execute(request).await))
/// });
/// warp::serve(filter).run(([0, 0, 0, 0], 8000)).await;
/// }
/// ```
pub fn graphql<Query, Mutation, Subscription>(
schema: Schema<Query, Mutation, Subscription>,
) -> impl Filter<
Extract = ((
Schema<Query, Mutation, Subscription>,
async_graphql::Request,
),),
Error = Rejection,
> + Clone
where
Query: ObjectType + Send + Sync + 'static,
Mutation: ObjectType + Send + Sync + 'static,
Subscription: SubscriptionType + Send + Sync + 'static,
{
graphql_opts(schema, Default::default())
}
/// Similar to graphql, but you can set the options `async_graphql::MultipartOptions`.
pub fn graphql_opts<Query, Mutation, Subscription>(
schema: Schema<Query, Mutation, Subscription>,
opts: MultipartOptions,
) -> impl Filter<
Extract = ((
Schema<Query, Mutation, Subscription>,
async_graphql::Request,
),),
Error = Rejection,
> + Clone
where
Query: ObjectType + Send + Sync + 'static,
Mutation: ObjectType + Send + Sync + 'static,
Subscription: SubscriptionType + Send + Sync + 'static,
{
let opts = Arc::new(opts);
warp::any()
.and(warp::method())
.and(warp::query::raw().or(warp::any().map(String::new)).unify())
.and(warp::header::optional::<String>("content-type"))
.and(warp::body::stream())
.and(warp::any().map(move || opts.clone()))
.and(warp::any().map(move || schema.clone()))
.and_then(
|method,
query: String,
content_type,
body,
opts: Arc<MultipartOptions>,
schema| async move {
if method == Method::GET {
let request: Request = serde_urlencoded::from_str(&query)
.map_err(|err| warp::reject::custom(BadRequest(err.into())))?;
Ok::<_, Rejection>((schema, request))
} else {
let request = async_graphql::http::receive_body(
content_type,
futures::TryStreamExt::map_err(body, |err| io::Error::new(ErrorKind::Other, err))
.map_ok(|mut buf| Buf::to_bytes(&mut buf))
.into_async_read(),
MultipartOptions::clone(&opts),
)
.await
.map_err(|err| warp::reject::custom(BadRequest(err.into())))?;
Ok::<_, Rejection>((schema, request))
}
},
)
}
/// GraphQL subscription filter
///
/// # Examples
///
/// ```no_run
/// use async_graphql::*;
/// use async_graphql_warp::*;
/// use warp::Filter;
/// use futures::{Stream, StreamExt};
/// use std::time::Duration;
///
/// struct QueryRoot;
///
/// #[Object]
/// impl QueryRoot {}
///
/// struct SubscriptionRoot;
///
/// #[Subscription]
/// impl SubscriptionRoot {
/// #[field]
/// async fn tick(&self) -> impl Stream<Item = String> {
/// tokio::time::interval(Duration::from_secs(1)).map(|n| format!("{}", n.elapsed().as_secs_f32()))
/// }
/// }
///
/// #[tokio::main]
/// async fn main() {
/// let schema = Schema::new(QueryRoot, EmptyMutation, SubscriptionRoot);
/// let filter = async_graphql_warp::graphql_subscription(schema)
/// .or(warp::any().map(|| "Hello, World!"));
/// warp::serve(filter).run(([0, 0, 0, 0], 8000)).await;
/// }
/// ```
pub fn graphql_subscription<Query, Mutation, Subscription>(
schema: Schema<Query, Mutation, Subscription>,
) -> impl Filter<Extract = (impl Reply,), Error = Rejection> + Clone
where
Query: ObjectType + Sync + Send + 'static,
Mutation: ObjectType + Sync + Send + 'static,
Subscription: SubscriptionType + Send + Sync + 'static,
{
graphql_subscription_with_data::<_, _, _, fn(serde_json::Value) -> FieldResult<Data>>(
schema, None,
)
}
/// GraphQL subscription filter
///
/// Specifies that a function converts the init payload to data.
pub fn graphql_subscription_with_data<Query, Mutation, Subscription, F>(
schema: Schema<Query, Mutation, Subscription>,
initializer: Option<F>,
) -> impl Filter<Extract = (impl Reply,), Error = Rejection> + Clone
where
Query: ObjectType + Sync + Send + 'static,
Mutation: ObjectType + Sync + Send + 'static,
Subscription: SubscriptionType + Send + Sync + 'static,
F: FnOnce(serde_json::Value) -> FieldResult<Data> + Send + Sync + Clone + 'static,
{
warp::any()
.and(warp::ws())
.and(warp::any().map(move || schema.clone()))
.and(warp::any().map(move || initializer.clone()))
.map(
|ws: ws::Ws, schema: Schema<Query, Mutation, Subscription>, initializer: Option<F>| {
ws.on_upgrade(move |websocket| {
let (ws_sender, ws_receiver) = websocket.split();
async move {
let _ = async_graphql::http::WebSocket::with_data(
schema,
ws_receiver
.take_while(|msg| future::ready(msg.is_ok()))
.map(Result::unwrap)
.map(ws::Message::into_bytes),
initializer,
)
.map(ws::Message::text)
.map(Ok)
.forward(ws_sender)
.await;
}
})
},
)
.map(|reply| warp::reply::with_header(reply, "Sec-WebSocket-Protocol", "graphql-ws"))
}
/// GraphQL reply
pub struct Response(async_graphql::Response);
impl From<async_graphql::Response> for Response {
fn from(resp: async_graphql::Response) -> Self {
Response(resp)
}
}
fn add_cache_control(http_resp: &mut WarpResponse, resp: &async_graphql::Response) {
if resp.is_ok() {
if let Some(cache_control) = resp.cache_control.value() {
if let Ok(value) = cache_control.parse() {
http_resp.headers_mut().insert("cache-control", value);
}
}
}
}
impl Reply for Response {
fn into_response(self) -> WarpResponse {
let mut resp = warp::reply::with_header(
warp::reply::json(&self.0),
"content-type",
"application/json",
)
.into_response();
add_cache_control(&mut resp, &self.0);
resp
}
}
pub use batch_request::{graphql_batch, graphql_batch_opts, BatchResponse};
pub use error::BadRequest;
pub use request::{graphql, graphql_opts, Response};
pub use subscription::{graphql_subscription, graphql_subscription_with_data};

View File

@ -0,0 +1,147 @@
use crate::BadRequest;
use async_graphql::http::MultipartOptions;
use async_graphql::{ObjectType, Schema, SubscriptionType};
use futures::TryStreamExt;
use std::io;
use std::io::ErrorKind;
use std::sync::Arc;
use warp::http::Method;
use warp::reply::Response as WarpResponse;
use warp::{Buf, Filter, Rejection, Reply};
/// GraphQL request filter
///
/// It outputs a tuple containing the `async_graphql::Schema` and `async_graphql::Request`.
///
/// # Examples
///
/// *[Full Example](<https://github.com/async-graphql/examples/blob/master/warp/starwars/src/main.rs>)*
///
/// ```no_run
///
/// use async_graphql::*;
/// use async_graphql_warp::*;
/// use warp::Filter;
/// use std::convert::Infallible;
///
/// struct QueryRoot;
///
/// #[Object]
/// impl QueryRoot {
/// #[field]
/// async fn value(&self, ctx: &Context<'_>) -> i32 {
/// unimplemented!()
/// }
/// }
///
/// type MySchema = Schema<QueryRoot, EmptyMutation, EmptySubscription>;
///
/// #[tokio::main]
/// async fn main() {
/// let schema = Schema::new(QueryRoot, EmptyMutation, EmptySubscription);
/// let filter = async_graphql_warp::graphql(schema).
/// and_then(|(schema, request): (MySchema, async_graphql::Request)| async move {
/// Ok::<_, Infallible>(async_graphql_warp::Response::from(schema.execute(request).await))
/// });
/// warp::serve(filter).run(([0, 0, 0, 0], 8000)).await;
/// }
/// ```
pub fn graphql<Query, Mutation, Subscription>(
schema: Schema<Query, Mutation, Subscription>,
) -> impl Filter<
Extract = ((
Schema<Query, Mutation, Subscription>,
async_graphql::Request,
),),
Error = Rejection,
> + Clone
where
Query: ObjectType + Send + Sync + 'static,
Mutation: ObjectType + Send + Sync + 'static,
Subscription: SubscriptionType + Send + Sync + 'static,
{
graphql_opts(schema, Default::default())
}
/// Similar to graphql, but you can set the options `async_graphql::MultipartOptions`.
pub fn graphql_opts<Query, Mutation, Subscription>(
schema: Schema<Query, Mutation, Subscription>,
opts: MultipartOptions,
) -> impl Filter<
Extract = ((
Schema<Query, Mutation, Subscription>,
async_graphql::Request,
),),
Error = Rejection,
> + Clone
where
Query: ObjectType + Send + Sync + 'static,
Mutation: ObjectType + Send + Sync + 'static,
Subscription: SubscriptionType + Send + Sync + 'static,
{
let opts = Arc::new(opts);
warp::any()
.and(warp::method())
.and(warp::query::raw().or(warp::any().map(String::new)).unify())
.and(warp::header::optional::<String>("content-type"))
.and(warp::body::stream())
.and(warp::any().map(move || opts.clone()))
.and(warp::any().map(move || schema.clone()))
.and_then(
|method,
query: String,
content_type,
body,
opts: Arc<MultipartOptions>,
schema| async move {
if method == Method::GET {
let request: async_graphql::Request = serde_urlencoded::from_str(&query)
.map_err(|err| warp::reject::custom(BadRequest(err.into())))?;
Ok::<_, Rejection>((schema, request))
} else {
let request = async_graphql::http::receive_body(
content_type,
futures::TryStreamExt::map_err(body, |err| io::Error::new(ErrorKind::Other, err))
.map_ok(|mut buf| Buf::to_bytes(&mut buf))
.into_async_read(),
MultipartOptions::clone(&opts),
)
.await
.map_err(|err| warp::reject::custom(BadRequest(err.into())))?;
Ok::<_, Rejection>((schema, request))
}
},
)
}
/// Reply for `async_graphql::Request`.
pub struct Response(async_graphql::Response);
impl From<async_graphql::Response> for Response {
fn from(resp: async_graphql::Response) -> Self {
Response(resp)
}
}
fn add_cache_control(http_resp: &mut WarpResponse, resp: &async_graphql::Response) {
if resp.is_ok() {
if let Some(cache_control) = resp.cache_control.value() {
if let Ok(value) = cache_control.parse() {
http_resp.headers_mut().insert("cache-control", value);
}
}
}
}
impl Reply for Response {
fn into_response(self) -> WarpResponse {
let mut resp = warp::reply::with_header(
warp::reply::json(&self.0),
"content-type",
"application/json",
)
.into_response();
add_cache_control(&mut resp, &self.0);
resp
}
}

View File

@ -0,0 +1,93 @@
use async_graphql::{resolver_utils::ObjectType, Data, FieldResult, Schema, SubscriptionType};
use futures::{future, StreamExt};
use warp::filters::ws;
use warp::{Filter, Rejection, Reply};
/// GraphQL subscription filter
///
/// # Examples
///
/// ```no_run
/// use async_graphql::*;
/// use async_graphql_warp::*;
/// use warp::Filter;
/// use futures::{Stream, StreamExt};
/// use std::time::Duration;
///
/// struct QueryRoot;
///
/// #[Object]
/// impl QueryRoot {}
///
/// struct SubscriptionRoot;
///
/// #[Subscription]
/// impl SubscriptionRoot {
/// #[field]
/// async fn tick(&self) -> impl Stream<Item = String> {
/// tokio::time::interval(Duration::from_secs(1)).map(|n| format!("{}", n.elapsed().as_secs_f32()))
/// }
/// }
///
/// #[tokio::main]
/// async fn main() {
/// let schema = Schema::new(QueryRoot, EmptyMutation, SubscriptionRoot);
/// let filter = async_graphql_warp::graphql_subscription(schema)
/// .or(warp::any().map(|| "Hello, World!"));
/// warp::serve(filter).run(([0, 0, 0, 0], 8000)).await;
/// }
/// ```
pub fn graphql_subscription<Query, Mutation, Subscription>(
schema: Schema<Query, Mutation, Subscription>,
) -> impl Filter<Extract = (impl Reply,), Error = Rejection> + Clone
where
Query: ObjectType + Sync + Send + 'static,
Mutation: ObjectType + Sync + Send + 'static,
Subscription: SubscriptionType + Send + Sync + 'static,
{
graphql_subscription_with_data::<_, _, _, fn(serde_json::Value) -> FieldResult<Data>>(
schema, None,
)
}
/// GraphQL subscription filter
///
/// Specifies that a function converts the init payload to data.
pub fn graphql_subscription_with_data<Query, Mutation, Subscription, F>(
schema: Schema<Query, Mutation, Subscription>,
initializer: Option<F>,
) -> impl Filter<Extract = (impl Reply,), Error = Rejection> + Clone
where
Query: ObjectType + Sync + Send + 'static,
Mutation: ObjectType + Sync + Send + 'static,
Subscription: SubscriptionType + Send + Sync + 'static,
F: FnOnce(serde_json::Value) -> FieldResult<Data> + Send + Sync + Clone + 'static,
{
warp::any()
.and(warp::ws())
.and(warp::any().map(move || schema.clone()))
.and(warp::any().map(move || initializer.clone()))
.map(
|ws: ws::Ws, schema: Schema<Query, Mutation, Subscription>, initializer: Option<F>| {
ws.on_upgrade(move |websocket| {
let (ws_sender, ws_receiver) = websocket.split();
async move {
let _ = async_graphql::http::WebSocket::with_data(
schema,
ws_receiver
.take_while(|msg| future::ready(msg.is_ok()))
.map(Result::unwrap)
.map(ws::Message::into_bytes),
initializer,
)
.map(ws::Message::text)
.map(Ok)
.forward(ws_sender)
.await;
}
})
},
)
.map(|reply| warp::reply::with_header(reply, "Sec-WebSocket-Protocol", "graphql-ws"))
}

View File

@ -1,5 +1,5 @@
use crate::{registry, InputValueResult, InputValueType, Type, Value};
use serde::{Deserialize, Serialize};
use serde::Serialize;
use std::borrow::Cow;
/// Similar to `Option`, but it has three states, `undefined`, `null` and `x`.