Update async-graphql-poem 2
This commit is contained in:
parent
098255cce7
commit
6905c561ca
|
@ -33,9 +33,6 @@ jobs:
|
|||
- name: async-graphql-warp
|
||||
registryName: async-graphql-warp
|
||||
path: integrations/warp
|
||||
- name: async-graphql-tide
|
||||
registryName: async-graphql-tide
|
||||
path: integrations/tide
|
||||
- name: async-graphql-poem
|
||||
registryName: async-graphql-poem
|
||||
path: integrations/poem
|
||||
|
|
|
@ -86,7 +86,6 @@ members = [
|
|||
"integrations/poem",
|
||||
"integrations/actix-web",
|
||||
"integrations/rocket",
|
||||
"integrations/tide",
|
||||
"integrations/warp",
|
||||
"integrations/axum",
|
||||
]
|
||||
|
|
|
@ -43,5 +43,6 @@ they must all internally use the below functions.
|
|||
- **Poem**: Complete integration.
|
||||
- **Actix-web**: Complete integration.
|
||||
- **Rocket**: Missing websocket support (blocked on [support in Rocket itself](https://github.com/SergioBenitez/Rocket/issues/90)).
|
||||
- **Tide**: Complete integration.
|
||||
- **Warp**: Complete integration.
|
||||
- **Axum**: Complete integration.
|
||||
-
|
|
@ -10,4 +10,4 @@ mod subscription;
|
|||
pub use extractor::{GraphQLBatchRequest, GraphQLRequest};
|
||||
pub use query::GraphQL;
|
||||
pub use response::{GraphQLBatchResponse, GraphQLResponse};
|
||||
pub use subscription::GraphQLSubscription;
|
||||
pub use subscription::{GraphQLProtocol, GraphQLSubscription, GraphQLWebSocket};
|
||||
|
|
|
@ -5,8 +5,34 @@ use async_graphql::{Data, ObjectType, Schema, SubscriptionType};
|
|||
use futures_util::future::{self, Ready};
|
||||
use futures_util::{Future, SinkExt, StreamExt};
|
||||
use poem::http::StatusCode;
|
||||
use poem::web::websocket::{Message, WebSocket};
|
||||
use poem::{http, Endpoint, Error, FromRequest, IntoResponse, Request, Response, Result};
|
||||
use poem::web::websocket::{Message, WebSocket, WebSocketStream};
|
||||
use poem::{
|
||||
http, Endpoint, Error, FromRequest, IntoResponse, Request, RequestBody, Response, Result,
|
||||
};
|
||||
|
||||
/// A GraphQL protocol extractor.
|
||||
///
|
||||
/// It extract GraphQL protocol from `SEC_WEBSOCKET_PROTOCOL` header.
|
||||
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
|
||||
pub struct GraphQLProtocol(WebSocketProtocols);
|
||||
|
||||
#[poem::async_trait]
|
||||
impl<'a> FromRequest<'a> for GraphQLProtocol {
|
||||
type Error = Error;
|
||||
|
||||
async fn from_request(req: &'a Request, _body: &mut RequestBody) -> Result<Self, Self::Error> {
|
||||
req.headers()
|
||||
.get(http::header::SEC_WEBSOCKET_PROTOCOL)
|
||||
.and_then(|value| value.to_str().ok())
|
||||
.and_then(|protocols| {
|
||||
protocols
|
||||
.split(',')
|
||||
.find_map(|p| WebSocketProtocols::from_str(p.trim()).ok())
|
||||
})
|
||||
.map(Self)
|
||||
.ok_or_else(|| Error::new(StatusCode::BAD_REQUEST))
|
||||
}
|
||||
}
|
||||
|
||||
/// A GraphQL subscription endpoint.
|
||||
///
|
||||
|
@ -41,149 +67,143 @@ use poem::{http, Endpoint, Error, FromRequest, IntoResponse, Request, Response,
|
|||
/// let schema = Schema::new(Query, EmptyMutation, Subscription);
|
||||
/// let app = Route::new().at("/ws", get(GraphQLSubscription::new(schema)));
|
||||
/// ```
|
||||
pub struct GraphQLSubscription<Query, Mutation, Subscription, OnCreate, OnInit> {
|
||||
pub struct GraphQLSubscription<Query, Mutation, Subscription> {
|
||||
schema: Schema<Query, Mutation, Subscription>,
|
||||
on_connection_create: OnCreate,
|
||||
on_connection_init: OnInit,
|
||||
}
|
||||
|
||||
type DefaultOnConnCreateType = fn(&Request) -> Ready<Result<Data>>;
|
||||
|
||||
fn default_on_connection_create(_: &Request) -> Ready<Result<Data>> {
|
||||
futures_util::future::ready(Ok(Data::default()))
|
||||
}
|
||||
|
||||
type DefaultOnConnInitType = fn(serde_json::Value) -> Ready<Result<Data>>;
|
||||
|
||||
fn default_on_connection_init(_: serde_json::Value) -> Ready<Result<Data>> {
|
||||
futures_util::future::ready(Ok(Data::default()))
|
||||
}
|
||||
|
||||
impl<Query, Mutation, Subscription>
|
||||
GraphQLSubscription<
|
||||
Query,
|
||||
Mutation,
|
||||
Subscription,
|
||||
DefaultOnConnCreateType,
|
||||
DefaultOnConnInitType,
|
||||
>
|
||||
{
|
||||
impl<Query, Mutation, Subscription> GraphQLSubscription<Query, Mutation, Subscription> {
|
||||
/// Create a GraphQL subscription endpoint.
|
||||
pub fn new(schema: Schema<Query, Mutation, Subscription>) -> Self {
|
||||
Self {
|
||||
schema,
|
||||
on_connection_create: default_on_connection_create,
|
||||
on_connection_init: default_on_connection_init,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<Query, Mutation, Subscription, OnCreate, OnInit>
|
||||
GraphQLSubscription<Query, Mutation, Subscription, OnCreate, OnInit>
|
||||
{
|
||||
/// Specify the callback function to be called when the connection is created.
|
||||
///
|
||||
/// You can get something from the incoming request to create [`Data`].
|
||||
pub fn on_connection_create<OnCreate2, Fut>(
|
||||
self,
|
||||
callback: OnCreate2,
|
||||
) -> GraphQLSubscription<Query, Mutation, Subscription, OnCreate2, OnInit>
|
||||
where
|
||||
OnCreate2: Fn(&Request) -> Fut + Send + Sync + 'static,
|
||||
Fut: Future<Output = Result<Data>> + Send + 'static,
|
||||
{
|
||||
GraphQLSubscription {
|
||||
schema: self.schema,
|
||||
on_connection_create: callback,
|
||||
on_connection_init: self.on_connection_init,
|
||||
}
|
||||
}
|
||||
|
||||
/// Specify a callback function to be called when the connection is initialized.
|
||||
///
|
||||
/// You can get something from the payload of [`GQL_CONNECTION_INIT` message](https://github.com/apollographql/subscriptions-transport-ws/blob/master/PROTOCOL.md#gql_connection_init) to create [`Data`].
|
||||
pub fn on_connection_init<OnInit2, Fut>(
|
||||
self,
|
||||
callback: OnInit2,
|
||||
) -> GraphQLSubscription<Query, Mutation, Subscription, OnCreate, OnInit2>
|
||||
where
|
||||
OnInit2: FnOnce(serde_json::Value) -> Fut + Clone + Send + Sync + 'static,
|
||||
Fut: Future<Output = Result<Data>> + Send + 'static,
|
||||
{
|
||||
GraphQLSubscription {
|
||||
schema: self.schema,
|
||||
on_connection_create: self.on_connection_create,
|
||||
on_connection_init: callback,
|
||||
}
|
||||
Self { schema }
|
||||
}
|
||||
}
|
||||
|
||||
#[poem::async_trait]
|
||||
impl<Query, Mutation, Subscription, OnCreate, OnCreateFut, OnInit, OnInitFut> Endpoint
|
||||
for GraphQLSubscription<Query, Mutation, Subscription, OnCreate, OnInit>
|
||||
impl<Query, Mutation, Subscription> Endpoint for GraphQLSubscription<Query, Mutation, Subscription>
|
||||
where
|
||||
Query: ObjectType + 'static,
|
||||
Mutation: ObjectType + 'static,
|
||||
Subscription: SubscriptionType + 'static,
|
||||
OnCreate: Fn(&Request) -> OnCreateFut + Send + Sync + 'static,
|
||||
OnCreateFut: Future<Output = async_graphql::Result<Data>> + Send + 'static,
|
||||
OnInit: FnOnce(serde_json::Value) -> OnInitFut + Clone + Send + Sync + 'static,
|
||||
OnInitFut: Future<Output = async_graphql::Result<Data>> + Send + 'static,
|
||||
{
|
||||
type Output = Result<Response>;
|
||||
|
||||
async fn call(&self, req: Request) -> Self::Output {
|
||||
let data = (self.on_connection_create)(&req)
|
||||
.await
|
||||
.map_err(|_| Error::new(StatusCode::BAD_REQUEST))?;
|
||||
|
||||
let (req, mut body) = req.split();
|
||||
let websocket = WebSocket::from_request(&req, &mut body).await?;
|
||||
let protocol = req
|
||||
.headers()
|
||||
.get(http::header::SEC_WEBSOCKET_PROTOCOL)
|
||||
.and_then(|value| value.to_str().ok())
|
||||
.and_then(|protocols| {
|
||||
protocols
|
||||
.split(',')
|
||||
.find_map(|p| WebSocketProtocols::from_str(p.trim()).ok())
|
||||
})
|
||||
.unwrap_or(WebSocketProtocols::SubscriptionsTransportWS);
|
||||
let protocol = GraphQLProtocol::from_request(&req, &mut body).await?;
|
||||
let schema = self.schema.clone();
|
||||
let on_connection_init = self.on_connection_init.clone();
|
||||
|
||||
let resp = websocket
|
||||
.protocols(ALL_WEBSOCKET_PROTOCOLS)
|
||||
.on_upgrade(move |socket| async move {
|
||||
let (mut sink, stream) = socket.split();
|
||||
|
||||
let stream = stream
|
||||
.take_while(|res| future::ready(res.is_ok()))
|
||||
.map(Result::unwrap)
|
||||
.filter_map(|msg| {
|
||||
if msg.is_text() || msg.is_binary() {
|
||||
future::ready(Some(msg))
|
||||
} else {
|
||||
future::ready(None)
|
||||
}
|
||||
})
|
||||
.map(Message::into_bytes)
|
||||
.boxed();
|
||||
|
||||
let mut stream = async_graphql::http::WebSocket::new(schema, stream, protocol)
|
||||
.connection_data(data)
|
||||
.on_connection_init(on_connection_init)
|
||||
.map(|msg| match msg {
|
||||
WsMessage::Text(text) => Message::text(text),
|
||||
WsMessage::Close(code, status) => Message::close_with(code, status),
|
||||
});
|
||||
|
||||
while let Some(item) = stream.next().await {
|
||||
let _ = sink.send(item).await;
|
||||
}
|
||||
})
|
||||
.on_upgrade(move |stream| GraphQLWebSocket::new(stream, schema, protocol).serve())
|
||||
.into_response();
|
||||
|
||||
Ok(resp)
|
||||
}
|
||||
}
|
||||
|
||||
type DefaultOnConnInitType = fn(serde_json::Value) -> Ready<async_graphql::Result<Data>>;
|
||||
|
||||
fn default_on_connection_init(_: serde_json::Value) -> Ready<async_graphql::Result<Data>> {
|
||||
futures_util::future::ready(Ok(Data::default()))
|
||||
}
|
||||
|
||||
/// A Websocket connection for GraphQL subscription.
|
||||
pub struct GraphQLWebSocket<Query, Mutation, Subscription, OnConnInit> {
|
||||
schema: Schema<Query, Mutation, Subscription>,
|
||||
stream: WebSocketStream,
|
||||
data: Data,
|
||||
on_connection_init: OnConnInit,
|
||||
protocol: GraphQLProtocol,
|
||||
}
|
||||
|
||||
impl<Query, Mutation, Subscription>
|
||||
GraphQLWebSocket<Query, Mutation, Subscription, DefaultOnConnInitType>
|
||||
where
|
||||
Query: ObjectType + 'static,
|
||||
Mutation: ObjectType + 'static,
|
||||
Subscription: SubscriptionType + 'static,
|
||||
{
|
||||
/// Create a [`GraphQLWebSocket`] object.
|
||||
pub fn new(
|
||||
stream: WebSocketStream,
|
||||
schema: Schema<Query, Mutation, Subscription>,
|
||||
protocol: GraphQLProtocol,
|
||||
) -> Self {
|
||||
GraphQLWebSocket {
|
||||
schema,
|
||||
stream,
|
||||
data: Data::default(),
|
||||
on_connection_init: default_on_connection_init,
|
||||
protocol,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<Query, Mutation, Subscription, OnConnInit, OnConnInitFut>
|
||||
GraphQLWebSocket<Query, Mutation, Subscription, OnConnInit>
|
||||
where
|
||||
Query: ObjectType + 'static,
|
||||
Mutation: ObjectType + 'static,
|
||||
Subscription: SubscriptionType + 'static,
|
||||
OnConnInit: Fn(serde_json::Value) -> OnConnInitFut + Send + Sync + 'static,
|
||||
OnConnInitFut: Future<Output = async_graphql::Result<Data>> + Send + 'static,
|
||||
{
|
||||
/// Specify the initial subscription context data, usually you can get something from the
|
||||
/// incoming request to create it.
|
||||
pub fn with_data(self, data: Data) -> Self {
|
||||
Self { data, ..self }
|
||||
}
|
||||
|
||||
/// Specify a callback function to be called when the connection is initialized.
|
||||
///
|
||||
/// You can get something from the payload of [`GQL_CONNECTION_INIT` message](https://github.com/apollographql/subscriptions-transport-ws/blob/master/PROTOCOL.md#gql_connection_init) to create [`Data`].
|
||||
/// The data returned by this callback function will be merged with the data specified by [`with_data`].
|
||||
pub fn on_connection_init<OnConnInit2, Fut>(
|
||||
self,
|
||||
callback: OnConnInit2,
|
||||
) -> GraphQLWebSocket<Query, Mutation, Subscription, OnConnInit2>
|
||||
where
|
||||
OnConnInit2: Fn(serde_json::Value) -> Fut + Send + Sync + 'static,
|
||||
Fut: Future<Output = async_graphql::Result<Data>> + Send + 'static,
|
||||
{
|
||||
GraphQLWebSocket {
|
||||
schema: self.schema,
|
||||
stream: self.stream,
|
||||
data: self.data,
|
||||
on_connection_init: callback,
|
||||
protocol: self.protocol,
|
||||
}
|
||||
}
|
||||
|
||||
/// Processing subscription requests.
|
||||
pub async fn serve(self) {
|
||||
let (mut sink, stream) = self.stream.split();
|
||||
|
||||
let stream = stream
|
||||
.take_while(|res| future::ready(res.is_ok()))
|
||||
.map(Result::unwrap)
|
||||
.filter_map(|msg| {
|
||||
if msg.is_text() || msg.is_binary() {
|
||||
future::ready(Some(msg))
|
||||
} else {
|
||||
future::ready(None)
|
||||
}
|
||||
})
|
||||
.map(Message::into_bytes)
|
||||
.boxed();
|
||||
|
||||
let mut stream =
|
||||
async_graphql::http::WebSocket::new(self.schema.clone(), stream, self.protocol.0)
|
||||
.connection_data(self.data)
|
||||
.on_connection_init(self.on_connection_init)
|
||||
.map(|msg| match msg {
|
||||
WsMessage::Text(text) => Message::text(text),
|
||||
WsMessage::Close(code, status) => Message::close_with(code, status),
|
||||
});
|
||||
|
||||
while let Some(item) = stream.next().await {
|
||||
let _ = sink.send(item).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,31 +0,0 @@
|
|||
[package]
|
||||
name = "async-graphql-tide"
|
||||
version = "2.11.2"
|
||||
authors = ["vkill <vkill.net@gmail.com>"]
|
||||
edition = "2021"
|
||||
description = "async-graphql for tide"
|
||||
license = "MIT/Apache-2.0"
|
||||
documentation = "https://docs.rs/async-graphql-tide/"
|
||||
homepage = "https://github.com/async-graphql/async-graphql"
|
||||
repository = "https://github.com/async-graphql/async-graphql"
|
||||
keywords = ["futures", "async", "graphql"]
|
||||
categories = ["network-programming", "asynchronous"]
|
||||
|
||||
[features]
|
||||
default = ["websocket"]
|
||||
websocket = ["tide-websockets"]
|
||||
|
||||
[dependencies]
|
||||
async-graphql = { path = "../..", version = "=2.11.2" }
|
||||
async-trait = "0.1.48"
|
||||
futures-util = "0.3.13"
|
||||
serde_json = "1.0.64"
|
||||
|
||||
tide = { version = "0.16.0", default-features = false, features = ["h1-server"] }
|
||||
tide-websockets = { version = "0.4.0", optional = true }
|
||||
|
||||
[dev-dependencies]
|
||||
# Surf lacks multipart support
|
||||
reqwest = { version = "0.11.2", default-features = false, features = ["json", "multipart"] }
|
||||
async-std = { version = "1.9.0", features = ["attributes", "tokio1"] }
|
||||
serde_json = "1.0.64"
|
|
@ -1,175 +0,0 @@
|
|||
//! Async-graphql integration with Tide
|
||||
//!
|
||||
//! # Examples
|
||||
//! *[Full Example](<https://github.com/async-graphql/examples/blob/master/tide/starwars/src/main.rs>)*
|
||||
|
||||
#![warn(missing_docs)]
|
||||
#![allow(clippy::type_complexity)]
|
||||
#![allow(clippy::needless_doctest_main)]
|
||||
#![forbid(unsafe_code)]
|
||||
|
||||
#[cfg(feature = "websocket")]
|
||||
mod subscription;
|
||||
|
||||
use async_graphql::http::MultipartOptions;
|
||||
use async_graphql::{ObjectType, ParseRequestError, Schema, SubscriptionType};
|
||||
use tide::utils::async_trait;
|
||||
use tide::{
|
||||
http::{
|
||||
headers::{self, HeaderValue},
|
||||
Method,
|
||||
},
|
||||
Body, Request, Response, StatusCode,
|
||||
};
|
||||
|
||||
#[cfg(feature = "websocket")]
|
||||
pub use subscription::GraphQLSubscriptionBuilder;
|
||||
|
||||
/// An endpoint for GraphQL.
|
||||
///
|
||||
/// This is created with the [`endpoint`](fn.endpoint.html) function.
|
||||
pub struct GraphQLEndpoint<Query, Mutation, Subscription> {
|
||||
/// The schema of the endpoint.
|
||||
schema: Schema<Query, Mutation, Subscription>,
|
||||
|
||||
/// The multipart options of the endpoint.
|
||||
opts: MultipartOptions,
|
||||
|
||||
/// Whether to support batch requests in the endpoint.
|
||||
batch: bool,
|
||||
}
|
||||
|
||||
// Manual impl to remove bounds on generics
|
||||
impl<Query, Mutation, Subscription> Clone for GraphQLEndpoint<Query, Mutation, Subscription> {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
schema: self.schema.clone(),
|
||||
opts: self.opts,
|
||||
batch: self.batch,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<Query, Mutation, Subscription> GraphQLEndpoint<Query, Mutation, Subscription> {
|
||||
/// Create a new GraphQL endpoint with the schema.
|
||||
pub fn new(schema: Schema<Query, Mutation, Subscription>) -> Self {
|
||||
GraphQLEndpoint {
|
||||
schema,
|
||||
opts: MultipartOptions::default(),
|
||||
batch: true,
|
||||
}
|
||||
}
|
||||
|
||||
/// Set the multipart options of the endpoint.
|
||||
#[must_use]
|
||||
pub fn multipart_opts(self, opts: MultipartOptions) -> Self {
|
||||
Self { opts, ..self }
|
||||
}
|
||||
|
||||
/// Set whether batch requests are supported in the endpoint.
|
||||
///
|
||||
/// Default is `true`.
|
||||
#[must_use]
|
||||
pub fn batch(self, batch: bool) -> Self {
|
||||
Self { batch, ..self }
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<Query, Mutation, Subscription, TideState> tide::Endpoint<TideState>
|
||||
for GraphQLEndpoint<Query, Mutation, Subscription>
|
||||
where
|
||||
Query: ObjectType + 'static,
|
||||
Mutation: ObjectType + 'static,
|
||||
Subscription: SubscriptionType + 'static,
|
||||
TideState: Clone + Send + Sync + 'static,
|
||||
{
|
||||
async fn call(&self, request: Request<TideState>) -> tide::Result {
|
||||
respond(
|
||||
self.schema
|
||||
.execute_batch(if self.batch {
|
||||
receive_batch_request_opts(request, self.opts).await
|
||||
} else {
|
||||
receive_request_opts(request, self.opts)
|
||||
.await
|
||||
.map(Into::into)
|
||||
}?)
|
||||
.await,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
/// Convert a Tide request to a GraphQL request.
|
||||
pub async fn receive_request<State: Clone + Send + Sync + 'static>(
|
||||
request: Request<State>,
|
||||
) -> tide::Result<async_graphql::Request> {
|
||||
receive_request_opts(request, Default::default()).await
|
||||
}
|
||||
|
||||
/// Convert a Tide request to a GraphQL request with options on how to receive multipart.
|
||||
pub async fn receive_request_opts<State: Clone + Send + Sync + 'static>(
|
||||
request: Request<State>,
|
||||
opts: MultipartOptions,
|
||||
) -> tide::Result<async_graphql::Request> {
|
||||
receive_batch_request_opts(request, opts)
|
||||
.await?
|
||||
.into_single()
|
||||
.map_err(|e| tide::Error::new(StatusCode::BadRequest, e))
|
||||
}
|
||||
|
||||
/// Convert a Tide request to a GraphQL batch request.
|
||||
pub async fn receive_batch_request<State: Clone + Send + Sync + 'static>(
|
||||
request: Request<State>,
|
||||
) -> tide::Result<async_graphql::BatchRequest> {
|
||||
receive_batch_request_opts(request, Default::default()).await
|
||||
}
|
||||
|
||||
/// Convert a Tide request to a GraphQL batch request with options on how to receive multipart.
|
||||
pub async fn receive_batch_request_opts<State: Clone + Send + Sync + 'static>(
|
||||
mut request: Request<State>,
|
||||
opts: MultipartOptions,
|
||||
) -> tide::Result<async_graphql::BatchRequest> {
|
||||
if request.method() == Method::Get {
|
||||
request.query::<async_graphql::Request>().map(Into::into)
|
||||
} else if request.method() == Method::Post {
|
||||
let body = request.take_body();
|
||||
let content_type = request
|
||||
.header(headers::CONTENT_TYPE)
|
||||
.and_then(|values| values.get(0))
|
||||
.map(HeaderValue::as_str);
|
||||
|
||||
async_graphql::http::receive_batch_body(content_type, body, opts)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
tide::Error::new(
|
||||
match &e {
|
||||
ParseRequestError::PayloadTooLarge => StatusCode::PayloadTooLarge,
|
||||
_ => StatusCode::BadRequest,
|
||||
},
|
||||
e,
|
||||
)
|
||||
})
|
||||
} else {
|
||||
Err(tide::Error::from_str(
|
||||
StatusCode::MethodNotAllowed,
|
||||
"GraphQL only supports GET and POST requests",
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
/// Convert a GraphQL response to a Tide response.
|
||||
pub fn respond(resp: impl Into<async_graphql::BatchResponse>) -> tide::Result {
|
||||
let resp = resp.into();
|
||||
|
||||
let mut response = Response::new(StatusCode::Ok);
|
||||
if resp.is_ok() {
|
||||
if let Some(cache_control) = resp.cache_control().value() {
|
||||
response.insert_header(headers::CACHE_CONTROL, cache_control);
|
||||
}
|
||||
}
|
||||
for (name, value) in resp.http_headers() {
|
||||
response.append_header(name, value);
|
||||
}
|
||||
response.set_body(Body::from_json(&resp)?);
|
||||
Ok(response)
|
||||
}
|
|
@ -1,166 +0,0 @@
|
|||
use std::future::Future;
|
||||
use std::marker::PhantomData;
|
||||
use std::str::FromStr;
|
||||
|
||||
use async_graphql::http::{WebSocket as AGWebSocket, WebSocketProtocols, WsMessage};
|
||||
use async_graphql::{Data, ObjectType, Result, Schema, SubscriptionType};
|
||||
use futures_util::future::Ready;
|
||||
use futures_util::{future, StreamExt};
|
||||
use tide::{Endpoint, Request, StatusCode};
|
||||
use tide_websockets::Message;
|
||||
|
||||
type DefaultOnConnCreateType<S> = fn(&Request<S>) -> Ready<Result<Data>>;
|
||||
|
||||
fn default_on_connection_create<S>(_: &Request<S>) -> Ready<Result<Data>> {
|
||||
futures_util::future::ready(Ok(Data::default()))
|
||||
}
|
||||
|
||||
type DefaultOnConnInitType = fn(serde_json::Value) -> Ready<Result<Data>>;
|
||||
|
||||
fn default_on_connection_init(_: serde_json::Value) -> Ready<Result<Data>> {
|
||||
futures_util::future::ready(Ok(Data::default()))
|
||||
}
|
||||
|
||||
/// GraphQL subscription builder.
|
||||
#[cfg_attr(docsrs, doc(cfg(feature = "websocket")))]
|
||||
pub struct GraphQLSubscriptionBuilder<TideState, Query, Mutation, Subscription, OnCreate, OnInit> {
|
||||
schema: Schema<Query, Mutation, Subscription>,
|
||||
on_connection_create: OnCreate,
|
||||
on_connection_init: OnInit,
|
||||
_mark: PhantomData<TideState>,
|
||||
}
|
||||
|
||||
impl<TideState, Query, Mutation, Subscription>
|
||||
GraphQLSubscriptionBuilder<
|
||||
TideState,
|
||||
Query,
|
||||
Mutation,
|
||||
Subscription,
|
||||
DefaultOnConnCreateType<TideState>,
|
||||
DefaultOnConnInitType,
|
||||
>
|
||||
{
|
||||
/// Create a GraphQL subscription builder.
|
||||
pub fn new(schema: Schema<Query, Mutation, Subscription>) -> Self {
|
||||
Self {
|
||||
schema,
|
||||
on_connection_create: default_on_connection_create,
|
||||
on_connection_init: default_on_connection_init,
|
||||
_mark: Default::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, Query, Mutation, Subscription, OnCreate, OnInit>
|
||||
GraphQLSubscriptionBuilder<S, Query, Mutation, Subscription, OnCreate, OnInit>
|
||||
{
|
||||
/// Specify the callback function to be called when the connection is created.
|
||||
///
|
||||
/// You can get something from the incoming request to create [`Data`].
|
||||
pub fn on_connection_create<OnCreate2, Fut>(
|
||||
self,
|
||||
callback: OnCreate2,
|
||||
) -> GraphQLSubscriptionBuilder<S, Query, Mutation, Subscription, OnCreate2, OnInit>
|
||||
where
|
||||
OnCreate2: Fn(&Request<S>) -> Fut + Clone + Send + Sync + 'static,
|
||||
Fut: Future<Output = Result<Data>> + Send + 'static,
|
||||
{
|
||||
GraphQLSubscriptionBuilder {
|
||||
schema: self.schema,
|
||||
on_connection_create: callback,
|
||||
on_connection_init: self.on_connection_init,
|
||||
_mark: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Specify a callback function to be called when the connection is initialized.
|
||||
///
|
||||
/// You can get something from the payload of [`GQL_CONNECTION_INIT` message](https://github.com/apollographql/subscriptions-transport-ws/blob/master/PROTOCOL.md#gql_connection_init) to create [`Data`].
|
||||
pub fn on_connection_init<OnInit2, Fut>(
|
||||
self,
|
||||
callback: OnInit2,
|
||||
) -> GraphQLSubscriptionBuilder<S, Query, Mutation, Subscription, OnCreate, OnInit2>
|
||||
where
|
||||
OnInit2: FnOnce(serde_json::Value) -> Fut + Clone + Send + Sync + 'static,
|
||||
Fut: Future<Output = Result<Data>> + Send + 'static,
|
||||
{
|
||||
GraphQLSubscriptionBuilder {
|
||||
schema: self.schema,
|
||||
on_connection_create: self.on_connection_create,
|
||||
on_connection_init: callback,
|
||||
_mark: Default::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<TideState, Query, Mutation, Subscription, OnCreate, OnCreateFut, OnInit, OnInitFut>
|
||||
GraphQLSubscriptionBuilder<TideState, Query, Mutation, Subscription, OnCreate, OnInit>
|
||||
where
|
||||
TideState: Send + Sync + Clone + 'static,
|
||||
Query: ObjectType + 'static,
|
||||
Mutation: ObjectType + 'static,
|
||||
Subscription: SubscriptionType + 'static,
|
||||
OnCreate: Fn(&Request<TideState>) -> OnCreateFut + Send + Clone + Sync + 'static,
|
||||
OnCreateFut: Future<Output = async_graphql::Result<Data>> + Send + 'static,
|
||||
OnInit: FnOnce(serde_json::Value) -> OnInitFut + Clone + Send + Sync + 'static,
|
||||
OnInitFut: Future<Output = async_graphql::Result<Data>> + Send + 'static,
|
||||
{
|
||||
/// Create an endpoint for graphql subscription.
|
||||
pub fn build(self) -> impl Endpoint<TideState> {
|
||||
tide_websockets::WebSocket::<TideState, _>::new(move |request, connection| {
|
||||
let schema = self.schema.clone();
|
||||
let on_connection_create = self.on_connection_create.clone();
|
||||
let on_connection_init = self.on_connection_init.clone();
|
||||
|
||||
async move {
|
||||
let data = on_connection_create(&request)
|
||||
.await
|
||||
.map_err(|_| tide::Error::from_str(StatusCode::BadRequest, "bad request"))?;
|
||||
|
||||
let protocol = match request
|
||||
.header("sec-websocket-protocol")
|
||||
.map(|value| value.as_str())
|
||||
.and_then(|protocols| {
|
||||
protocols
|
||||
.split(',')
|
||||
.find_map(|p| WebSocketProtocols::from_str(p.trim()).ok())
|
||||
}) {
|
||||
Some(protocol) => protocol,
|
||||
None => {
|
||||
// default to the prior standard
|
||||
WebSocketProtocols::SubscriptionsTransportWS
|
||||
}
|
||||
};
|
||||
|
||||
let sink = connection.clone();
|
||||
let mut stream = AGWebSocket::new(
|
||||
schema.clone(),
|
||||
connection
|
||||
.take_while(|msg| future::ready(msg.is_ok()))
|
||||
.map(Result::unwrap)
|
||||
.map(Message::into_data),
|
||||
protocol,
|
||||
)
|
||||
.connection_data(data)
|
||||
.on_connection_init(on_connection_init);
|
||||
|
||||
while let Some(data) = stream.next().await {
|
||||
match data {
|
||||
WsMessage::Text(text) => {
|
||||
if sink.send_string(text).await.is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
WsMessage::Close(_code, _msg) => {
|
||||
// TODO: Send close frame
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
})
|
||||
.with_protocols(&["graphql-transport-ws", "graphql-ws"])
|
||||
}
|
||||
}
|
|
@ -1,218 +0,0 @@
|
|||
mod test_utils;
|
||||
|
||||
use std::io::Read;
|
||||
|
||||
use async_graphql::*;
|
||||
use async_graphql_tide::GraphQLEndpoint;
|
||||
use reqwest::{header, StatusCode};
|
||||
use serde_json::json;
|
||||
|
||||
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
|
||||
|
||||
#[async_std::test]
|
||||
async fn quickstart() -> Result<()> {
|
||||
let listen_addr = "127.0.0.1:8081";
|
||||
|
||||
async_std::task::spawn(async move {
|
||||
struct QueryRoot;
|
||||
#[Object]
|
||||
impl QueryRoot {
|
||||
/// Returns the sum of a and b
|
||||
async fn add(&self, a: i32, b: i32) -> i32 {
|
||||
a + b
|
||||
}
|
||||
}
|
||||
|
||||
let schema = Schema::build(QueryRoot, EmptyMutation, EmptySubscription).finish();
|
||||
|
||||
let mut app = tide::new();
|
||||
let endpoint = GraphQLEndpoint::new(schema);
|
||||
app.at("/").post(endpoint.clone()).get(endpoint);
|
||||
app.listen(listen_addr).await
|
||||
});
|
||||
|
||||
test_utils::wait_server_ready().await;
|
||||
|
||||
let client = test_utils::client();
|
||||
|
||||
let resp = client
|
||||
.post(&format!("http://{}", listen_addr))
|
||||
.json(&json!({"query":"{ add(a: 10, b: 20) }"}))
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
assert_eq!(resp.status(), StatusCode::OK);
|
||||
let string = resp.text().await?;
|
||||
println!("via post {}", string);
|
||||
|
||||
assert_eq!(string, json!({"data": {"add": 30}}).to_string());
|
||||
|
||||
let resp = client
|
||||
.get(&format!("http://{}", listen_addr))
|
||||
.query(&[("query", "{ add(a: 10, b: 20) }")])
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
assert_eq!(resp.status(), StatusCode::OK);
|
||||
let string = resp.text().await?;
|
||||
println!("via get {}", string);
|
||||
|
||||
assert_eq!(string, json!({"data": {"add": 30}}).to_string());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[async_std::test]
|
||||
async fn hello() -> Result<()> {
|
||||
let listen_addr = "127.0.0.1:8082";
|
||||
|
||||
async_std::task::spawn(async move {
|
||||
struct Hello(String);
|
||||
struct QueryRoot;
|
||||
#[Object]
|
||||
impl QueryRoot {
|
||||
/// Returns hello
|
||||
async fn hello<'a>(&self, ctx: &'a Context<'_>) -> String {
|
||||
let name = ctx.data_opt::<Hello>().map(|hello| hello.0.as_str());
|
||||
format!("Hello, {}!", name.unwrap_or("world"))
|
||||
}
|
||||
}
|
||||
|
||||
let schema = Schema::build(QueryRoot, EmptyMutation, EmptySubscription).finish();
|
||||
|
||||
let mut app = tide::new();
|
||||
|
||||
app.at("/").post(move |req: tide::Request<()>| {
|
||||
let schema = schema.clone();
|
||||
async move {
|
||||
let name = req
|
||||
.header("name")
|
||||
.and_then(|values| values.get(0))
|
||||
.map(ToString::to_string);
|
||||
let mut req = async_graphql_tide::receive_request(req).await?;
|
||||
if let Some(name) = name {
|
||||
req = req.data(Hello(name));
|
||||
}
|
||||
async_graphql_tide::respond(schema.execute(req).await)
|
||||
}
|
||||
});
|
||||
app.listen(listen_addr).await
|
||||
});
|
||||
|
||||
test_utils::wait_server_ready().await;
|
||||
|
||||
let client = test_utils::client();
|
||||
|
||||
let resp = client
|
||||
.post(&format!("http://{}", listen_addr))
|
||||
.json(&json!({"query":"{ hello }"}))
|
||||
.header("Name", "Foo")
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
assert_eq!(resp.status(), StatusCode::OK);
|
||||
let string = resp.text().await?;
|
||||
println!("{}", string);
|
||||
|
||||
assert_eq!(string, json!({"data":{"hello":"Hello, Foo!"}}).to_string());
|
||||
|
||||
let resp = client
|
||||
.post(&format!("http://{}", listen_addr))
|
||||
.json(&json!({"query":"{ hello }"}))
|
||||
.header(header::CONTENT_TYPE, "application/json")
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
assert_eq!(resp.status(), StatusCode::OK);
|
||||
let string = resp.text().await?;
|
||||
println!("{}", string);
|
||||
|
||||
assert_eq!(
|
||||
string,
|
||||
json!({"data":{"hello":"Hello, world!"}}).to_string()
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[async_std::test]
|
||||
async fn upload() -> Result<()> {
|
||||
let listen_addr = "127.0.0.1:8083";
|
||||
|
||||
async_std::task::spawn(async move {
|
||||
struct QueryRoot;
|
||||
|
||||
#[Object]
|
||||
impl QueryRoot {
|
||||
async fn value(&self) -> i32 {
|
||||
10
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, SimpleObject)]
|
||||
pub struct FileInfo {
|
||||
filename: String,
|
||||
mime_type: Option<String>,
|
||||
}
|
||||
|
||||
struct MutationRoot;
|
||||
#[Object]
|
||||
impl MutationRoot {
|
||||
async fn single_upload(&self, ctx: &Context<'_>, file: Upload) -> FileInfo {
|
||||
let upload_value = file.value(ctx).unwrap();
|
||||
println!("single_upload: filename={}", upload_value.filename);
|
||||
println!(
|
||||
"single_upload: content_type={:?}",
|
||||
upload_value.content_type
|
||||
);
|
||||
|
||||
let file_info = FileInfo {
|
||||
filename: upload_value.filename.clone(),
|
||||
mime_type: upload_value.content_type.clone(),
|
||||
};
|
||||
|
||||
let mut content = String::new();
|
||||
upload_value
|
||||
.into_read()
|
||||
.read_to_string(&mut content)
|
||||
.unwrap();
|
||||
assert_eq!(content, "test".to_owned());
|
||||
|
||||
file_info
|
||||
}
|
||||
}
|
||||
|
||||
let schema = Schema::build(QueryRoot, MutationRoot, EmptySubscription).finish();
|
||||
|
||||
let mut app = tide::new();
|
||||
app.at("/").post(GraphQLEndpoint::new(schema));
|
||||
app.listen(listen_addr).await
|
||||
});
|
||||
|
||||
test_utils::wait_server_ready().await;
|
||||
|
||||
let client = test_utils::client();
|
||||
|
||||
let form = reqwest::multipart::Form::new()
|
||||
.text("operations", r#"{ "query": "mutation ($file: Upload!) { singleUpload(file: $file) { filename, mimeType } }", "variables": { "file": null } }"#)
|
||||
.text("map", r#"{ "0": ["variables.file"] }"#)
|
||||
.part("0", reqwest::multipart::Part::stream("test").file_name("test.txt").mime_str("text/plain")?);
|
||||
|
||||
let resp = client
|
||||
.post(&format!("http://{}", listen_addr))
|
||||
.multipart(form)
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
assert_eq!(resp.status(), StatusCode::OK);
|
||||
let string = resp.text().await?;
|
||||
println!("{}", string);
|
||||
|
||||
assert_eq!(
|
||||
string,
|
||||
json!({"data": {"singleUpload": {"filename": "test.txt", "mimeType": "text/plain"}}})
|
||||
.to_string()
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
|
@ -1,10 +0,0 @@
|
|||
use reqwest::Client;
|
||||
use std::time::Duration;
|
||||
|
||||
pub fn client() -> Client {
|
||||
Client::builder().no_proxy().build().unwrap()
|
||||
}
|
||||
|
||||
pub async fn wait_server_ready() {
|
||||
async_std::task::sleep(Duration::from_secs(1)).await;
|
||||
}
|
Loading…
Reference in New Issue