Move http::websocket::create and http::websocket::create_with_initializer function to http::WebSocketStream.
This commit is contained in:
parent
9514432989
commit
492bba1cdd
|
@ -2,7 +2,7 @@ use actix::{
|
|||
Actor, ActorContext, ActorFuture, AsyncContext, ContextFutureSpawner, StreamHandler, WrapFuture,
|
||||
};
|
||||
use actix_web_actors::ws::{Message, ProtocolError, WebsocketContext};
|
||||
use async_graphql::http::websocket::WebSocketStream;
|
||||
use async_graphql::http::WebSocketStream;
|
||||
use async_graphql::{resolver_utils::ObjectType, Data, FieldResult, Schema, SubscriptionType};
|
||||
use futures::stream::SplitSink;
|
||||
use futures::{SinkExt, StreamExt};
|
||||
|
@ -67,13 +67,15 @@ where
|
|||
fn started(&mut self, ctx: &mut Self::Context) {
|
||||
self.hb(ctx);
|
||||
if let Some(initializer) = self.initializer.take() {
|
||||
let (sink, stream) =
|
||||
async_graphql::http::websocket::create_with_initializer(&self.schema, initializer)
|
||||
.split();
|
||||
let (sink, stream) = async_graphql::http::WebSocketStream::new_with_initializer(
|
||||
&self.schema,
|
||||
initializer,
|
||||
)
|
||||
.split();
|
||||
ctx.add_stream(stream);
|
||||
self.sink = Some(sink);
|
||||
} else {
|
||||
let (sink, stream) = async_graphql::http::websocket::create(&self.schema).split();
|
||||
let (sink, stream) = async_graphql::http::WebSocketStream::new(&self.schema).split();
|
||||
ctx.add_stream(stream);
|
||||
self.sink = Some(sink);
|
||||
};
|
||||
|
|
|
@ -199,11 +199,12 @@ where
|
|||
|ws: warp::ws::Ws, schema: Schema<Query, Mutation, Subscription>, initializer: F| {
|
||||
ws.on_upgrade(move |websocket| {
|
||||
let (mut tx, rx) = websocket.split();
|
||||
let (mut stx, srx) = async_graphql::http::websocket::create_with_initializer(
|
||||
&schema,
|
||||
initializer,
|
||||
)
|
||||
.split();
|
||||
let (mut stx, srx) =
|
||||
async_graphql::http::WebSocketStream::new_with_initializer(
|
||||
&schema,
|
||||
initializer,
|
||||
)
|
||||
.split();
|
||||
|
||||
let mut rx = rx.fuse();
|
||||
let srx = srx.fuse();
|
||||
|
|
|
@ -4,12 +4,13 @@ mod graphiql_source;
|
|||
#[cfg(feature = "multipart")]
|
||||
mod multipart;
|
||||
mod playground_source;
|
||||
pub mod websocket;
|
||||
mod websocket;
|
||||
|
||||
pub use graphiql_source::graphiql_source;
|
||||
#[cfg(feature = "multipart")]
|
||||
pub use multipart::{receive_multipart, MultipartOptions};
|
||||
pub use playground_source::{playground_source, GraphQLPlaygroundConfig};
|
||||
pub use websocket::WebSocketStream;
|
||||
|
||||
use crate::{ParseRequestError, Request};
|
||||
use futures::io::AsyncRead;
|
||||
|
|
|
@ -65,43 +65,45 @@ impl Stream for WebSocketStream {
|
|||
}
|
||||
}
|
||||
|
||||
/// Create a websocket transport.
|
||||
pub fn create<Query, Mutation, Subscription>(
|
||||
schema: &Schema<Query, Mutation, Subscription>,
|
||||
) -> WebSocketStream
|
||||
where
|
||||
Query: ObjectType + Send + Sync + 'static,
|
||||
Mutation: ObjectType + Send + Sync + 'static,
|
||||
Subscription: SubscriptionType + Send + Sync + 'static,
|
||||
{
|
||||
create_with_initializer(schema, |_| Ok(Default::default()))
|
||||
}
|
||||
impl WebSocketStream {
|
||||
/// Create a websocket transport.
|
||||
pub fn new<Query, Mutation, Subscription>(
|
||||
schema: &Schema<Query, Mutation, Subscription>,
|
||||
) -> Self
|
||||
where
|
||||
Query: ObjectType + Send + Sync + 'static,
|
||||
Mutation: ObjectType + Send + Sync + 'static,
|
||||
Subscription: SubscriptionType + Send + Sync + 'static,
|
||||
{
|
||||
Self::new_with_initializer(schema, |_| Ok(Default::default()))
|
||||
}
|
||||
|
||||
/// Create a websocket transport and specify a context initialization function.
|
||||
pub fn create_with_initializer<Query, Mutation, Subscription>(
|
||||
schema: &Schema<Query, Mutation, Subscription>,
|
||||
initializer: impl Fn(serde_json::Value) -> FieldResult<Data> + Send + Sync + 'static,
|
||||
) -> WebSocketStream
|
||||
where
|
||||
Query: ObjectType + Send + Sync + 'static,
|
||||
Mutation: ObjectType + Send + Sync + 'static,
|
||||
Subscription: SubscriptionType + Send + Sync + 'static,
|
||||
{
|
||||
let (tx, rx) = mpsc::unbounded();
|
||||
WebSocketStream {
|
||||
tx,
|
||||
rx: SubscriptionStream {
|
||||
schema: schema.clone(),
|
||||
initializer: Arc::new(initializer),
|
||||
rx_bytes: rx,
|
||||
handle_request_fut: None,
|
||||
ctx: Some(WSContext {
|
||||
streams: Default::default(),
|
||||
send_buf: Default::default(),
|
||||
ctx_data: Arc::new(Data::default()),
|
||||
}),
|
||||
/// Create a websocket transport and specify a context initialization function.
|
||||
pub fn new_with_initializer<Query, Mutation, Subscription>(
|
||||
schema: &Schema<Query, Mutation, Subscription>,
|
||||
initializer: impl Fn(serde_json::Value) -> FieldResult<Data> + Send + Sync + 'static,
|
||||
) -> Self
|
||||
where
|
||||
Query: ObjectType + Send + Sync + 'static,
|
||||
Mutation: ObjectType + Send + Sync + 'static,
|
||||
Subscription: SubscriptionType + Send + Sync + 'static,
|
||||
{
|
||||
let (tx, rx) = mpsc::unbounded();
|
||||
WebSocketStream {
|
||||
tx,
|
||||
rx: SubscriptionStream {
|
||||
schema: schema.clone(),
|
||||
initializer: Arc::new(initializer),
|
||||
rx_bytes: rx,
|
||||
handle_request_fut: None,
|
||||
ctx: Some(WSContext {
|
||||
streams: Default::default(),
|
||||
send_buf: Default::default(),
|
||||
ctx_data: Arc::new(Data::default()),
|
||||
}),
|
||||
}
|
||||
.boxed(),
|
||||
}
|
||||
.boxed(),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -18,7 +18,7 @@ pub async fn test_subscription_ws_transport() {
|
|||
}
|
||||
|
||||
let schema = Schema::new(QueryRoot, EmptyMutation, SubscriptionRoot);
|
||||
let mut stream = http::websocket::create(&schema);
|
||||
let mut stream = http::WebSocketStream::new(&schema);
|
||||
|
||||
stream
|
||||
.send(
|
||||
|
@ -93,7 +93,7 @@ pub async fn test_subscription_ws_transport_with_token() {
|
|||
}
|
||||
|
||||
let schema = Schema::new(QueryRoot, EmptyMutation, SubscriptionRoot);
|
||||
let mut stream = http::websocket::create_with_initializer(&schema, |value| {
|
||||
let mut stream = http::WebSocketStream::new_with_initializer(&schema, |value| {
|
||||
#[derive(serde::Deserialize)]
|
||||
struct Payload {
|
||||
token: String,
|
||||
|
@ -189,7 +189,7 @@ pub async fn test_subscription_ws_transport_error() {
|
|||
}
|
||||
|
||||
let schema = Schema::new(QueryRoot, EmptyMutation, SubscriptionRoot);
|
||||
let mut stream = http::websocket::create(&schema);
|
||||
let mut stream = http::WebSocketStream::new(&schema);
|
||||
|
||||
stream
|
||||
.send(
|
||||
|
@ -259,7 +259,7 @@ pub async fn test_query_over_websocket() {
|
|||
}
|
||||
|
||||
let schema = Schema::new(QueryRoot, EmptyMutation, EmptySubscription);
|
||||
let mut stream = http::websocket::create(&schema);
|
||||
let mut stream = http::WebSocketStream::new(&schema);
|
||||
|
||||
stream
|
||||
.send(
|
||||
|
|
Loading…
Reference in New Issue
Block a user