Add support for graphql-ws pings. #635
This commit is contained in:
parent
49db61cec1
commit
024a143f11
|
@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file.
|
||||||
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
|
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
|
||||||
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
|
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
|
||||||
|
|
||||||
|
## [Unreleased]
|
||||||
|
|
||||||
|
- Add support for `graphql-ws` pings. [#635](https://github.com/async-graphql/async-graphql/issues/635)
|
||||||
|
|
||||||
## [2.9.15] 2021-09-10
|
## [2.9.15] 2021-09-10
|
||||||
|
|
||||||
- Added Axum error handling. [#629](https://github.com/async-graphql/async-graphql/pull/629)
|
- Added Axum error handling. [#629](https://github.com/async-graphql/async-graphql/pull/629)
|
||||||
|
|
|
@ -64,6 +64,7 @@ pin_project! {
|
||||||
/// A GraphQL connection over websocket.
|
/// A GraphQL connection over websocket.
|
||||||
///
|
///
|
||||||
/// [Reference](https://github.com/apollographql/subscriptions-transport-ws/blob/master/PROTOCOL.md).
|
/// [Reference](https://github.com/apollographql/subscriptions-transport-ws/blob/master/PROTOCOL.md).
|
||||||
|
/// [Reference](https://github.com/enisdenjo/graphql-ws/blob/master/PROTOCOL.md).
|
||||||
pub struct WebSocket<S, F, Query, Mutation, Subscription> {
|
pub struct WebSocket<S, F, Query, Mutation, Subscription> {
|
||||||
data_initializer: Option<F>,
|
data_initializer: Option<F>,
|
||||||
init_fut: Option<BoxFuture<'static, Result<Data>>>,
|
init_fut: Option<BoxFuture<'static, Result<Data>>>,
|
||||||
|
@ -247,6 +248,15 @@ where
|
||||||
// `CONNECTION_TERMINATE` `client -> server` message; rather, disconnection is
|
// `CONNECTION_TERMINATE` `client -> server` message; rather, disconnection is
|
||||||
// handled by disconnecting the websocket
|
// handled by disconnecting the websocket
|
||||||
ClientMessage::ConnectionTerminate => return Poll::Ready(None),
|
ClientMessage::ConnectionTerminate => return Poll::Ready(None),
|
||||||
|
// Pong must be sent in response from the receiving party as soon as possible.
|
||||||
|
ClientMessage::Ping { .. } => {
|
||||||
|
return Poll::Ready(Some(WsMessage::Text(
|
||||||
|
serde_json::to_string(&ServerMessage::Pong { payload: None }).unwrap(),
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
ClientMessage::Pong { .. } => {
|
||||||
|
// Do nothing...
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -368,6 +378,20 @@ pub enum ClientMessage {
|
||||||
},
|
},
|
||||||
/// Connection terminated by the client
|
/// Connection terminated by the client
|
||||||
ConnectionTerminate,
|
ConnectionTerminate,
|
||||||
|
/// Useful for detecting failed connections, displaying latency metrics or other types of network probing.
|
||||||
|
///
|
||||||
|
/// https://github.com/enisdenjo/graphql-ws/blob/master/PROTOCOL.md#ping
|
||||||
|
Ping {
|
||||||
|
/// Additional details about the ping.
|
||||||
|
payload: Option<serde_json::Value>,
|
||||||
|
},
|
||||||
|
/// The response to the Ping message.
|
||||||
|
///
|
||||||
|
/// https://github.com/enisdenjo/graphql-ws/blob/master/PROTOCOL.md#pong
|
||||||
|
Pong {
|
||||||
|
/// Additional details about the pong.
|
||||||
|
payload: Option<serde_json::Value>,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ClientMessage {
|
impl ClientMessage {
|
||||||
|
@ -405,6 +429,13 @@ enum ServerMessage<'a> {
|
||||||
Complete {
|
Complete {
|
||||||
id: &'a str,
|
id: &'a str,
|
||||||
},
|
},
|
||||||
|
/// The response to the Ping message.
|
||||||
|
///
|
||||||
|
/// https://github.com/enisdenjo/graphql-ws/blob/master/PROTOCOL.md#pong
|
||||||
|
Pong {
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
payload: Option<serde_json::Value>,
|
||||||
|
},
|
||||||
// Not used by this library
|
// Not used by this library
|
||||||
// #[serde(rename = "ka")]
|
// #[serde(rename = "ka")]
|
||||||
// KeepAlive
|
// KeepAlive
|
||||||
|
|
|
@ -602,3 +602,79 @@ pub async fn test_stream_drop() {
|
||||||
|
|
||||||
assert!(*dropped.lock().unwrap());
|
assert!(*dropped.lock().unwrap());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
pub async fn test_ping_pong() {
|
||||||
|
struct QueryRoot;
|
||||||
|
|
||||||
|
#[Object]
|
||||||
|
impl QueryRoot {
|
||||||
|
async fn value(&self) -> i32 {
|
||||||
|
10
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct SubscriptionRoot;
|
||||||
|
|
||||||
|
#[Subscription]
|
||||||
|
impl SubscriptionRoot {
|
||||||
|
async fn values(&self) -> impl Stream<Item = i32> {
|
||||||
|
futures_util::stream::iter(0..10)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let schema = Schema::new(QueryRoot, EmptyMutation, SubscriptionRoot);
|
||||||
|
let (mut tx, rx) = mpsc::unbounded();
|
||||||
|
let mut stream = http::WebSocket::new(schema, rx, WebSocketProtocols::GraphQLWS);
|
||||||
|
|
||||||
|
tx.send(
|
||||||
|
serde_json::to_string(&value!({
|
||||||
|
"type": "connection_init",
|
||||||
|
}))
|
||||||
|
.unwrap(),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
serde_json::from_str::<serde_json::Value>(&stream.next().await.unwrap().unwrap_text())
|
||||||
|
.unwrap(),
|
||||||
|
serde_json::json!({
|
||||||
|
"type": "connection_ack",
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
|
||||||
|
for _ in 0..5 {
|
||||||
|
tx.send(
|
||||||
|
serde_json::to_string(&value!({
|
||||||
|
"type": "ping",
|
||||||
|
}))
|
||||||
|
.unwrap(),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
serde_json::from_str::<serde_json::Value>(&stream.next().await.unwrap().unwrap_text())
|
||||||
|
.unwrap(),
|
||||||
|
serde_json::json!({
|
||||||
|
"type": "pong",
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
tx.send(
|
||||||
|
serde_json::to_string(&value!({
|
||||||
|
"type": "pong",
|
||||||
|
}))
|
||||||
|
.unwrap(),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
tokio::time::timeout(Duration::from_millis(100), stream.next())
|
||||||
|
.await
|
||||||
|
.is_err()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue
Block a user