Add Poem integration.

This commit is contained in:
Sunli 2021-08-23 23:16:31 +08:00
parent 7962d1fd07
commit 853cf65546
12 changed files with 327 additions and 7 deletions

View File

@ -78,6 +78,7 @@ members = [
"value",
"parser",
"derive",
"integrations/poem",
"integrations/actix-web",
"integrations/rocket",
"integrations/tide",

View File

@ -47,7 +47,7 @@ This crate uses `#![forbid(unsafe_code)]` to ensure everything is implemented in
* Rustfmt friendly (Procedural Macro)
* Custom scalars
* Minimal overhead
* Easy integration (actix_web, tide, warp, rocket ...)
* Easy integration ([poem](https://crates.io/crates/poem), actix_web, tide, warp, rocket ...)
* Upload files (Multipart request)
* Subscriptions (WebSocket transport)
* Custom extensions
@ -96,6 +96,7 @@ cd examples && cargo run --bin [name]
## Integrations
* Poem [async-graphql-poem](https://crates.io/crates/async-graphql-poem)
* Actix-web [async-graphql-actix-web](https://crates.io/crates/async-graphql-actix-web)
* Warp [async-graphql-warp](https://crates.io/crates/async-graphql-warp)
* Tide [async-graphql-tide](https://crates.io/crates/async-graphql-tide)

View File

@ -2,6 +2,7 @@
`Async-graphql` supports several common Rust web servers.
- Poem [async-graphql-poem](https://crates.io/crates/async-graphql-poem)
- Actix-web [async-graphql-actix-web](https://crates.io/crates/async-graphql-actix-web)
- Warp [async-graphql-warp](https://crates.io/crates/async-graphql-warp)
- Tide [async-graphql-tide](https://crates.io/crates/async-graphql-tide)

View File

@ -2,6 +2,7 @@
`Async-graphql`提供了对一些常用Web Server的集成支持。
- Poem [async-graphql-poem](https://crates.io/crates/async-graphql-poem)
- Actix-web [async-graphql-actix-web](https://crates.io/crates/async-graphql-actix-web)
- Warp [async-graphql-warp](https://crates.io/crates/async-graphql-warp)
- Tide [async-graphql-tide](https://crates.io/crates/async-graphql-tide)

@ -1 +1 @@
Subproject commit 7e3304e406adaff8d28a025becd91867ecae199a
Subproject commit 57f825bb941305c74171e24861aab71ec502c19f

View File

@ -40,7 +40,8 @@ they must all internally use the below functions.
## Integration Status
- Actix-web: Complete integration.
- Rocket: Missing websocket support (blocked on [support in Rocket itself](https://github.com/SergioBenitez/Rocket/issues/90)).
- Tide: Missing websocket support (blocked on [support in Tide itself](https://github.com/http-rs/tide/issues/67)).
- Warp: Complete integration.
- **Poem**: Complete integration.
- **Actix-web**: Complete integration.
- **Rocket**: Missing websocket support (blocked on [support in Rocket itself](https://github.com/SergioBenitez/Rocket/issues/90)).
- **Tide**: Complete integration.
- **Warp**: Complete integration.

View File

@ -0,0 +1,21 @@
[package]
name = "async-graphql-poem"
version = "2.9.11"
authors = ["sunli <scott_s829@163.com>"]
edition = "2018"
description = "async-graphql for poem"
license = "MIT/Apache-2.0"
documentation = "https://docs.rs/async-graphql-poem/"
homepage = "https://github.com/async-graphql/async-graphql"
repository = "https://github.com/async-graphql/async-graphql"
keywords = ["futures", "async", "graphql", "poem"]
categories = ["network-programming", "asynchronous"]
[dependencies]
async-graphql = { path = "../..", version = "2.9.11" }
poem = { version = "0.3.6", features = ["websocket"] }
futures-util = { version = "0.3.13", default-features = false }
serde_json = "1.0.66"
tokio-util = { version = "0.6.7", features = ["compat"] }

View File

@ -0,0 +1,83 @@
use async_graphql::http::MultipartOptions;
use poem::http::{header, Method};
use poem::web::Query;
use poem::{async_trait, Error, FromRequest, Request, RequestBody, Result};
use tokio_util::compat::TokioAsyncReadCompatExt;
/// An extractor for GraphQL request.
///
/// You can just use the extractor as in the example below, but I would recommend using
/// the [`GraphQL`](crate::GraphQL) endpoint because it is easier to integrate.
///
/// # Example
///
/// ```
/// use poem::{handler, route, EndpointExt};
/// use poem::web::{Json, Data};
/// use poem::middleware::AddData;
/// use async_graphql_poem::GraphQLRequest;
/// use async_graphql::{EmptyMutation, EmptySubscription, Object, Schema};
///
/// struct Query;
///
/// #[Object]
/// impl Query {
/// async fn value(&self) -> i32 {
/// 100
/// }
/// }
///
/// type MySchema = Schema<Query, EmptyMutation, EmptySubscription>;
///
/// #[handler(method = "get")]
/// async fn index(req: GraphQLRequest, schema: Data<&MySchema>) -> Json<async_graphql::Response> {
/// Json(schema.execute(req.0).await)
/// }
///
/// let app = route().at("/", index.with(AddData::new(Schema::new(Query, EmptyMutation, EmptySubscription))));
/// ```
pub struct GraphQLRequest(pub async_graphql::Request);
#[async_trait]
impl<'a> FromRequest<'a> for GraphQLRequest {
async fn from_request(req: &'a Request, body: &mut RequestBody) -> Result<Self> {
Ok(GraphQLRequest(
GraphQLBatchRequest::from_request(req, body)
.await?
.0
.into_single()
.map_err(Error::bad_request)?,
))
}
}
/// An extractor for GraphQL batch request.
pub struct GraphQLBatchRequest(pub async_graphql::BatchRequest);
#[async_trait]
impl<'a> FromRequest<'a> for GraphQLBatchRequest {
async fn from_request(req: &'a Request, body: &mut RequestBody) -> Result<Self> {
if req.method() == Method::GET {
let req = Query::from_request(req, body)
.await
.map_err(Error::bad_request)?
.0;
Ok(Self(async_graphql::BatchRequest::Single(req)))
} else {
let content_type = req
.headers()
.get(header::CONTENT_TYPE)
.and_then(|value| value.to_str().ok())
.map(ToString::to_string);
Ok(Self(
async_graphql::http::receive_batch_body(
content_type,
body.take()?.into_async_read().compat(),
MultipartOptions::default(),
)
.await
.map_err(Error::bad_request)?,
))
}
}
}

View File

@ -0,0 +1,11 @@
//! Async-graphql integration with Poem
#![forbid(unsafe_code)]
#![warn(missing_docs)]
mod extractor;
mod query;
mod subscription;
pub use extractor::{GraphQLBatchRequest, GraphQLRequest};
pub use query::GraphQL;
pub use subscription::GraphQLSubscription;

View File

@ -0,0 +1,54 @@
use async_graphql::{BatchResponse as GraphQLBatchResponse, ObjectType, Schema, SubscriptionType};
use poem::web::Json;
use poem::{async_trait, Endpoint, FromRequest, Request, Result};
use crate::GraphQLBatchRequest;
/// A GraphQL query endpoint.
///
/// # Example
///
/// ```
/// use poem::route;
/// use async_graphql_poem::GraphQL;
/// use async_graphql::{EmptyMutation, EmptySubscription, Object, Schema};
///
/// struct Query;
///
/// #[Object]
/// impl Query {
/// async fn value(&self) -> i32 {
/// 100
/// }
/// }
///
/// type MySchema = Schema<Query, EmptyMutation, EmptySubscription>;
///
/// let app = route().at("/", GraphQL::new(Schema::new(Query, EmptyMutation, EmptySubscription)));
/// ```
pub struct GraphQL<Query, Mutation, Subscription> {
schema: Schema<Query, Mutation, Subscription>,
}
impl<Query, Mutation, Subscription> GraphQL<Query, Mutation, Subscription> {
/// Create a GraphQL query endpoint.
pub fn new(schema: Schema<Query, Mutation, Subscription>) -> Self {
Self { schema }
}
}
#[async_trait]
impl<Query, Mutation, Subscription> Endpoint for GraphQL<Query, Mutation, Subscription>
where
Query: ObjectType + 'static,
Mutation: ObjectType + 'static,
Subscription: SubscriptionType + 'static,
{
type Output = Result<Json<GraphQLBatchResponse>>;
async fn call(&self, req: Request) -> Self::Output {
let (req, mut body) = req.split();
let req = GraphQLBatchRequest::from_request(&req, &mut body).await?;
Ok(Json(self.schema.execute_batch(req.0).await))
}
}

View File

@ -0,0 +1,146 @@
use std::str::FromStr;
use async_graphql::http::{WebSocketProtocols, WsMessage, ALL_WEBSOCKET_PROTOCOLS};
use async_graphql::{Data, ObjectType, Schema, SubscriptionType};
use futures_util::future::{self, Ready};
use futures_util::{Future, SinkExt, StreamExt};
use poem::web::websocket::{Message, WebSocket};
use poem::{http, Endpoint, FromRequest, IntoResponse, Request, Response, Result};
/// A GraphQL subscription endpoint.
///
/// # Example
///
/// ```
/// use poem::route;
/// use async_graphql_poem::GraphQLSubscription;
/// use async_graphql::{EmptyMutation, Object, Schema, Subscription};
/// use futures_util::{Stream, stream};
///
/// struct Query;
///
/// #[Object]
/// impl Query {
/// async fn value(&self) -> i32 {
/// 100
/// }
/// }
///
/// struct Subscription;
///
/// #[Subscription]
/// impl Subscription {
/// async fn values(&self) -> impl Stream<Item = i32> {
/// stream::iter(vec![1, 2, 3, 4, 5])
/// }
/// }
///
/// type MySchema = Schema<Query, EmptyMutation, Subscription>;
///
/// let app = route().at("/ws", GraphQLSubscription::new(Schema::new(Query, EmptyMutation, Subscription)));
/// ```
pub struct GraphQLSubscription<Query, Mutation, Subscription, F> {
schema: Schema<Query, Mutation, Subscription>,
initializer: F,
}
impl<Query, Mutation, Subscription>
GraphQLSubscription<
Query,
Mutation,
Subscription,
fn(serde_json::Value) -> Ready<async_graphql::Result<Data>>,
>
{
/// Create a GraphQL subscription endpoint.
pub fn new(schema: Schema<Query, Mutation, Subscription>) -> Self {
Self {
schema,
initializer: |_| futures_util::future::ready(Ok(Default::default())),
}
}
}
impl<Query, Mutation, Subscription, F> GraphQLSubscription<Query, Mutation, Subscription, F> {
/// With a data initialization function.
pub fn with_initializer<F2, R>(
self,
initializer: F2,
) -> GraphQLSubscription<Query, Mutation, Subscription, F2>
where
F2: FnOnce(serde_json::Value) -> R + Clone + Send + Sync + 'static,
R: Future<Output = Result<Data>> + Send + 'static,
{
GraphQLSubscription {
schema: self.schema,
initializer,
}
}
}
#[poem::async_trait]
impl<Query, Mutation, Subscription, F, R> Endpoint
for GraphQLSubscription<Query, Mutation, Subscription, F>
where
Query: ObjectType + 'static,
Mutation: ObjectType + 'static,
Subscription: SubscriptionType + 'static,
F: FnOnce(serde_json::Value) -> R + Clone + Send + Sync + 'static,
R: Future<Output = async_graphql::Result<Data>> + Send + 'static,
{
type Output = Result<Response>;
async fn call(&self, req: Request) -> Self::Output {
let (req, mut body) = req.split();
let websocket = WebSocket::from_request(&req, &mut body).await?;
let protocol = req
.headers()
.get(http::header::SEC_WEBSOCKET_PROTOCOL)
.and_then(|value| value.to_str().ok())
.and_then(|protocols| {
protocols
.split(',')
.find_map(|p| WebSocketProtocols::from_str(p.trim()).ok())
})
.unwrap_or(WebSocketProtocols::SubscriptionsTransportWS);
let schema = self.schema.clone();
let initializer = self.initializer.clone();
let resp = websocket
.protocols(ALL_WEBSOCKET_PROTOCOLS)
.on_upgrade(move |socket| async move {
let (mut sink, stream) = socket.split();
let stream = stream
.take_while(|res| future::ready(res.is_ok()))
.map(Result::unwrap)
.filter_map(|msg| {
if msg.is_text() || msg.is_binary() {
future::ready(Some(msg))
} else {
future::ready(None)
}
})
.map(Message::into_bytes)
.boxed();
let mut stream = async_graphql::http::WebSocket::with_data(
schema,
stream,
initializer,
protocol,
)
.map(|msg| match msg {
WsMessage::Text(text) => Message::text(text),
WsMessage::Close(code, status) => Message::close_with(code, status),
});
while let Some(item) = stream.next().await {
let _ = sink.send(item).await;
}
})
.into_response();
Ok(resp)
}
}

View File

@ -43,7 +43,7 @@
//! * Rustfmt friendly (Procedural Macro)
//! * Custom scalars
//! * Minimal overhead
//! * Easy integration (actix_web, tide, warp, rocket ...)
//! * Easy integration ([poem](https://crates.io/crates/poem), actix_web, tide, warp, rocket ...)
//! * File upload (Multipart request)
//! * Subscriptions (WebSocket transport)
//! * Custom extensions