From 492bba1cddf33b8711dce5e8f856635234f9c5cf Mon Sep 17 00:00:00 2001 From: Sunli Date: Tue, 15 Sep 2020 20:07:59 +0800 Subject: [PATCH] Move http::websocket::create and http::websocket::create_with_initializer function to http::WebSocketStream. --- integrations/actix-web/src/subscription.rs | 12 ++-- integrations/warp/src/lib.rs | 11 ++-- src/http/mod.rs | 3 +- src/http/websocket.rs | 72 +++++++++++----------- tests/subscription_websocket.rs | 8 +-- 5 files changed, 56 insertions(+), 50 deletions(-) diff --git a/integrations/actix-web/src/subscription.rs b/integrations/actix-web/src/subscription.rs index 76bfe391..93469f02 100644 --- a/integrations/actix-web/src/subscription.rs +++ b/integrations/actix-web/src/subscription.rs @@ -2,7 +2,7 @@ use actix::{ Actor, ActorContext, ActorFuture, AsyncContext, ContextFutureSpawner, StreamHandler, WrapFuture, }; use actix_web_actors::ws::{Message, ProtocolError, WebsocketContext}; -use async_graphql::http::websocket::WebSocketStream; +use async_graphql::http::WebSocketStream; use async_graphql::{resolver_utils::ObjectType, Data, FieldResult, Schema, SubscriptionType}; use futures::stream::SplitSink; use futures::{SinkExt, StreamExt}; @@ -67,13 +67,15 @@ where fn started(&mut self, ctx: &mut Self::Context) { self.hb(ctx); if let Some(initializer) = self.initializer.take() { - let (sink, stream) = - async_graphql::http::websocket::create_with_initializer(&self.schema, initializer) - .split(); + let (sink, stream) = async_graphql::http::WebSocketStream::new_with_initializer( + &self.schema, + initializer, + ) + .split(); ctx.add_stream(stream); self.sink = Some(sink); } else { - let (sink, stream) = async_graphql::http::websocket::create(&self.schema).split(); + let (sink, stream) = async_graphql::http::WebSocketStream::new(&self.schema).split(); ctx.add_stream(stream); self.sink = Some(sink); }; diff --git a/integrations/warp/src/lib.rs b/integrations/warp/src/lib.rs index eb5ee3f9..ef542dd8 100644 --- a/integrations/warp/src/lib.rs +++ b/integrations/warp/src/lib.rs @@ -199,11 +199,12 @@ where |ws: warp::ws::Ws, schema: Schema, initializer: F| { ws.on_upgrade(move |websocket| { let (mut tx, rx) = websocket.split(); - let (mut stx, srx) = async_graphql::http::websocket::create_with_initializer( - &schema, - initializer, - ) - .split(); + let (mut stx, srx) = + async_graphql::http::WebSocketStream::new_with_initializer( + &schema, + initializer, + ) + .split(); let mut rx = rx.fuse(); let srx = srx.fuse(); diff --git a/src/http/mod.rs b/src/http/mod.rs index 0f313a4f..7e4e4079 100644 --- a/src/http/mod.rs +++ b/src/http/mod.rs @@ -4,12 +4,13 @@ mod graphiql_source; #[cfg(feature = "multipart")] mod multipart; mod playground_source; -pub mod websocket; +mod websocket; pub use graphiql_source::graphiql_source; #[cfg(feature = "multipart")] pub use multipart::{receive_multipart, MultipartOptions}; pub use playground_source::{playground_source, GraphQLPlaygroundConfig}; +pub use websocket::WebSocketStream; use crate::{ParseRequestError, Request}; use futures::io::AsyncRead; diff --git a/src/http/websocket.rs b/src/http/websocket.rs index 6c370940..84f43139 100644 --- a/src/http/websocket.rs +++ b/src/http/websocket.rs @@ -65,43 +65,45 @@ impl Stream for WebSocketStream { } } -/// Create a websocket transport. -pub fn create( - schema: &Schema, -) -> WebSocketStream -where - Query: ObjectType + Send + Sync + 'static, - Mutation: ObjectType + Send + Sync + 'static, - Subscription: SubscriptionType + Send + Sync + 'static, -{ - create_with_initializer(schema, |_| Ok(Default::default())) -} +impl WebSocketStream { + /// Create a websocket transport. + pub fn new( + schema: &Schema, + ) -> Self + where + Query: ObjectType + Send + Sync + 'static, + Mutation: ObjectType + Send + Sync + 'static, + Subscription: SubscriptionType + Send + Sync + 'static, + { + Self::new_with_initializer(schema, |_| Ok(Default::default())) + } -/// Create a websocket transport and specify a context initialization function. -pub fn create_with_initializer( - schema: &Schema, - initializer: impl Fn(serde_json::Value) -> FieldResult + Send + Sync + 'static, -) -> WebSocketStream -where - Query: ObjectType + Send + Sync + 'static, - Mutation: ObjectType + Send + Sync + 'static, - Subscription: SubscriptionType + Send + Sync + 'static, -{ - let (tx, rx) = mpsc::unbounded(); - WebSocketStream { - tx, - rx: SubscriptionStream { - schema: schema.clone(), - initializer: Arc::new(initializer), - rx_bytes: rx, - handle_request_fut: None, - ctx: Some(WSContext { - streams: Default::default(), - send_buf: Default::default(), - ctx_data: Arc::new(Data::default()), - }), + /// Create a websocket transport and specify a context initialization function. + pub fn new_with_initializer( + schema: &Schema, + initializer: impl Fn(serde_json::Value) -> FieldResult + Send + Sync + 'static, + ) -> Self + where + Query: ObjectType + Send + Sync + 'static, + Mutation: ObjectType + Send + Sync + 'static, + Subscription: SubscriptionType + Send + Sync + 'static, + { + let (tx, rx) = mpsc::unbounded(); + WebSocketStream { + tx, + rx: SubscriptionStream { + schema: schema.clone(), + initializer: Arc::new(initializer), + rx_bytes: rx, + handle_request_fut: None, + ctx: Some(WSContext { + streams: Default::default(), + send_buf: Default::default(), + ctx_data: Arc::new(Data::default()), + }), + } + .boxed(), } - .boxed(), } } diff --git a/tests/subscription_websocket.rs b/tests/subscription_websocket.rs index e329126c..7fcf6df0 100644 --- a/tests/subscription_websocket.rs +++ b/tests/subscription_websocket.rs @@ -18,7 +18,7 @@ pub async fn test_subscription_ws_transport() { } let schema = Schema::new(QueryRoot, EmptyMutation, SubscriptionRoot); - let mut stream = http::websocket::create(&schema); + let mut stream = http::WebSocketStream::new(&schema); stream .send( @@ -93,7 +93,7 @@ pub async fn test_subscription_ws_transport_with_token() { } let schema = Schema::new(QueryRoot, EmptyMutation, SubscriptionRoot); - let mut stream = http::websocket::create_with_initializer(&schema, |value| { + let mut stream = http::WebSocketStream::new_with_initializer(&schema, |value| { #[derive(serde::Deserialize)] struct Payload { token: String, @@ -189,7 +189,7 @@ pub async fn test_subscription_ws_transport_error() { } let schema = Schema::new(QueryRoot, EmptyMutation, SubscriptionRoot); - let mut stream = http::websocket::create(&schema); + let mut stream = http::WebSocketStream::new(&schema); stream .send( @@ -259,7 +259,7 @@ pub async fn test_query_over_websocket() { } let schema = Schema::new(QueryRoot, EmptyMutation, EmptySubscription); - let mut stream = http::websocket::create(&schema); + let mut stream = http::WebSocketStream::new(&schema); stream .send(