Merge pull request #530 from BratSinot/ws_stream_to_generic

WebSocket is now generic in graphql_subscription_upgrade functions.
This commit is contained in:
Sunli 2021-05-31 20:47:11 +08:00 committed by GitHub
commit a06d418b8e

View File

@ -5,7 +5,6 @@ use async_graphql::http::{WebSocketProtocols, WsMessage};
use async_graphql::{Data, ObjectType, Result, Schema, SubscriptionType};
use futures_util::{future, StreamExt};
use warp::filters::ws;
use warp::ws::WebSocket;
use warp::{Filter, Rejection, Reply};
/// GraphQL subscription filter
@ -165,14 +164,16 @@ pub fn graphql_protocol() -> impl Filter<Extract = (WebSocketProtocols,), Error
/// warp::serve(filter).run(([0, 0, 0, 0], 8000)).await;
/// });
/// ```
pub async fn graphql_subscription_upgrade<Query, Mutation, Subscription>(
websocket: WebSocket,
pub async fn graphql_subscription_upgrade<Query, Mutation, Subscription, S>(
websocket: S,
protocol: WebSocketProtocols,
schema: Schema<Query, Mutation, Subscription>,
) where
Query: ObjectType + 'static,
Mutation: ObjectType + 'static,
Subscription: SubscriptionType + 'static,
S: futures_util::Stream<Item = Result<warp::ws::Message, warp::Error>>
+ futures_util::Sink<warp::ws::Message>,
{
graphql_subscription_upgrade_with_data(websocket, protocol, schema, |_| async {
Ok(Default::default())
@ -183,8 +184,8 @@ pub async fn graphql_subscription_upgrade<Query, Mutation, Subscription>(
/// Handle the WebSocket subscription.
///
/// Specifies that a function converts the init payload to data.
pub async fn graphql_subscription_upgrade_with_data<Query, Mutation, Subscription, F, R>(
websocket: WebSocket,
pub async fn graphql_subscription_upgrade_with_data<Query, Mutation, Subscription, F, R, S>(
websocket: S,
protocol: WebSocketProtocols,
schema: Schema<Query, Mutation, Subscription>,
initializer: F,
@ -192,8 +193,10 @@ pub async fn graphql_subscription_upgrade_with_data<Query, Mutation, Subscriptio
Query: ObjectType + 'static,
Mutation: ObjectType + 'static,
Subscription: SubscriptionType + 'static,
F: FnOnce(serde_json::Value) -> R + Clone + Send + 'static,
F: FnOnce(serde_json::Value) -> R + Send + 'static,
R: Future<Output = Result<Data>> + Send + 'static,
S: futures_util::Stream<Item = Result<warp::ws::Message, warp::Error>>
+ futures_util::Sink<warp::ws::Message>,
{
let (ws_sender, ws_receiver) = websocket.split();
let _ = async_graphql::http::WebSocket::with_data(