2020-04-21 10:14:14 +08:00

210 lines
7.0 KiB

//! Async-graphql integration with Wrap
use async_graphql::http::StreamBody;
use async_graphql::{
IntoQueryBuilder, IntoQueryBuilderOpts, ObjectType, QueryBuilder, Schema, SubscriptionType,
use bytes::Bytes;
use futures::select;
use futures::{SinkExt, StreamExt};
use std::sync::Arc;
use warp::filters::ws::Message;
use warp::filters::BoxedFilter;
use warp::reject::Reject;
use warp::{Filter, Rejection, Reply};
/// Bad request error
/// It's a wrapper of `async_graphql::ParseRequestError`.
pub struct BadRequest(pub async_graphql::ParseRequestError);
impl std::fmt::Debug for BadRequest {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
impl Reject for BadRequest {}
/// GraphQL request filter
/// It outputs a tuple containing the `Schema` and `QuertBuilder`.
/// # Examples
/// *[Full Example](<>)*
/// ```no_run
/// use async_graphql::*;
/// use warp::{Filter, Reply};
/// use std::convert::Infallible;
/// use async_graphql::http::GQLResponse;
/// struct QueryRoot;
/// #[Object]
/// impl QueryRoot {
/// #[field]
/// async fn value(&self, ctx: &Context<'_>) -> i32 {
/// unimplemented!()
/// }
/// }
/// #[tokio::main]
/// async fn main() {
/// let schema = Schema::new(QueryRoot, EmptyMutation, EmptySubscription);
/// let filter = async_graphql_warp::graphql(schema).and_then(|(schema, builder): (_, QueryBuilder)| async move {
/// let resp = builder.execute(&schema).await;
/// Ok::<_, Infallible>(warp::reply::json(&GQLResponse(resp)).into_response())
/// });
/// warp::serve(filter).run(([0, 0, 0, 0], 8000)).await;
/// }
/// ```
pub fn graphql<Query, Mutation, Subscription>(
schema: Schema<Query, Mutation, Subscription>,
) -> BoxedFilter<((Schema<Query, Mutation, Subscription>, QueryBuilder),)>
Query: ObjectType + Send + Sync + 'static,
Mutation: ObjectType + Send + Sync + 'static,
Subscription: SubscriptionType + Send + Sync + 'static,
.and(warp::any().map(move || schema.clone()))
.and_then(|content_type, body, schema| async move {
let builder = (content_type, StreamBody::new(body))
.map_err(|err| warp::reject::custom(BadRequest(err)))?;
Ok::<_, Rejection>((schema, builder))
/// Similar to graphql, but you can set the options `IntoQueryBuilderOpts`.
pub fn graphql_opts<Query, Mutation, Subscription>(
schema: Schema<Query, Mutation, Subscription>,
opts: IntoQueryBuilderOpts,
) -> BoxedFilter<((Schema<Query, Mutation, Subscription>, QueryBuilder),)>
Query: ObjectType + Send + Sync + 'static,
Mutation: ObjectType + Send + Sync + 'static,
Subscription: SubscriptionType + Send + Sync + 'static,
let opts = Arc::new(opts);
.and(warp::any().map(move || opts.clone()))
.and(warp::any().map(move || schema.clone()))
|content_type, body, opts: Arc<IntoQueryBuilderOpts>, schema| async move {
let builder = (content_type, StreamBody::new(body))
.map_err(|err| warp::reject::custom(BadRequest(err)))?;
Ok::<_, Rejection>((schema, builder))
/// GraphQL subscription filter
/// # Examples
/// ```no_run
/// use async_graphql::*;
/// use warp::Filter;
/// use futures::{Stream, StreamExt};
/// use std::time::Duration;
/// struct QueryRoot;
/// #[Object]
/// impl QueryRoot {}
/// struct SubscriptionRoot;
/// #[Subscription]
/// impl SubscriptionRoot {
/// #[field]
/// async fn tick(&self) -> impl Stream<Item = String> {
/// tokio::time::interval(Duration::from_secs(1)).map(|n| format!("{}", n.elapsed().as_secs_f32()))
/// }
/// }
/// #[tokio::main]
/// async fn main() {
/// let schema = Schema::new(QueryRoot, EmptyMutation, SubscriptionRoot);
/// let filter = async_graphql_warp::graphql_subscription(schema);
/// warp::serve(filter).run(([0, 0, 0, 0], 8000)).await;
/// }
/// ```
pub fn graphql_subscription<Query, Mutation, Subscription>(
schema: Schema<Query, Mutation, Subscription>,
) -> BoxedFilter<(impl Reply,)>
Query: ObjectType + Sync + Send + 'static,
Mutation: ObjectType + Sync + Send + 'static,
Subscription: SubscriptionType + Send + Sync + 'static,
.and(warp::any().map(move || schema.clone()))
|ws: warp::ws::Ws, schema: Schema<Query, Mutation, Subscription>| {
ws.on_upgrade(move |websocket| {
let (mut tx, rx) = websocket.split();
let (mut stx, srx) =
let mut rx = rx.fuse();
let mut srx = srx.fuse();
async move {
loop {
select! {
bytes = => {
if let Some(bytes) = bytes {
if tx
.send(Message::text(unsafe {
} else {
msg = => {
if let Some(Ok(msg)) = msg {
if msg.is_text() {
if stx.send(Bytes::copy_from_slice(msg.as_bytes())).await.is_err() {
).map(|reply| {
warp::reply::with_header(reply, "Sec-WebSocket-Protocol", "graphql-ws")