diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index a5b72c6e..ed3eeea3 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -42,6 +42,9 @@ jobs: - name: async-graphql-rocket registryName: async-graphql-rocket path: integrations/rocket + - name: async-graphql-tide + registryName: async-graphql-tide + path: integrations/tide steps: - uses: actions-rs/toolchain@v1 with: diff --git a/Cargo.toml b/Cargo.toml index 1a5b0f13..9dd1f22e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -88,4 +88,5 @@ members = [ "integrations/rocket", "integrations/warp", "integrations/axum", + "integrations/tide", ] diff --git a/README.md b/README.md index 09a94ec8..d492e98f 100644 --- a/README.md +++ b/README.md @@ -104,8 +104,6 @@ cd examples && cargo run --bin [name] * Rocket [async-graphql-rocket](https://github.com/async-graphql/async-graphql/tree/master/integrations/rocket) * Axum [async-graphql-axum](https://github.com/async-graphql/async-graphql/tree/master/integrations/axum) -**About actix-web-v4**: The beta version is supported in the branch `actix-web-beta` [Related Issue](https://github.com/async-graphql/async-graphql/issues/590) - ## Who's using Async-graphql in production? - [Vector](https://vector.dev/) diff --git a/examples b/examples index be1508f1..d7fdd500 160000 --- a/examples +++ b/examples @@ -1 +1 @@ -Subproject commit be1508f163311f0ca9ed987c9487fd7d28fac942 +Subproject commit d7fdd5003034de745492ab1ffe88455f7e49c58f diff --git a/integrations/tide/Cargo.toml b/integrations/tide/Cargo.toml new file mode 100644 index 00000000..a12c0cda --- /dev/null +++ b/integrations/tide/Cargo.toml @@ -0,0 +1,31 @@ +[package] +name = "async-graphql-tide" +version = "3.0.4" +authors = ["vkill ", "sunli "] +edition = "2021" +description = "async-graphql for tide" +license = "MIT/Apache-2.0" +documentation = "https://docs.rs/async-graphql-tide/" +homepage = "https://github.com/async-graphql/async-graphql" +repository = "https://github.com/async-graphql/async-graphql" +keywords = ["futures", "async", "graphql"] +categories = ["network-programming", "asynchronous"] + +[features] +default = ["websocket"] +websocket = ["tide-websockets"] + +[dependencies] +async-graphql = { path = "../..", version = "=3.0.4" } +async-trait = "0.1.48" +futures-util = "0.3.13" +serde_json = "1.0.64" + +tide = { version = "0.16.0", default-features = false, features = ["h1-server"] } +tide-websockets = { version = "0.4.0", optional = true } + +[dev-dependencies] +# Surf lacks multipart support +reqwest = { version = "0.11.2", default-features = false, features = ["json", "multipart"] } +async-std = { version = "1.9.0", features = ["attributes", "tokio1"] } +serde_json = "1.0.64" diff --git a/integrations/tide/src/lib.rs b/integrations/tide/src/lib.rs new file mode 100644 index 00000000..5f8d7307 --- /dev/null +++ b/integrations/tide/src/lib.rs @@ -0,0 +1,175 @@ +//! Async-graphql integration with Tide +//! +//! # Examples +//! *[Full Example]()* + +#![warn(missing_docs)] +#![allow(clippy::type_complexity)] +#![allow(clippy::needless_doctest_main)] +#![forbid(unsafe_code)] + +#[cfg(feature = "websocket")] +mod subscription; + +use async_graphql::http::MultipartOptions; +use async_graphql::{ObjectType, ParseRequestError, Schema, SubscriptionType}; +use tide::utils::async_trait; +use tide::{ + http::{ + headers::{self, HeaderValue}, + Method, + }, + Body, Request, Response, StatusCode, +}; + +#[cfg(feature = "websocket")] +pub use subscription::GraphQLSubscription; + +/// Create a new GraphQL endpoint with the schema. +/// +/// Default multipart options are used and batch operations are supported. +pub fn graphql( + schema: Schema, +) -> GraphQLEndpoint { + GraphQLEndpoint { + schema, + opts: MultipartOptions::default(), + batch: true, + } +} + +/// A GraphQL endpoint. +/// +/// This is created with the [`endpoint`](fn.endpoint.html) function. +#[non_exhaustive] +pub struct GraphQLEndpoint { + /// The schema of the endpoint. + pub schema: Schema, + /// The multipart options of the endpoint. + pub opts: MultipartOptions, + /// Whether to support batch requests in the endpoint. + pub batch: bool, +} + +impl GraphQLEndpoint { + /// Set the multipart options of the endpoint. + #[must_use] + pub fn multipart_opts(self, opts: MultipartOptions) -> Self { + Self { opts, ..self } + } + /// Set whether batch requests are supported in the endpoint. + #[must_use] + pub fn batch(self, batch: bool) -> Self { + Self { batch, ..self } + } +} + +// Manual impl to remove bounds on generics +impl Clone for GraphQLEndpoint { + fn clone(&self) -> Self { + Self { + schema: self.schema.clone(), + opts: self.opts, + batch: self.batch, + } + } +} + +#[async_trait] +impl tide::Endpoint + for GraphQLEndpoint +where + Query: ObjectType + 'static, + Mutation: ObjectType + 'static, + Subscription: SubscriptionType + 'static, + TideState: Clone + Send + Sync + 'static, +{ + async fn call(&self, request: Request) -> tide::Result { + respond( + self.schema + .execute_batch(if self.batch { + receive_batch_request_opts(request, self.opts).await + } else { + receive_request_opts(request, self.opts) + .await + .map(Into::into) + }?) + .await, + ) + } +} + +/// Convert a Tide request to a GraphQL request. +pub async fn receive_request( + request: Request, +) -> tide::Result { + receive_request_opts(request, Default::default()).await +} + +/// Convert a Tide request to a GraphQL request with options on how to receive multipart. +pub async fn receive_request_opts( + request: Request, + opts: MultipartOptions, +) -> tide::Result { + receive_batch_request_opts(request, opts) + .await? + .into_single() + .map_err(|e| tide::Error::new(StatusCode::BadRequest, e)) +} + +/// Convert a Tide request to a GraphQL batch request. +pub async fn receive_batch_request( + request: Request, +) -> tide::Result { + receive_batch_request_opts(request, Default::default()).await +} + +/// Convert a Tide request to a GraphQL batch request with options on how to receive multipart. +pub async fn receive_batch_request_opts( + mut request: Request, + opts: MultipartOptions, +) -> tide::Result { + if request.method() == Method::Get { + request.query::().map(Into::into) + } else if request.method() == Method::Post { + let body = request.take_body(); + let content_type = request + .header(headers::CONTENT_TYPE) + .and_then(|values| values.get(0)) + .map(HeaderValue::as_str); + + async_graphql::http::receive_batch_body(content_type, body, opts) + .await + .map_err(|e| { + tide::Error::new( + match &e { + ParseRequestError::PayloadTooLarge => StatusCode::PayloadTooLarge, + _ => StatusCode::BadRequest, + }, + e, + ) + }) + } else { + Err(tide::Error::from_str( + StatusCode::MethodNotAllowed, + "GraphQL only supports GET and POST requests", + )) + } +} + +/// Convert a GraphQL response to a Tide response. +pub fn respond(resp: impl Into) -> tide::Result { + let resp = resp.into(); + + let mut response = Response::new(StatusCode::Ok); + if resp.is_ok() { + if let Some(cache_control) = resp.cache_control().value() { + response.insert_header(headers::CACHE_CONTROL, cache_control); + } + } + for (name, value) in resp.http_headers() { + response.append_header(name, value); + } + response.set_body(Body::from_json(&resp)?); + Ok(response) +} diff --git a/integrations/tide/src/subscription.rs b/integrations/tide/src/subscription.rs new file mode 100644 index 00000000..47de7ddb --- /dev/null +++ b/integrations/tide/src/subscription.rs @@ -0,0 +1,126 @@ +use std::future::Future; +use std::str::FromStr; + +use async_graphql::http::{ + WebSocket as AGWebSocket, WebSocketProtocols, WsMessage, ALL_WEBSOCKET_PROTOCOLS, +}; +use async_graphql::{Data, ObjectType, Result, Schema, SubscriptionType}; +use futures_util::future::Ready; +use futures_util::{future, StreamExt}; +use tide::Endpoint; +use tide_websockets::tungstenite::protocol::CloseFrame; +use tide_websockets::Message; + +/// A GraphQL subscription endpoint builder. +#[cfg_attr(docsrs, doc(cfg(feature = "websocket")))] +pub struct GraphQLSubscription { + schema: Schema, + on_connection_init: OnConnInit, +} + +type DefaultOnConnInitType = fn(serde_json::Value) -> Ready>; + +fn default_on_connection_init(_: serde_json::Value) -> Ready> { + futures_util::future::ready(Ok(Data::default())) +} + +impl + GraphQLSubscription +where + Query: ObjectType + 'static, + Mutation: ObjectType + 'static, + Subscription: SubscriptionType + 'static, +{ + /// Create a [`GraphQLSubscription`] object. + pub fn new(schema: Schema) -> Self { + GraphQLSubscription { + schema, + on_connection_init: default_on_connection_init, + } + } +} + +impl + GraphQLSubscription +where + Query: ObjectType + 'static, + Mutation: ObjectType + 'static, + Subscription: SubscriptionType + 'static, + OnConnInit: Fn(serde_json::Value) -> OnConnInitFut + Clone + Send + Sync + 'static, + OnConnInitFut: Future> + Send + 'static, +{ + /// Specify a callback function to be called when the connection is initialized. + /// + /// You can get something from the payload of [`GQL_CONNECTION_INIT` message](https://github.com/apollographql/subscriptions-transport-ws/blob/master/PROTOCOL.md#gql_connection_init) to create [`Data`]. + /// The data returned by this callback function will be merged with the data specified by [`with_data`]. + pub fn on_connection_init( + self, + callback: OnConnInit2, + ) -> GraphQLSubscription + where + OnConnInit2: Fn(serde_json::Value) -> Fut + Clone + Send + Sync + 'static, + Fut: Future> + Send + 'static, + { + GraphQLSubscription { + schema: self.schema, + on_connection_init: callback, + } + } + + /// Consumes this builder to create a tide endpoint. + pub fn build(self) -> impl Endpoint { + tide_websockets::WebSocket::::new(move |request, connection| { + let schema = self.schema.clone(); + let on_connection_init = self.on_connection_init.clone(); + async move { + let protocol = match request + .header("sec-websocket-protocol") + .map(|value| value.as_str()) + .and_then(|protocols| { + protocols + .split(',') + .find_map(|p| WebSocketProtocols::from_str(p.trim()).ok()) + }) { + Some(protocol) => protocol, + None => { + // default to the prior standard + WebSocketProtocols::SubscriptionsTransportWS + } + }; + + let sink = connection.clone(); + let mut stream = AGWebSocket::new( + schema.clone(), + connection + .take_while(|msg| future::ready(msg.is_ok())) + .map(Result::unwrap) + .map(Message::into_data), + protocol, + ) + .on_connection_init(on_connection_init); + + while let Some(data) = stream.next().await { + match data { + WsMessage::Text(text) => { + if sink.send_string(text).await.is_err() { + break; + } + } + WsMessage::Close(code, msg) => { + let _ = sink + .send(Message::Close(Some(CloseFrame { + code: code.into(), + reason: msg.into(), + }))) + .await; + break; + } + } + } + + Ok(()) + } + }) + .with_protocols(&ALL_WEBSOCKET_PROTOCOLS) + } +} diff --git a/integrations/tide/tests/graphql.rs b/integrations/tide/tests/graphql.rs new file mode 100644 index 00000000..7363b653 --- /dev/null +++ b/integrations/tide/tests/graphql.rs @@ -0,0 +1,218 @@ +mod test_utils; + +use std::io::Read; + +use async_graphql::*; +use reqwest::{header, StatusCode}; +use serde_json::json; + +type Result = std::result::Result>; + +#[async_std::test] +async fn quickstart() -> Result<()> { + let listen_addr = "127.0.0.1:8081"; + + async_std::task::spawn(async move { + struct QueryRoot; + #[Object] + impl QueryRoot { + /// Returns the sum of a and b + async fn add(&self, a: i32, b: i32) -> i32 { + a + b + } + } + + let schema = Schema::build(QueryRoot, EmptyMutation, EmptySubscription).finish(); + + let mut app = tide::new(); + let endpoint = async_graphql_tide::graphql_endpoint(schema); + app.at("/").post(endpoint.clone()).get(endpoint); + app.listen(listen_addr).await + }); + + test_utils::wait_server_ready().await; + + let client = test_utils::client(); + + let resp = client + .post(&format!("http://{}", listen_addr)) + .json(&json!({"query":"{ add(a: 10, b: 20) }"})) + .send() + .await?; + + assert_eq!(resp.status(), StatusCode::OK); + let string = resp.text().await?; + println!("via post {}", string); + + assert_eq!(string, json!({"data": {"add": 30}}).to_string()); + + let resp = client + .get(&format!("http://{}", listen_addr)) + .query(&[("query", "{ add(a: 10, b: 20) }")]) + .send() + .await?; + + assert_eq!(resp.status(), StatusCode::OK); + let string = resp.text().await?; + println!("via get {}", string); + + assert_eq!(string, json!({"data": {"add": 30}}).to_string()); + + Ok(()) +} + +#[async_std::test] +async fn hello() -> Result<()> { + let listen_addr = "127.0.0.1:8082"; + + async_std::task::spawn(async move { + struct Hello(String); + struct QueryRoot; + #[Object] + impl QueryRoot { + /// Returns hello + async fn hello<'a>(&self, ctx: &'a Context<'_>) -> String { + let name = ctx.data_opt::().map(|hello| hello.0.as_str()); + format!("Hello, {}!", name.unwrap_or("world")) + } + } + + let schema = Schema::build(QueryRoot, EmptyMutation, EmptySubscription).finish(); + + let mut app = tide::new(); + + app.at("/").post(move |req: tide::Request<()>| { + let schema = schema.clone(); + async move { + let name = req + .header("name") + .and_then(|values| values.get(0)) + .map(ToString::to_string); + let mut req = async_graphql_tide::receive_request(req).await?; + if let Some(name) = name { + req = req.data(Hello(name)); + } + async_graphql_tide::respond(schema.execute(req).await) + } + }); + app.listen(listen_addr).await + }); + + test_utils::wait_server_ready().await; + + let client = test_utils::client(); + + let resp = client + .post(&format!("http://{}", listen_addr)) + .json(&json!({"query":"{ hello }"})) + .header("Name", "Foo") + .send() + .await?; + + assert_eq!(resp.status(), StatusCode::OK); + let string = resp.text().await?; + println!("{}", string); + + assert_eq!(string, json!({"data":{"hello":"Hello, Foo!"}}).to_string()); + + let resp = client + .post(&format!("http://{}", listen_addr)) + .json(&json!({"query":"{ hello }"})) + .header(header::CONTENT_TYPE, "application/json") + .send() + .await?; + + assert_eq!(resp.status(), StatusCode::OK); + let string = resp.text().await?; + println!("{}", string); + + assert_eq!( + string, + json!({"data":{"hello":"Hello, world!"}}).to_string() + ); + + Ok(()) +} + +#[async_std::test] +async fn upload() -> Result<()> { + let listen_addr = "127.0.0.1:8083"; + + async_std::task::spawn(async move { + struct QueryRoot; + + #[Object] + impl QueryRoot { + async fn value(&self) -> i32 { + 10 + } + } + + #[derive(Clone, SimpleObject)] + pub struct FileInfo { + filename: String, + mime_type: Option, + } + + struct MutationRoot; + #[Object] + impl MutationRoot { + async fn single_upload(&self, ctx: &Context<'_>, file: Upload) -> FileInfo { + let upload_value = file.value(ctx).unwrap(); + println!("single_upload: filename={}", upload_value.filename); + println!( + "single_upload: content_type={:?}", + upload_value.content_type + ); + + let file_info = FileInfo { + filename: upload_value.filename.clone(), + mime_type: upload_value.content_type.clone(), + }; + + let mut content = String::new(); + upload_value + .into_read() + .read_to_string(&mut content) + .unwrap(); + assert_eq!(content, "test".to_owned()); + + file_info + } + } + + let schema = Schema::build(QueryRoot, MutationRoot, EmptySubscription).finish(); + + let mut app = tide::new(); + app.at("/") + .post(async_graphql_tide::graphql_endpoint(schema)); + app.listen(listen_addr).await + }); + + test_utils::wait_server_ready().await; + + let client = test_utils::client(); + + let form = reqwest::multipart::Form::new() + .text("operations", r#"{ "query": "mutation ($file: Upload!) { singleUpload(file: $file) { filename, mimeType } }", "variables": { "file": null } }"#) + .text("map", r#"{ "0": ["variables.file"] }"#) + .part("0", reqwest::multipart::Part::stream("test").file_name("test.txt").mime_str("text/plain")?); + + let resp = client + .post(&format!("http://{}", listen_addr)) + .multipart(form) + .send() + .await?; + + assert_eq!(resp.status(), StatusCode::OK); + let string = resp.text().await?; + println!("{}", string); + + assert_eq!( + string, + json!({"data": {"singleUpload": {"filename": "test.txt", "mimeType": "text/plain"}}}) + .to_string() + ); + + Ok(()) +} diff --git a/integrations/tide/tests/test_utils.rs b/integrations/tide/tests/test_utils.rs new file mode 100644 index 00000000..5d0e2a2b --- /dev/null +++ b/integrations/tide/tests/test_utils.rs @@ -0,0 +1,10 @@ +use reqwest::Client; +use std::time::Duration; + +pub fn client() -> Client { + Client::builder().no_proxy().build().unwrap() +} + +pub async fn wait_server_ready() { + async_std::task::sleep(Duration::from_secs(1)).await; +} diff --git a/src/lib.rs b/src/lib.rs index 31bb3a57..a41c3eee 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -76,9 +76,12 @@ //! //! ## Integrations //! -//! * Actix-web [async-graphql-actix_web](https://crates.io/crates/async-graphql-actix-web) +//! * Poem [async-graphql-poem](https://crates.io/crates/async-graphql-poem) +//! * Actix-web [async-graphql-actix-web](https://crates.io/crates/async-graphql-actix-web) //! * Warp [async-graphql-warp](https://crates.io/crates/async-graphql-warp) //! * Tide [async-graphql-tide](https://crates.io/crates/async-graphql-tide) +//! * Rocket [async-graphql-rocket](https://github.com/async-graphql/async-graphql/tree/master/integrations/rocket) +//! * Axum [async-graphql-axum](https://github.com/async-graphql/async-graphql/tree/master/integrations/axum) //! //! ## License //!