Improve subscriptions performance for warp.
This commit is contained in:
parent
4e3bb4aa40
commit
ff43bd8a21
|
@ -51,6 +51,7 @@ bson = { version = "1.0.0", optional = true }
|
|||
uuid = { version = "0.8.1", features = ["v4", "serde"] }
|
||||
url = { version = "2.1.1", optional = true }
|
||||
chrono-tz = { version = "0.5.1", optional = true }
|
||||
smallvec = "1.4.2"
|
||||
|
||||
[dev-dependencies]
|
||||
async-std = { version = "1.5.0", features = ["attributes"] }
|
||||
|
|
|
@ -10,7 +10,6 @@ use async_graphql::{
|
|||
Data, FieldResult, IntoQueryBuilder, IntoQueryBuilderOpts, ObjectType, QueryBuilder,
|
||||
QueryResponse, Schema, SubscriptionType, WebSocketTransport,
|
||||
};
|
||||
use bytes::Bytes;
|
||||
use futures::select;
|
||||
use futures::{SinkExt, StreamExt};
|
||||
use hyper::Method;
|
||||
|
@ -165,54 +164,7 @@ where
|
|||
Mutation: ObjectType + Sync + Send + 'static,
|
||||
Subscription: SubscriptionType + Send + Sync + 'static,
|
||||
{
|
||||
warp::any()
|
||||
.and(warp::ws())
|
||||
.and(warp::any().map(move || schema.clone()))
|
||||
.map(
|
||||
|ws: warp::ws::Ws, schema: Schema<Query, Mutation, Subscription>| {
|
||||
ws.on_upgrade(move |websocket| {
|
||||
let (mut tx, rx) = websocket.split();
|
||||
let (mut stx, srx) =
|
||||
schema.subscription_connection(WebSocketTransport::default());
|
||||
|
||||
let mut rx = rx.fuse();
|
||||
let mut srx = srx.fuse();
|
||||
|
||||
async move {
|
||||
loop {
|
||||
select! {
|
||||
bytes = srx.next() => {
|
||||
if let Some(bytes) = bytes {
|
||||
if let Ok(text) = String::from_utf8(bytes.to_vec()) {
|
||||
if tx.send(Message::text(text)).await.is_err()
|
||||
{
|
||||
return;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return;
|
||||
}
|
||||
}
|
||||
msg = rx.next() => {
|
||||
if let Some(Ok(msg)) = msg {
|
||||
if msg.is_text() {
|
||||
if stx.send(Bytes::copy_from_slice(msg.as_bytes())).await.is_err() {
|
||||
return;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
},
|
||||
).map(|reply| {
|
||||
warp::reply::with_header(reply, "Sec-WebSocket-Protocol", "graphql-ws")
|
||||
})
|
||||
.boxed()
|
||||
graphql_subscription_with_data(schema, |_| Ok(Default::default()))
|
||||
}
|
||||
|
||||
/// GraphQL subscription filter
|
||||
|
@ -233,7 +185,9 @@ where
|
|||
.and(warp::any().map(move || schema.clone()))
|
||||
.and(warp::any().map(move || init_context_data.clone()))
|
||||
.map(
|
||||
|ws: warp::ws::Ws, schema: Schema<Query, Mutation, Subscription>, init_context_data: F| {
|
||||
|ws: warp::ws::Ws,
|
||||
schema: Schema<Query, Mutation, Subscription>,
|
||||
init_context_data: F| {
|
||||
ws.on_upgrade(move |websocket| {
|
||||
let (mut tx, rx) = websocket.split();
|
||||
let (mut stx, srx) =
|
||||
|
@ -259,7 +213,7 @@ where
|
|||
msg = rx.next() => {
|
||||
if let Some(Ok(msg)) = msg {
|
||||
if msg.is_text() {
|
||||
if stx.send(Bytes::copy_from_slice(msg.as_bytes())).await.is_err() {
|
||||
if stx.send(msg.into_bytes().into()).await.is_err() {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
@ -272,9 +226,8 @@ where
|
|||
}
|
||||
})
|
||||
},
|
||||
).map(|reply| {
|
||||
warp::reply::with_header(reply, "Sec-WebSocket-Protocol", "graphql-ws")
|
||||
})
|
||||
)
|
||||
.map(|reply| warp::reply::with_header(reply, "Sec-WebSocket-Protocol", "graphql-ws"))
|
||||
.boxed()
|
||||
}
|
||||
|
||||
|
|
|
@ -4,6 +4,7 @@ use futures::channel::mpsc;
|
|||
use futures::task::{AtomicWaker, Context, Poll};
|
||||
use futures::{Stream, StreamExt};
|
||||
use slab::Slab;
|
||||
use smallvec::SmallVec;
|
||||
use std::collections::VecDeque;
|
||||
use std::future::Future;
|
||||
use std::pin::Pin;
|
||||
|
@ -45,7 +46,7 @@ pub trait ConnectionTransport: Send + Sync + Unpin + 'static {
|
|||
schema: &Schema<Query, Mutation, Subscription>,
|
||||
streams: &mut SubscriptionStreams,
|
||||
data: Bytes,
|
||||
) -> std::result::Result<Option<Vec<Bytes>>, Self::Error>
|
||||
) -> std::result::Result<Option<SmallVec<[Bytes; 4]>>, Self::Error>
|
||||
where
|
||||
Query: ObjectType + Sync + Send + 'static,
|
||||
Mutation: ObjectType + Sync + Send + 'static,
|
||||
|
@ -92,7 +93,10 @@ type HandleRequestBoxFut<'a, T> = Pin<
|
|||
Box<
|
||||
dyn Future<
|
||||
Output = (
|
||||
std::result::Result<Option<Vec<Bytes>>, <T as ConnectionTransport>::Error>,
|
||||
std::result::Result<
|
||||
Option<SmallVec<[Bytes; 4]>>,
|
||||
<T as ConnectionTransport>::Error,
|
||||
>,
|
||||
&'a mut T,
|
||||
&'a mut SubscriptionStreams,
|
||||
),
|
||||
|
|
|
@ -5,6 +5,7 @@ use crate::{
|
|||
QueryResponse, Result, Schema, SubscriptionStreams, SubscriptionType, Variables,
|
||||
};
|
||||
use bytes::Bytes;
|
||||
use smallvec::{smallvec, SmallVec};
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
|
@ -50,7 +51,7 @@ impl ConnectionTransport for WebSocketTransport {
|
|||
schema: &Schema<Query, Mutation, Subscription>,
|
||||
streams: &mut SubscriptionStreams,
|
||||
data: Bytes,
|
||||
) -> std::result::Result<Option<Vec<Bytes>>, Self::Error>
|
||||
) -> std::result::Result<Option<SmallVec<[Bytes; 4]>>, Self::Error>
|
||||
where
|
||||
Query: ObjectType + Sync + Send + 'static,
|
||||
Mutation: ObjectType + Sync + Send + 'static,
|
||||
|
@ -64,7 +65,7 @@ impl ConnectionTransport for WebSocketTransport {
|
|||
self.data = Arc::new(init_context_data(payload)?);
|
||||
}
|
||||
}
|
||||
Ok(Some(vec![serde_json::to_vec(&OperationMessage {
|
||||
Ok(Some(smallvec![serde_json::to_vec(&OperationMessage {
|
||||
ty: "connection_ack".to_string(),
|
||||
id: None,
|
||||
payload: None,
|
||||
|
@ -106,7 +107,7 @@ impl ConnectionTransport for WebSocketTransport {
|
|||
}
|
||||
|
||||
match builder.execute(schema).await {
|
||||
Ok(resp) => Ok(Some(vec![
|
||||
Ok(resp) => Ok(Some(smallvec![
|
||||
serde_json::to_vec(&OperationMessage {
|
||||
ty: "data".to_string(),
|
||||
id: Some(id.clone()),
|
||||
|
@ -125,26 +126,30 @@ impl ConnectionTransport for WebSocketTransport {
|
|||
.unwrap()
|
||||
.into(),
|
||||
])),
|
||||
Err(err) => {
|
||||
Ok(Some(vec![serde_json::to_vec(&OperationMessage {
|
||||
Err(err) => Ok(Some(smallvec![serde_json::to_vec(
|
||||
&OperationMessage {
|
||||
ty: "error".to_string(),
|
||||
id: Some(id),
|
||||
payload: Some(
|
||||
serde_json::to_value(GQLError(&err)).unwrap(),
|
||||
),
|
||||
})
|
||||
.unwrap()
|
||||
.into()]))
|
||||
}
|
||||
}
|
||||
)
|
||||
.unwrap()
|
||||
.into()])),
|
||||
}
|
||||
}
|
||||
Err(err) => Ok(Some(vec![serde_json::to_vec(&OperationMessage {
|
||||
ty: "error".to_string(),
|
||||
id: Some(id),
|
||||
payload: Some(serde_json::to_value(GQLError(&err)).unwrap()),
|
||||
})
|
||||
.unwrap()
|
||||
.into()])),
|
||||
Err(err) => {
|
||||
Ok(Some(smallvec![serde_json::to_vec(&OperationMessage {
|
||||
ty: "error".to_string(),
|
||||
id: Some(id),
|
||||
payload: Some(
|
||||
serde_json::to_value(GQLError(&err)).unwrap()
|
||||
),
|
||||
})
|
||||
.unwrap()
|
||||
.into()]))
|
||||
}
|
||||
}
|
||||
} else {
|
||||
Ok(None)
|
||||
|
@ -158,7 +163,7 @@ impl ConnectionTransport for WebSocketTransport {
|
|||
if let Some(sid) = self.id_to_sid.remove(&id) {
|
||||
self.sid_to_id.remove(&sid);
|
||||
streams.remove(sid);
|
||||
return Ok(Some(vec![serde_json::to_vec(&OperationMessage {
|
||||
return Ok(Some(smallvec![serde_json::to_vec(&OperationMessage {
|
||||
ty: "complete".to_string(),
|
||||
id: Some(id),
|
||||
payload: None,
|
||||
|
|
Loading…
Reference in New Issue
Block a user