diff --git a/Cargo.toml b/Cargo.toml index e6e1ee52..0c50588c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -78,6 +78,7 @@ members = [ "value", "parser", "derive", + "integrations/poem", "integrations/actix-web", "integrations/rocket", "integrations/tide", diff --git a/README.md b/README.md index ea6d5352..9b332d5e 100644 --- a/README.md +++ b/README.md @@ -47,7 +47,7 @@ This crate uses `#![forbid(unsafe_code)]` to ensure everything is implemented in * Rustfmt friendly (Procedural Macro) * Custom scalars * Minimal overhead -* Easy integration (actix_web, tide, warp, rocket ...) +* Easy integration ([poem](https://crates.io/crates/poem), actix_web, tide, warp, rocket ...) * Upload files (Multipart request) * Subscriptions (WebSocket transport) * Custom extensions @@ -96,6 +96,7 @@ cd examples && cargo run --bin [name] ## Integrations +* Poem [async-graphql-poem](https://crates.io/crates/async-graphql-poem) * Actix-web [async-graphql-actix-web](https://crates.io/crates/async-graphql-actix-web) * Warp [async-graphql-warp](https://crates.io/crates/async-graphql-warp) * Tide [async-graphql-tide](https://crates.io/crates/async-graphql-tide) diff --git a/docs/en/src/integrations.md b/docs/en/src/integrations.md index 136f51b6..18932661 100644 --- a/docs/en/src/integrations.md +++ b/docs/en/src/integrations.md @@ -2,6 +2,7 @@ `Async-graphql` supports several common Rust web servers. +- Poem [async-graphql-poem](https://crates.io/crates/async-graphql-poem) - Actix-web [async-graphql-actix-web](https://crates.io/crates/async-graphql-actix-web) - Warp [async-graphql-warp](https://crates.io/crates/async-graphql-warp) - Tide [async-graphql-tide](https://crates.io/crates/async-graphql-tide) diff --git a/docs/zh-CN/src/integrations.md b/docs/zh-CN/src/integrations.md index 6048f9fb..3cc7f809 100644 --- a/docs/zh-CN/src/integrations.md +++ b/docs/zh-CN/src/integrations.md @@ -2,6 +2,7 @@ `Async-graphql`提供了对一些常用Web Server的集成支持。 +- Poem [async-graphql-poem](https://crates.io/crates/async-graphql-poem) - Actix-web [async-graphql-actix-web](https://crates.io/crates/async-graphql-actix-web) - Warp [async-graphql-warp](https://crates.io/crates/async-graphql-warp) - Tide [async-graphql-tide](https://crates.io/crates/async-graphql-tide) diff --git a/examples b/examples index 7e3304e4..57f825bb 160000 --- a/examples +++ b/examples @@ -1 +1 @@ -Subproject commit 7e3304e406adaff8d28a025becd91867ecae199a +Subproject commit 57f825bb941305c74171e24861aab71ec502c19f diff --git a/integrations/README.md b/integrations/README.md index 390b98f3..f731fb51 100644 --- a/integrations/README.md +++ b/integrations/README.md @@ -40,7 +40,8 @@ they must all internally use the below functions. ## Integration Status -- Actix-web: Complete integration. -- Rocket: Missing websocket support (blocked on [support in Rocket itself](https://github.com/SergioBenitez/Rocket/issues/90)). -- Tide: Missing websocket support (blocked on [support in Tide itself](https://github.com/http-rs/tide/issues/67)). -- Warp: Complete integration. +- **Poem**: Complete integration. +- **Actix-web**: Complete integration. +- **Rocket**: Missing websocket support (blocked on [support in Rocket itself](https://github.com/SergioBenitez/Rocket/issues/90)). +- **Tide**: Complete integration. +- **Warp**: Complete integration. diff --git a/integrations/poem/Cargo.toml b/integrations/poem/Cargo.toml new file mode 100644 index 00000000..584aec06 --- /dev/null +++ b/integrations/poem/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "async-graphql-poem" +version = "2.9.11" +authors = ["sunli "] +edition = "2018" +description = "async-graphql for poem" +license = "MIT/Apache-2.0" +documentation = "https://docs.rs/async-graphql-poem/" +homepage = "https://github.com/async-graphql/async-graphql" +repository = "https://github.com/async-graphql/async-graphql" +keywords = ["futures", "async", "graphql", "poem"] +categories = ["network-programming", "asynchronous"] + + +[dependencies] +async-graphql = { path = "../..", version = "2.9.11" } + +poem = { version = "0.3.6", features = ["websocket"] } +futures-util = { version = "0.3.13", default-features = false } +serde_json = "1.0.66" +tokio-util = { version = "0.6.7", features = ["compat"] } diff --git a/integrations/poem/src/extractor.rs b/integrations/poem/src/extractor.rs new file mode 100644 index 00000000..fc9434c8 --- /dev/null +++ b/integrations/poem/src/extractor.rs @@ -0,0 +1,83 @@ +use async_graphql::http::MultipartOptions; +use poem::http::{header, Method}; +use poem::web::Query; +use poem::{async_trait, Error, FromRequest, Request, RequestBody, Result}; +use tokio_util::compat::TokioAsyncReadCompatExt; + +/// An extractor for GraphQL request. +/// +/// You can just use the extractor as in the example below, but I would recommend using +/// the [`GraphQL`](crate::GraphQL) endpoint because it is easier to integrate. +/// +/// # Example +/// +/// ``` +/// use poem::{handler, route, EndpointExt}; +/// use poem::web::{Json, Data}; +/// use poem::middleware::AddData; +/// use async_graphql_poem::GraphQLRequest; +/// use async_graphql::{EmptyMutation, EmptySubscription, Object, Schema}; +/// +/// struct Query; +/// +/// #[Object] +/// impl Query { +/// async fn value(&self) -> i32 { +/// 100 +/// } +/// } +/// +/// type MySchema = Schema; +/// +/// #[handler(method = "get")] +/// async fn index(req: GraphQLRequest, schema: Data<&MySchema>) -> Json { +/// Json(schema.execute(req.0).await) +/// } +/// +/// let app = route().at("/", index.with(AddData::new(Schema::new(Query, EmptyMutation, EmptySubscription)))); +/// ``` +pub struct GraphQLRequest(pub async_graphql::Request); + +#[async_trait] +impl<'a> FromRequest<'a> for GraphQLRequest { + async fn from_request(req: &'a Request, body: &mut RequestBody) -> Result { + Ok(GraphQLRequest( + GraphQLBatchRequest::from_request(req, body) + .await? + .0 + .into_single() + .map_err(Error::bad_request)?, + )) + } +} + +/// An extractor for GraphQL batch request. +pub struct GraphQLBatchRequest(pub async_graphql::BatchRequest); + +#[async_trait] +impl<'a> FromRequest<'a> for GraphQLBatchRequest { + async fn from_request(req: &'a Request, body: &mut RequestBody) -> Result { + if req.method() == Method::GET { + let req = Query::from_request(req, body) + .await + .map_err(Error::bad_request)? + .0; + Ok(Self(async_graphql::BatchRequest::Single(req))) + } else { + let content_type = req + .headers() + .get(header::CONTENT_TYPE) + .and_then(|value| value.to_str().ok()) + .map(ToString::to_string); + Ok(Self( + async_graphql::http::receive_batch_body( + content_type, + body.take()?.into_async_read().compat(), + MultipartOptions::default(), + ) + .await + .map_err(Error::bad_request)?, + )) + } + } +} diff --git a/integrations/poem/src/lib.rs b/integrations/poem/src/lib.rs new file mode 100644 index 00000000..0a140beb --- /dev/null +++ b/integrations/poem/src/lib.rs @@ -0,0 +1,11 @@ +//! Async-graphql integration with Poem +#![forbid(unsafe_code)] +#![warn(missing_docs)] + +mod extractor; +mod query; +mod subscription; + +pub use extractor::{GraphQLBatchRequest, GraphQLRequest}; +pub use query::GraphQL; +pub use subscription::GraphQLSubscription; diff --git a/integrations/poem/src/query.rs b/integrations/poem/src/query.rs new file mode 100644 index 00000000..223ce4d4 --- /dev/null +++ b/integrations/poem/src/query.rs @@ -0,0 +1,54 @@ +use async_graphql::{BatchResponse as GraphQLBatchResponse, ObjectType, Schema, SubscriptionType}; +use poem::web::Json; +use poem::{async_trait, Endpoint, FromRequest, Request, Result}; + +use crate::GraphQLBatchRequest; + +/// A GraphQL query endpoint. +/// +/// # Example +/// +/// ``` +/// use poem::route; +/// use async_graphql_poem::GraphQL; +/// use async_graphql::{EmptyMutation, EmptySubscription, Object, Schema}; +/// +/// struct Query; +/// +/// #[Object] +/// impl Query { +/// async fn value(&self) -> i32 { +/// 100 +/// } +/// } +/// +/// type MySchema = Schema; +/// +/// let app = route().at("/", GraphQL::new(Schema::new(Query, EmptyMutation, EmptySubscription))); +/// ``` +pub struct GraphQL { + schema: Schema, +} + +impl GraphQL { + /// Create a GraphQL query endpoint. + pub fn new(schema: Schema) -> Self { + Self { schema } + } +} + +#[async_trait] +impl Endpoint for GraphQL +where + Query: ObjectType + 'static, + Mutation: ObjectType + 'static, + Subscription: SubscriptionType + 'static, +{ + type Output = Result>; + + async fn call(&self, req: Request) -> Self::Output { + let (req, mut body) = req.split(); + let req = GraphQLBatchRequest::from_request(&req, &mut body).await?; + Ok(Json(self.schema.execute_batch(req.0).await)) + } +} diff --git a/integrations/poem/src/subscription.rs b/integrations/poem/src/subscription.rs new file mode 100644 index 00000000..10e625c3 --- /dev/null +++ b/integrations/poem/src/subscription.rs @@ -0,0 +1,146 @@ +use std::str::FromStr; + +use async_graphql::http::{WebSocketProtocols, WsMessage, ALL_WEBSOCKET_PROTOCOLS}; +use async_graphql::{Data, ObjectType, Schema, SubscriptionType}; +use futures_util::future::{self, Ready}; +use futures_util::{Future, SinkExt, StreamExt}; +use poem::web::websocket::{Message, WebSocket}; +use poem::{http, Endpoint, FromRequest, IntoResponse, Request, Response, Result}; + +/// A GraphQL subscription endpoint. +/// +/// # Example +/// +/// ``` +/// use poem::route; +/// use async_graphql_poem::GraphQLSubscription; +/// use async_graphql::{EmptyMutation, Object, Schema, Subscription}; +/// use futures_util::{Stream, stream}; +/// +/// struct Query; +/// +/// #[Object] +/// impl Query { +/// async fn value(&self) -> i32 { +/// 100 +/// } +/// } +/// +/// struct Subscription; +/// +/// #[Subscription] +/// impl Subscription { +/// async fn values(&self) -> impl Stream { +/// stream::iter(vec![1, 2, 3, 4, 5]) +/// } +/// } +/// +/// type MySchema = Schema; +/// +/// let app = route().at("/ws", GraphQLSubscription::new(Schema::new(Query, EmptyMutation, Subscription))); +/// ``` +pub struct GraphQLSubscription { + schema: Schema, + initializer: F, +} + +impl + GraphQLSubscription< + Query, + Mutation, + Subscription, + fn(serde_json::Value) -> Ready>, + > +{ + /// Create a GraphQL subscription endpoint. + pub fn new(schema: Schema) -> Self { + Self { + schema, + initializer: |_| futures_util::future::ready(Ok(Default::default())), + } + } +} + +impl GraphQLSubscription { + /// With a data initialization function. + pub fn with_initializer( + self, + initializer: F2, + ) -> GraphQLSubscription + where + F2: FnOnce(serde_json::Value) -> R + Clone + Send + Sync + 'static, + R: Future> + Send + 'static, + { + GraphQLSubscription { + schema: self.schema, + initializer, + } + } +} + +#[poem::async_trait] +impl Endpoint + for GraphQLSubscription +where + Query: ObjectType + 'static, + Mutation: ObjectType + 'static, + Subscription: SubscriptionType + 'static, + F: FnOnce(serde_json::Value) -> R + Clone + Send + Sync + 'static, + R: Future> + Send + 'static, +{ + type Output = Result; + + async fn call(&self, req: Request) -> Self::Output { + let (req, mut body) = req.split(); + let websocket = WebSocket::from_request(&req, &mut body).await?; + let protocol = req + .headers() + .get(http::header::SEC_WEBSOCKET_PROTOCOL) + .and_then(|value| value.to_str().ok()) + .and_then(|protocols| { + protocols + .split(',') + .find_map(|p| WebSocketProtocols::from_str(p.trim()).ok()) + }) + .unwrap_or(WebSocketProtocols::SubscriptionsTransportWS); + let schema = self.schema.clone(); + let initializer = self.initializer.clone(); + + let resp = websocket + .protocols(ALL_WEBSOCKET_PROTOCOLS) + .on_upgrade(move |socket| async move { + let (mut sink, stream) = socket.split(); + + let stream = stream + .take_while(|res| future::ready(res.is_ok())) + .map(Result::unwrap) + .filter_map(|msg| { + if msg.is_text() || msg.is_binary() { + future::ready(Some(msg)) + } else { + future::ready(None) + } + }) + .map(Message::into_bytes) + .boxed(); + + let mut stream = async_graphql::http::WebSocket::with_data( + schema, + stream, + initializer, + protocol, + ) + .map(|msg| match msg { + WsMessage::Text(text) => Message::text(text), + WsMessage::Close(code, status) => Message::close_with(code, status), + }); + + while let Some(item) = stream.next().await { + let _ = sink.send(item).await; + } + }) + .into_response(); + + Ok(resp) + } +} diff --git a/src/lib.rs b/src/lib.rs index 963d658c..a94d7cdf 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -43,7 +43,7 @@ //! * Rustfmt friendly (Procedural Macro) //! * Custom scalars //! * Minimal overhead -//! * Easy integration (actix_web, tide, warp, rocket ...) +//! * Easy integration ([poem](https://crates.io/crates/poem), actix_web, tide, warp, rocket ...) //! * File upload (Multipart request) //! * Subscriptions (WebSocket transport) //! * Custom extensions