This commit is contained in:
sunli 2020-04-23 15:30:12 +08:00
parent e4981d41ab
commit 816cebf8c4
7 changed files with 106 additions and 17 deletions

View File

@ -1,6 +1,6 @@
[package]
name = "async-graphql"
version = "1.9.14"
version = "1.9.15"
authors = ["sunli <scott_s829@163.com>"]
edition = "2018"
description = "The GraphQL server library implemented by rust"
@ -18,7 +18,7 @@ default = ["bson", "uuid", "url", "chrono-tz", "validators"]
validators = ["regex"]
[dependencies]
async-graphql-derive = { path = "async-graphql-derive", version = "1.9.14" }
async-graphql-derive = { path = "async-graphql-derive", version = "1.9.15" }
graphql-parser = "=0.2.3"
anyhow = "1.0.26"
thiserror = "1.0.11"

View File

@ -1,6 +1,6 @@
[package]
name = "async-graphql-actix-web"
version = "1.1.2"
version = "1.1.3"
authors = ["sunli <scott_s829@163.com>"]
edition = "2018"
description = "async-graphql for actix-web"
@ -13,7 +13,7 @@ keywords = ["futures", "async", "graphql"]
categories = ["network-programming", "asynchronous"]
[dependencies]
async-graphql = { path = "..", version = "1.9.14" }
async-graphql = { path = "..", version = "1.9.15" }
actix-web = "2.0.0"
actix-web-actors = "2.0.0"
actix = "0.9.0"
@ -21,3 +21,4 @@ actix-rt = "1.0.0"
futures = "0.3.0"
mime = "0.3.16"
bytes = "0.5.4"
serde_json = "1.0.48"

View File

