diff --git a/integrations/actix-web/Cargo.toml b/integrations/actix-web/Cargo.toml index 256a5aa5..6f4acc2e 100644 --- a/integrations/actix-web/Cargo.toml +++ b/integrations/actix-web/Cargo.toml @@ -13,12 +13,12 @@ categories = ["network-programming", "asynchronous"] [dependencies] async-graphql = { path = "../..", version = "=2.11.0" } - -actix = "0.12" -actix-http = "3.0.0-beta.10" -actix-web = { version = "4.0.0-beta.9", default-features = false } +actix = "0.12.0" +actix-http = "3.0.0-beta.11" +actix-web = { version = "4.0.0-beta.10", default-features = false } actix-web-actors = "4.0.0-beta.7" -futures-util = { version = "0.3.13", default-features = false } +async-channel = "1.6.1" +futures-util = { version = "0.3.17", default-features = false } serde_json = "1.0.64" serde_urlencoded = "0.7.0" futures-channel = "0.3.13" diff --git a/integrations/actix-web/src/lib.rs b/integrations/actix-web/src/lib.rs index 2ac15564..a0c4e227 100644 --- a/integrations/actix-web/src/lib.rs +++ b/integrations/actix-web/src/lib.rs @@ -3,23 +3,22 @@ #![allow(clippy::upper_case_acronyms)] #![warn(missing_docs)] -mod subscription; - -pub use subscription::WSSubscription; - use std::future::Future; use std::io::{self, ErrorKind}; use std::pin::Pin; +use actix_http::error::PayloadError; use actix_web::dev::{Payload, PayloadStream}; -use actix_web::error::PayloadError; use actix_web::http::{Method, StatusCode}; use actix_web::{http, Error, FromRequest, HttpRequest, HttpResponse, Responder, Result}; +use futures_util::future::{self, FutureExt}; +use futures_util::{StreamExt, TryStreamExt}; + use async_graphql::http::MultipartOptions; use async_graphql::ParseRequestError; -use futures_channel::mpsc; -use futures_util::future::{self, FutureExt}; -use futures_util::{SinkExt, StreamExt, TryStreamExt}; +pub use subscription::WSSubscription; + +mod subscription; /// Extractor for GraphQL request. /// @@ -40,7 +39,6 @@ type BatchToRequestMapper = impl FromRequest for Request { type Error = Error; type Future = future::Map<::Future, BatchToRequestMapper>; - type Config = MultipartOptions; fn from_request(req: &HttpRequest, payload: &mut Payload) -> Self::Future { BatchRequest::from_request(req, payload).map(|res| { @@ -69,10 +67,12 @@ impl BatchRequest { impl FromRequest for BatchRequest { type Error = Error; type Future = Pin>>>; - type Config = MultipartOptions; fn from_request(req: &HttpRequest, payload: &mut Payload) -> Self::Future { - let config = req.app_data::().cloned().unwrap_or_default(); + let config = req + .app_data::() + .cloned() + .unwrap_or_default(); if req.method() == Method::GET { let res = serde_urlencoded::from_str(req.query_string()); @@ -84,7 +84,7 @@ impl FromRequest for BatchRequest { .and_then(|value| value.to_str().ok()) .map(|value| value.to_string()); - let (mut tx, rx) = mpsc::channel(16); + let (tx, rx) = async_channel::bounded(16); // Payload is !Send so we create indirection with a channel let mut payload = payload.take(); @@ -118,7 +118,7 @@ impl FromRequest for BatchRequest { } PayloadError::Http2Payload(e) if e.is_io() => e.into_io().unwrap(), PayloadError::Http2Payload(e) => io::Error::new(ErrorKind::Other, e), - _ => io::Error::new(ErrorKind::Other, "unknown error"), + _ => io::Error::new(ErrorKind::Other, e), }) .into_async_read(), config, @@ -174,4 +174,4 @@ impl Responder for Response { } res.body(serde_json::to_string(&self.0).unwrap()) } -} +} \ No newline at end of file diff --git a/integrations/actix-web/src/subscription.rs b/integrations/actix-web/src/subscription.rs index ac388bf7..4a4f6aea 100644 --- a/integrations/actix-web/src/subscription.rs +++ b/integrations/actix-web/src/subscription.rs @@ -3,20 +3,19 @@ use std::str::FromStr; use std::time::{Duration, Instant}; use actix::{ - Actor, ActorContext, ActorFutureExt, ActorStreamExt, AsyncContext, ContextFutureSpawner, - StreamHandler, WrapFuture, WrapStream, + Actor, ActorContext, AsyncContext, ContextFutureSpawner, StreamHandler, WrapFuture, WrapStream, }; -use actix_http::ws::Item; -use actix_web::error::{Error, PayloadError}; -use actix_web::web::{BufMut, Bytes, BytesMut}; +use actix::{ActorFutureExt, ActorStreamExt}; +use actix_http::error::PayloadError; +use actix_http::ws; +use actix_web::web::Bytes; use actix_web::{HttpRequest, HttpResponse}; use actix_web_actors::ws::{CloseReason, Message, ProtocolError, WebsocketContext}; -use async_graphql::http::{WebSocket, WebSocketProtocols, WsMessage, ALL_WEBSOCKET_PROTOCOLS}; -use async_graphql::{Data, ObjectType, Result, Schema, SubscriptionType}; -use futures_channel::mpsc; use futures_util::future::Ready; use futures_util::stream::Stream; -use futures_util::SinkExt; + +use async_graphql::http::{WebSocket, WebSocketProtocols, WsMessage, ALL_WEBSOCKET_PROTOCOLS}; +use async_graphql::{Data, ObjectType, Result, Schema, SubscriptionType}; const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5); const CLIENT_TIMEOUT: Duration = Duration::from_secs(10); @@ -26,9 +25,9 @@ pub struct WSSubscription { schema: Schema, protocol: WebSocketProtocols, last_heartbeat: Instant, - messages: Option>, + messages: Option>>, initializer: Option, - continuation: BytesMut, + continuation: Vec, } impl @@ -43,7 +42,7 @@ where schema: Schema, request: &HttpRequest, stream: T, - ) -> Result + ) -> Result where T: Stream> + 'static, { @@ -67,7 +66,7 @@ where request: &HttpRequest, stream: T, initializer: F, - ) -> Result + ) -> Result where T: Stream> + 'static, F: FnOnce(serde_json::Value) -> R + Unpin + Send + 'static, @@ -96,7 +95,7 @@ where last_heartbeat: Instant::now(), messages: None, initializer: Some(initializer), - continuation: BytesMut::new(), + continuation: Vec::new(), }, &ALL_WEBSOCKET_PROTOCOLS, request, @@ -127,7 +126,7 @@ where fn started(&mut self, ctx: &mut Self::Context) { self.send_heartbeats(ctx); - let (tx, rx) = mpsc::unbounded(); + let (tx, rx) = async_channel::unbounded(); WebSocket::with_data( self.schema.clone(), @@ -179,22 +178,21 @@ where None } Message::Continuation(item) => match item { - Item::FirstText(bytes) | Item::FirstBinary(bytes) => { - self.continuation.clear(); - self.continuation.put(bytes); + ws::Item::FirstText(bytes) | ws::Item::FirstBinary(bytes) => { + self.continuation = bytes.to_vec(); None } - Item::Continue(bytes) => { - self.continuation.put(bytes); + ws::Item::Continue(bytes) => { + self.continuation.extend_from_slice(&bytes); None } - Item::Last(bytes) => { - self.continuation.put(bytes); - Some(std::mem::take(&mut self.continuation).freeze()) + ws::Item::Last(bytes) => { + self.continuation.extend_from_slice(&bytes); + Some(std::mem::take(&mut self.continuation)) } }, - Message::Text(s) => Some(s.into_bytes()), - Message::Binary(bytes) => Some(bytes), + Message::Text(s) => Some(s.into_bytes().to_vec()), + Message::Binary(bytes) => Some(bytes.to_vec()), Message::Close(_) => { ctx.stop(); None @@ -203,7 +201,7 @@ where }; if let Some(message) = message { - let mut sender = self.messages.as_ref().unwrap().clone(); + let sender = self.messages.as_ref().unwrap().clone(); async move { sender.send(message).await } .into_actor(self) @@ -214,4 +212,4 @@ where .spawn(ctx) } } -} +} \ No newline at end of file diff --git a/integrations/actix-web/tests/graphql.rs b/integrations/actix-web/tests/graphql.rs index f53111ea..bd328142 100644 --- a/integrations/actix-web/tests/graphql.rs +++ b/integrations/actix-web/tests/graphql.rs @@ -1,14 +1,16 @@ -mod test_utils; -use actix_http::Request; -use actix_web::dev::{MessageBody, Service, ServiceResponse}; -use actix_web::{guard, test, web, App}; -use async_graphql::*; +use actix_http::Method; +use actix_web::dev::{AnyBody, Service}; +use actix_web::{guard, test, web, web::Data, App}; use serde_json::json; + +use async_graphql::*; use test_utils::*; +mod test_utils; + #[actix_rt::test] async fn test_playground() { - let app = test::init_service( + let srv = test::init_service( App::new().service( web::resource("/") .guard(guard::Get()) @@ -16,20 +18,26 @@ async fn test_playground() { ), ) .await; - let req = test::TestRequest::with_uri("/").to_request(); - - let resp = app.call(req).await.unwrap(); - assert!(resp.status().is_success()); - let body = test::read_body(resp).await; - assert!(std::str::from_utf8(&body).unwrap().contains("graphql")); + let response = srv.call(req).await.unwrap(); + assert!(response.status().is_success()); + let body = response.response().body(); + if let AnyBody::Bytes(bytes) = body { + assert!(std::str::from_utf8(&bytes).unwrap().contains("graphql")); + } else { + panic!("response body must be Bytes {:?}", body); + } } #[actix_rt::test] async fn test_add() { - let app = test::init_service( + let srv = test::init_service( App::new() - .data(Schema::new(AddQueryRoot, EmptyMutation, EmptySubscription)) + .app_data(Data::new(Schema::new( + AddQueryRoot, + EmptyMutation, + EmptySubscription, + ))) .service( web::resource("/") .guard(guard::Post()) @@ -37,27 +45,32 @@ async fn test_add() { ), ) .await; - - let resp = test::TestRequest::post() - .uri("/") - .set_payload(r#"{"query":"{ add(a: 10, b: 20) }"}"#) - .send_request(&app) - .await; - - assert!(resp.status().is_success()); - let body = test::read_body(resp).await; - assert_eq!(body, json!({"data": {"add": 30}}).to_string()); + let response = srv + .call( + test::TestRequest::with_uri("/") + .method(Method::POST) + .set_payload(r#"{"query":"{ add(a: 10, b: 20) }"}"#) + .to_request(), + ) + .await + .unwrap(); + assert!(response.status().is_success()); + let body = response.response().body(); + assert_eq!( + body, + &AnyBody::Bytes(json!({"data": {"add": 30}}).to_string().into_bytes().into()) + ); } #[actix_rt::test] async fn test_hello() { - let app = test::init_service( + let srv = test::init_service( App::new() - .data(Schema::new( + .app_data(Data::new(Schema::new( HelloQueryRoot, EmptyMutation, EmptySubscription, - )) + ))) .service( web::resource("/") .guard(guard::Post()) @@ -66,29 +79,37 @@ async fn test_hello() { ) .await; - let resp = test::TestRequest::post() - .uri("/") - .set_payload(r#"{"query":"{ hello }"}"#) - .send_request(&app) - .await; - - assert!(resp.status().is_success()); - let body = test::read_body(resp).await; + let response = srv + .call( + test::TestRequest::with_uri("/") + .method(Method::POST) + .set_payload(r#"{"query":"{ hello }"}"#) + .to_request(), + ) + .await + .unwrap(); + assert!(response.status().is_success()); + let body = response.response().body(); assert_eq!( body, - json!({"data": {"hello": "Hello, world!"}}).to_string() + &AnyBody::Bytes( + json!({"data": {"hello": "Hello, world!"}}) + .to_string() + .into_bytes() + .into() + ) ); } #[actix_rt::test] async fn test_hello_header() { - let app = test::init_service( + let srv = test::init_service( App::new() - .data(Schema::new( + .app_data(Data::new(Schema::new( HelloQueryRoot, EmptyMutation, EmptySubscription, - )) + ))) .service( web::resource("/") .guard(guard::Post()) @@ -97,27 +118,38 @@ async fn test_hello_header() { ) .await; - let resp = test::TestRequest::post() - .uri("/") - .append_header(("Name", "Foo")) - .set_payload(r#"{"query":"{ hello }"}"#) - .send_request(&app) - .await; - - assert!(resp.status().is_success()); - let body = test::read_body(resp).await; - assert_eq!(body, json!({"data": {"hello": "Hello, Foo!"}}).to_string()); + let response = srv + .call( + test::TestRequest::with_uri("/") + .method(Method::POST) + .insert_header(("Name", "Foo")) + .set_payload(r#"{"query":"{ hello }"}"#) + .to_request(), + ) + .await + .unwrap(); + assert!(response.status().is_success()); + let body = response.response().body(); + assert_eq!( + body, + &AnyBody::Bytes( + json!({"data": {"hello": "Hello, Foo!"}}) + .to_string() + .into_bytes() + .into() + ) + ); } #[actix_rt::test] async fn test_count() { - let app = test::init_service( + let srv = test::init_service( App::new() - .data( + .app_data(Data::new( Schema::build(CountQueryRoot, CountMutation, EmptySubscription) .data(Count::default()) .finish(), - ) + )) .service( web::resource("/") .guard(guard::Post()) @@ -126,37 +158,87 @@ async fn test_count() { ) .await; - count_action_helper(0, r#"{"query":"{ count }"}"#, &app).await; - count_action_helper(10, r#"{"query":"mutation{ addCount(count: 10) }"}"#, &app).await; - count_action_helper( - 8, - r#"{"query":"mutation{ subtractCount(count: 2) }"}"#, - &app, - ) - .await; - count_action_helper( - 6, - r#"{"query":"mutation{ subtractCount(count: 2) }"}"#, - &app, - ) - .await; -} + let response = srv + .call( + test::TestRequest::with_uri("/") + .method(Method::POST) + .set_payload(r#"{"query":"{ count }"}"#) + .to_request(), + ) + .await + .unwrap(); + assert!(response.status().is_success()); + let body = response.response().body(); + assert_eq!( + body, + &AnyBody::Bytes( + json!({"data": {"count": 0}}) + .to_string() + .into_bytes() + .into() + ) + ); -async fn count_action_helper(expected: i32, payload: &'static str, app: &S) -where - S: Service, Error = E>, - B: MessageBody + Unpin, - E: std::fmt::Debug, -{ - let resp = test::TestRequest::post() - .uri("/") - .set_payload(payload) - .send_request(app) - .await; + let response = srv + .call( + test::TestRequest::with_uri("/") + .method(Method::POST) + .set_payload(r#"{"query":"mutation{ addCount(count: 10) }"}"#) + .to_request(), + ) + .await + .unwrap(); + assert!(response.status().is_success()); + let body = response.response().body(); + assert_eq!( + body, + &AnyBody::Bytes( + json!({"data": {"addCount": 10}}) + .to_string() + .into_bytes() + .into() + ) + ); - assert!(resp.status().is_success()); - let body = test::read_body(resp).await; - assert!(std::str::from_utf8(&body) - .unwrap() - .contains(&expected.to_string())); -} + let response = srv + .call( + test::TestRequest::with_uri("/") + .method(Method::POST) + .set_payload(r#"{"query":"mutation{ subtractCount(count: 2) }"}"#) + .to_request(), + ) + .await + .unwrap(); + assert!(response.status().is_success()); + let body = response.response().body(); + assert_eq!( + body, + &AnyBody::Bytes( + json!({"data": {"subtractCount": 8}}) + .to_string() + .into_bytes() + .into() + ) + ); + + let response = srv + .call( + test::TestRequest::with_uri("/") + .method(Method::POST) + .set_payload(r#"{"query":"mutation{ subtractCount(count: 2) }"}"#) + .to_request(), + ) + .await + .unwrap(); + assert!(response.status().is_success()); + let body = response.response().body(); + assert_eq!( + body, + &AnyBody::Bytes( + json!({"data": {"subtractCount": 6}}) + .to_string() + .into_bytes() + .into() + ) + ); +} \ No newline at end of file