Add open / close WebSocket callbacks in warp subscription filter.

This commit is contained in:
BratSinot 2021-04-03 09:04:52 +03:00
parent 1f0bd225d6
commit 9578ed460a

View File

@ -67,12 +67,30 @@ pub fn graphql_subscription_with_data<Query, Mutation, Subscription, F, R>(
schema: Schema<Query, Mutation, Subscription>, schema: Schema<Query, Mutation, Subscription>,
initializer: F, initializer: F,
) -> impl Filter<Extract = (impl Reply,), Error = Rejection> + Clone ) -> impl Filter<Extract = (impl Reply,), Error = Rejection> + Clone
where
Query: ObjectType + 'static,
Mutation: ObjectType + 'static,
Subscription: SubscriptionType + 'static,
F: FnOnce(serde_json::Value) -> R + Clone + Send + 'static,
R: Future<Output = Result<Data>> + Send + 'static, { graphql_subscription_with_data_and_callbacks(schema, initializer, ||{}, ||{}) }
/// GraphQL subscription filter
///
/// Specifies that a function converts the init payload to data.
pub fn graphql_subscription_with_data_and_callbacks<Query, Mutation, Subscription, F, R, FO, FC>(
schema: Schema<Query, Mutation, Subscription>,
initializer: F,
open_callback: FO,
close_callback: FC,
) -> impl Filter<Extract = (impl Reply,), Error = Rejection> + Clone
where where
Query: ObjectType + 'static, Query: ObjectType + 'static,
Mutation: ObjectType + 'static, Mutation: ObjectType + 'static,
Subscription: SubscriptionType + 'static, Subscription: SubscriptionType + 'static,
F: FnOnce(serde_json::Value) -> R + Clone + Send + 'static, F: FnOnce(serde_json::Value) -> R + Clone + Send + 'static,
R: Future<Output = Result<Data>> + Send + 'static, R: Future<Output = Result<Data>> + Send + 'static,
FO: FnOnce() + Clone + Send + 'static,
FC: FnOnce() + Clone + Send + 'static,
{ {
use async_graphql::http::WebSocketProtocols; use async_graphql::http::WebSocketProtocols;
use std::str::FromStr; use std::str::FromStr;
@ -82,6 +100,8 @@ where
.map(move |ws: ws::Ws, protocols: Option<String>| { .map(move |ws: ws::Ws, protocols: Option<String>| {
let schema = schema.clone(); let schema = schema.clone();
let initializer = initializer.clone(); let initializer = initializer.clone();
let open_callback = open_callback.clone();
let close_callback = close_callback.clone();
let protocol = protocols let protocol = protocols
.and_then(|protocols| { .and_then(|protocols| {
@ -94,6 +114,8 @@ where
let reply = ws.on_upgrade(move |websocket| { let reply = ws.on_upgrade(move |websocket| {
let (ws_sender, ws_receiver) = websocket.split(); let (ws_sender, ws_receiver) = websocket.split();
open_callback();
async move { async move {
let _ = async_graphql::http::WebSocket::with_data( let _ = async_graphql::http::WebSocket::with_data(
schema, schema,
@ -112,6 +134,8 @@ where
.map(Ok) .map(Ok)
.forward(ws_sender) .forward(ws_sender)
.await; .await;
close_callback();
} }
}); });