@ -2,7 +2,7 @@ use actix::{
Actor, ActorContext, ActorFuture, AsyncContext, ContextFutureSpawner, StreamHandler, WrapFuture,
};
use actix_web_actors::ws::{Message, ProtocolError, WebsocketContext};
use async_graphql::{ObjectType, Schema, SubscriptionType, WebSocketTransport};
use async_graphql::{Data, ObjectType, Schema, SubscriptionType, WebSocketTransport};
use bytes::Bytes;
use futures::channel::mpsc;
use futures::SinkExt;
@ -16,6 +16,7 @@ pub struct WSSubscription<Query, Mutation, Subscription> {
schema: Schema<Query, Mutation, Subscription>,
hb: Instant,
sink: Option<mpsc::Sender<Bytes>>,
init_context_data: Option<Box<dyn Fn(serde_json::Value) -> Data + Send + Sync>>,
}
impl<Query, Mutation, Subscription> WSSubscription<Query, Mutation, Subscription>
@ -25,11 +26,26 @@ where
Subscription: SubscriptionType + Send + Sync + 'static,
{
/// Create an actor for subscription connection via websocket.
pub fn new(schema: &Schema<Query, Mutation, Subscription>) -> Self {
pub fn new<F>(schema: &Schema<Query, Mutation, Subscription>) -> Self
where
F: Fn(serde_json::Value) -> Data + Send + Sync + 'static,
{
Self {
schema: schema.clone(),
hb: Instant::now(),
sink: None,
init_context_data: None,
}
}
/// Set a context data initialization function.
pub fn init_context_data<F>(self, f: F) -> Self
where
F: Fn(serde_json::Value) -> Data + Send + Sync + 'static,
{
Self {
init_context_data: Some(Box::new(f)),
..self
}
}
@ -54,7 +70,13 @@ where
fn started(&mut self, ctx: &mut Self::Context) {
self.hb(ctx);
let schema = self.schema.clone();
let (sink, stream) = schema.subscription_connection(WebSocketTransport::default());
let (sink, stream) = schema.subscription_connection(
if let Some(init_with_payload) = self.init_context_data.take() {
WebSocketTransport::new(init_with_payload)
} else {
WebSocketTransport::default()
},
);
ctx.add_stream(stream);
self.sink = Some(sink);
}

View File

@ -1,6 +1,6 @@
[package]
name = "async-graphql-derive"
version = "1.9.14"
version = "1.9.15"
authors = ["sunli <scott_s829@163.com>"]
edition = "2018"
description = "Macros for async-graphql"

View File

@ -1,6 +1,6 @@
[package]
name = "async-graphql-warp"
version = "1.1.2"
version = "1.1.3"
authors = ["sunli <scott_s829@163.com>"]
edition = "2018"
description = "async-graphql for warp"
@ -13,10 +13,11 @@ keywords = ["futures", "async", "graphql"]
categories = ["network-programming", "asynchronous"]
[dependencies]
async-graphql = { path = "..", version = "1.9.14" }
async-graphql = { path = "..", version = "1.9.15" }
warp = "0.2.2"
futures = "0.3.0"
bytes = "0.5.4"
serde_json = "1.0.48"
[dev-dependencies]
tokio = { version = "0.2", features = ["macros"] }

View File

@ -6,8 +6,8 @@
use async_graphql::http::StreamBody;
use async_graphql::{
IntoQueryBuilder, IntoQueryBuilderOpts, ObjectType, QueryBuilder, Schema, SubscriptionType,
WebSocketTransport,
Data, IntoQueryBuilder, IntoQueryBuilderOpts, ObjectType, QueryBuilder, Schema,
SubscriptionType, WebSocketTransport,
};
use bytes::Bytes;
use futures::select;
@ -207,3 +207,68 @@ where
})
.boxed()
}
/// GraphQL subscription filter
///
/// Specifies that a function converts the init payload to data.
pub fn graphql_subscription_with_data<Query, Mutation, Subscription, F>(
schema: Schema<Query, Mutation, Subscription>,
init_context_data: F,
) -> BoxedFilter<(impl Reply,)>
where
Query: ObjectType + Sync + Send + 'static,
Mutation: ObjectType + Sync + Send + 'static,
Subscription: SubscriptionType + Send + Sync + 'static,
F: Fn(serde_json::Value) -> Data + Send + Sync + Clone + 'static,
{
warp::any()
.and(warp::ws())
.and(warp::any().map(move || schema.clone()))
.and(warp::any().map(move || init_context_data.clone()))
.map(
|ws: warp::ws::Ws, schema: Schema<Query, Mutation, Subscription>, init_context_data: F| {
ws.on_upgrade(move |websocket| {
let (mut tx, rx) = websocket.split();
let (mut stx, srx) =
schema.subscription_connection(WebSocketTransport::new(init_context_data));
let mut rx = rx.fuse();
let mut srx = srx.fuse();
async move {
loop {
select! {
bytes = srx.next() => {
if let Some(bytes) = bytes {
if tx
.send(Message::text(unsafe {
String::from_utf8_unchecked(bytes.to_vec())
}))
.await
.is_err()
{
return;
}
} else {
return;
}
}
msg = rx.next() => {
if let Some(Ok(msg)) = msg {
if msg.is_text() {
if stx.send(Bytes::copy_from_slice(msg.as_bytes())).await.is_err() {
return;
}
}
}
}
}
}
}
})
},
).map(|reply| {
warp::reply::with_header(reply, "Sec-WebSocket-Protocol", "graphql-ws")
})
.boxed()
}

View File

@ -26,16 +26,16 @@ pub struct WebSocketTransport {
id_to_sid: HashMap<String, usize>,
sid_to_id: HashMap<usize, String>,
data: Arc<Data>,
init_with_payload: Option<Box<dyn Fn(serde_json::Value) -> Data + Send + Sync>>,
init_context_data: Option<Box<dyn Fn(serde_json::Value) -> Data + Send + Sync>>,
}
impl WebSocketTransport {
/// Creates a websocket transport and sets the function that converts the `payload` of the `connect_init` message to `Data`.
pub fn new<F: Fn(serde_json::Value) -> Data + Send + Sync + 'static>(
init_with_payload: F,
init_context_data: F,
) -> Self {
WebSocketTransport {
init_with_payload: Some(Box::new(init_with_payload)),
init_context_data: Some(Box::new(init_context_data)),
..WebSocketTransport::default()
}
}
@ -60,8 +60,8 @@ impl SubscriptionTransport for WebSocketTransport {
Ok(msg) => match msg.ty.as_str() {
"connection_init" => {
if let Some(payload) = msg.payload {
if let Some(init_with_payload) = &self.init_with_payload {
self.data = Arc::new(init_with_payload(payload));
if let Some(init_context_data) = &self.init_context_data {
self.data = Arc::new(init_context_data(payload));
}
}
Ok(Some(