diff --git a/integrations/actix-web/Cargo.toml b/integrations/actix-web/Cargo.toml index 75bd71f3..1b4ca5b6 100644 --- a/integrations/actix-web/Cargo.toml +++ b/integrations/actix-web/Cargo.toml @@ -14,15 +14,15 @@ categories = ["network-programming", "asynchronous"] [dependencies] async-graphql = { path = "../..", version = "=2.11.0" } -actix = "0.10.0" -actix-http = "2.2.0" -actix-web = { version = "3.3.2", default-features = false } -actix-web-actors = "3.0.0" -async-channel = "1.6.1" +actix = "0.11.1" +actix-http = "3.0.0-beta.5" +actix-web = { version = "4.0.0-beta.5", default-features = false } +actix-web-actors = "4.0.0-beta.4" futures-util = { version = "0.3.13", default-features = false } serde_json = "1.0.64" serde_urlencoded = "0.7.0" +futures-channel = "0.3.13" [dev-dependencies] -actix-rt = "1.1.0" +actix-rt = "2.0.0" async-mutex = "1.4.0" diff --git a/integrations/actix-web/src/lib.rs b/integrations/actix-web/src/lib.rs index fc73c967..878d26a0 100644 --- a/integrations/actix-web/src/lib.rs +++ b/integrations/actix-web/src/lib.rs @@ -11,15 +11,15 @@ use std::future::Future; use std::io::{self, ErrorKind}; use std::pin::Pin; -use actix_web::client::PayloadError; use actix_web::dev::{Payload, PayloadStream}; +use actix_web::error::PayloadError; use actix_web::http::{Method, StatusCode}; use actix_web::{http, Error, FromRequest, HttpRequest, HttpResponse, Responder, Result}; -use futures_util::future::{self, FutureExt, Ready}; -use futures_util::{StreamExt, TryStreamExt}; - use async_graphql::http::MultipartOptions; use async_graphql::ParseRequestError; +use futures_channel::mpsc; +use futures_util::future::{self, FutureExt}; +use futures_util::{SinkExt, StreamExt, TryStreamExt}; /// Extractor for GraphQL request. /// @@ -84,7 +84,7 @@ impl FromRequest for BatchRequest { .and_then(|value| value.to_str().ok()) .map(|value| value.to_string()); - let (tx, rx) = async_channel::bounded(16); + let (mut tx, rx) = mpsc::channel(16); // Payload is !Send so we create indirection with a channel let mut payload = payload.take(); @@ -160,20 +160,17 @@ impl From for Response { } impl Responder for Response { - type Error = Error; - type Future = Ready>; - - fn respond_to(self, _req: &HttpRequest) -> Self::Future { + fn respond_to(self, _req: &HttpRequest) -> HttpResponse { let mut res = HttpResponse::build(StatusCode::OK); res.content_type("application/json"); if self.0.is_ok() { if let Some(cache_control) = self.0.cache_control().value() { - res.header("cache-control", cache_control); + res.append_header(("cache-control", cache_control)); } } for (name, value) in self.0.http_headers() { - res.header(name, value); + res.append_header((name, value)); } - futures_util::future::ok(res.body(serde_json::to_string(&self.0).unwrap())) + res.body(serde_json::to_string(&self.0).unwrap()) } } diff --git a/integrations/actix-web/src/subscription.rs b/integrations/actix-web/src/subscription.rs index ad2713e6..e4dd2b9a 100644 --- a/integrations/actix-web/src/subscription.rs +++ b/integrations/actix-web/src/subscription.rs @@ -2,19 +2,22 @@ use std::future::Future; use std::str::FromStr; use std::time::{Duration, Instant}; +use actix::ActorFutureExt; use actix::{ - Actor, ActorContext, ActorFuture, ActorStream, AsyncContext, ContextFutureSpawner, - StreamHandler, WrapFuture, WrapStream, + Actor, ActorContext, ActorStreamExt, AsyncContext, ContextFutureSpawner, StreamHandler, + WrapFuture, WrapStream, }; use actix_http::error::PayloadError; use actix_http::{ws, Error}; -use actix_web::web::Bytes; +use actix_web::web::{BufMut, Bytes, BytesMut}; use actix_web::{HttpRequest, HttpResponse}; use actix_web_actors::ws::{CloseReason, Message, ProtocolError, WebsocketContext}; use async_graphql::http::{WebSocket, WebSocketProtocols, WsMessage, ALL_WEBSOCKET_PROTOCOLS}; use async_graphql::{Data, ObjectType, Result, Schema, SubscriptionType}; +use futures_channel::mpsc; use futures_util::future::Ready; use futures_util::stream::Stream; +use futures_util::SinkExt; const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5); const CLIENT_TIMEOUT: Duration = Duration::from_secs(10); @@ -24,9 +27,9 @@ pub struct WSSubscription { schema: Schema, protocol: WebSocketProtocols, last_heartbeat: Instant, - messages: Option>>, + messages: Option>, initializer: Option, - continuation: Vec, + continuation: BytesMut, } impl @@ -94,7 +97,7 @@ where last_heartbeat: Instant::now(), messages: None, initializer: Some(initializer), - continuation: Vec::new(), + continuation: BytesMut::new(), }, &ALL_WEBSOCKET_PROTOCOLS, request, @@ -125,7 +128,7 @@ where fn started(&mut self, ctx: &mut Self::Context) { self.send_heartbeats(ctx); - let (tx, rx) = async_channel::unbounded(); + let (tx, rx) = mpsc::unbounded(); WebSocket::with_data( self.schema.clone(), @@ -178,20 +181,21 @@ where } Message::Continuation(item) => match item { ws::Item::FirstText(bytes) | ws::Item::FirstBinary(bytes) => { - self.continuation = bytes.to_vec(); + self.continuation.clear(); + self.continuation.put(bytes); None } ws::Item::Continue(bytes) => { - self.continuation.extend_from_slice(&bytes); + self.continuation.put(bytes); None } ws::Item::Last(bytes) => { - self.continuation.extend_from_slice(&bytes); - Some(std::mem::take(&mut self.continuation)) + self.continuation.put(bytes); + Some(std::mem::take(&mut self.continuation).freeze()) } }, Message::Text(s) => Some(s.into_bytes()), - Message::Binary(bytes) => Some(bytes.to_vec()), + Message::Binary(bytes) => Some(bytes), Message::Close(_) => { ctx.stop(); None @@ -200,7 +204,7 @@ where }; if let Some(message) = message { - let sender = self.messages.as_ref().unwrap().clone(); + let mut sender = self.messages.as_ref().unwrap().clone(); async move { sender.send(message).await } .into_actor(self)