This commit is contained in:
sunli 2020-04-23 18:11:03 +08:00
parent fcd7cb7bab
commit d501f73a08
9 changed files with 29 additions and 24 deletions

View File

@ -1,6 +1,6 @@
[package]
name = "async-graphql"
version = "1.9.16"
version = "1.9.17"
authors = ["sunli <scott_s829@163.com>"]
edition = "2018"
description = "The GraphQL server library implemented by rust"
@ -18,7 +18,7 @@ default = ["bson", "uuid", "url", "chrono-tz", "validators"]
validators = ["regex"]
[dependencies]
async-graphql-derive = { path = "async-graphql-derive", version = "1.9.16" }
async-graphql-derive = { path = "async-graphql-derive", version = "1.9.17" }
graphql-parser = "=0.2.3"
anyhow = "1.0.26"
thiserror = "1.0.11"

View File

@ -1,6 +1,6 @@
[package]
name = "async-graphql-actix-web"
version = "1.1.5"
version = "1.1.6"
authors = ["sunli <scott_s829@163.com>"]
edition = "2018"
description = "async-graphql for actix-web"
@ -13,7 +13,7 @@ keywords = ["futures", "async", "graphql"]
categories = ["network-programming", "asynchronous"]
[dependencies]
async-graphql = { path = "..", version = "1.9.16" }
async-graphql = { path = "..", version = "1.9.17" }
actix-web = "2.0.0"
actix-web-actors = "2.0.0"
actix = "0.9.0"

View File

@ -2,7 +2,7 @@ use actix::{
Actor, ActorContext, ActorFuture, AsyncContext, ContextFutureSpawner, StreamHandler, WrapFuture,
};
use actix_web_actors::ws::{Message, ProtocolError, WebsocketContext};
use async_graphql::{Data, ObjectType, Schema, SubscriptionType, WebSocketTransport};
use async_graphql::{Data, FieldResult, ObjectType, Schema, SubscriptionType, WebSocketTransport};
use bytes::Bytes;
use futures::channel::mpsc;
use futures::SinkExt;
@ -16,7 +16,7 @@ pub struct WSSubscription<Query, Mutation, Subscription> {
schema: Schema<Query, Mutation, Subscription>,
hb: Instant,
sink: Option<mpsc::Sender<Bytes>>,
init_context_data: Option<Box<dyn Fn(serde_json::Value) -> Data + Send + Sync>>,
init_context_data: Option<Box<dyn Fn(serde_json::Value) -> FieldResult<Data> + Send + Sync>>,
}
impl<Query, Mutation, Subscription> WSSubscription<Query, Mutation, Subscription>
@ -38,7 +38,7 @@ where
/// Set a context data initialization function.
pub fn init_context_data<F>(self, f: F) -> Self
where
F: Fn(serde_json::Value) -> Data + Send + Sync + 'static,
F: Fn(serde_json::Value) -> FieldResult<Data> + Send + Sync + 'static,
{
Self {
init_context_data: Some(Box::new(f)),

View File

@ -1,6 +1,6 @@
[package]
name = "async-graphql-derive"
version = "1.9.16"
version = "1.9.17"
authors = ["sunli <scott_s829@163.com>"]
edition = "2018"
description = "Macros for async-graphql"

View File

@ -1,6 +1,6 @@
[package]
name = "async-graphql-warp"
version = "1.1.4"
version = "1.1.5"
authors = ["sunli <scott_s829@163.com>"]
edition = "2018"
description = "async-graphql for warp"
@ -13,7 +13,7 @@ keywords = ["futures", "async", "graphql"]
categories = ["network-programming", "asynchronous"]
[dependencies]
async-graphql = { path = "..", version = "1.9.16" }
async-graphql = { path = "..", version = "1.9.17" }
warp = "0.2.2"
futures = "0.3.0"
bytes = "0.5.4"

View File

@ -6,7 +6,7 @@
use async_graphql::http::StreamBody;
use async_graphql::{
Data, IntoQueryBuilder, IntoQueryBuilderOpts, ObjectType, QueryBuilder, Schema,
Data, FieldResult, IntoQueryBuilder, IntoQueryBuilderOpts, ObjectType, QueryBuilder, Schema,
SubscriptionType, WebSocketTransport,
};
use bytes::Bytes;
@ -219,7 +219,7 @@ where
Query: ObjectType + Sync + Send + 'static,
Mutation: ObjectType + Sync + Send + 'static,
Subscription: SubscriptionType + Send + Sync + 'static,
F: Fn(serde_json::Value) -> Data + Send + Sync + Clone + 'static,
F: Fn(serde_json::Value) -> FieldResult<Data> + Send + Sync + Clone + 'static,
{
warp::any()
.and(warp::ws())

View File

@ -1,7 +1,7 @@
use crate::{ObjectType, Schema, SubscriptionType};
use bytes::Bytes;
use futures::channel::mpsc;
use futures::task::{Context, Poll};
use futures::task::{AtomicWaker, Context, Poll};
use futures::Stream;
use slab::Slab;
use std::future::Future;
@ -75,6 +75,7 @@ where
},
rx_bytes,
handle_request_fut: None,
waker: AtomicWaker::new(),
},
)
}
@ -95,6 +96,7 @@ pub struct SubscriptionStream<Query, Mutation, Subscription, T: SubscriptionTran
streams: SubscriptionStreams,
rx_bytes: mpsc::Receiver<Bytes>,
handle_request_fut: Option<HandleRequestBoxFut<T>>,
waker: AtomicWaker,
}
impl<Query, Mutation, Subscription, T> Stream
@ -138,6 +140,7 @@ where
data,
)));
}
this.waker.wake();
continue;
}
Poll::Ready(None) => return Poll::Ready(None),
@ -169,12 +172,14 @@ where
if num_closed == this.streams.streams.len() {
// all closed
return Poll::Ready(None);
this.waker.register(cx.waker());
return Poll::Pending;
} else if num_pending == this.streams.streams.len() {
return Poll::Pending;
}
}
} else {
this.waker.register(cx.waker());
return Poll::Pending;
}
}

View File

@ -1,8 +1,8 @@
use crate::context::Data;
use crate::http::{GQLError, GQLRequest, GQLResponse};
use crate::{
ObjectType, QueryResponse, Schema, SubscriptionStreams, SubscriptionTransport,
SubscriptionType, Variables,
FieldError, FieldResult, ObjectType, QueryResponse, Schema, SubscriptionStreams,
SubscriptionTransport, SubscriptionType, Variables,
};
use bytes::Bytes;
use std::collections::HashMap;
@ -26,12 +26,12 @@ pub struct WebSocketTransport {
id_to_sid: HashMap<String, usize>,
sid_to_id: HashMap<usize, String>,
data: Arc<Data>,
init_context_data: Option<Box<dyn Fn(serde_json::Value) -> Data + Send + Sync>>,
init_context_data: Option<Box<dyn Fn(serde_json::Value) -> FieldResult<Data> + Send + Sync>>,
}
impl WebSocketTransport {
/// Creates a websocket transport and sets the function that converts the `payload` of the `connect_init` message to `Data`.
pub fn new<F: Fn(serde_json::Value) -> Data + Send + Sync + 'static>(
pub fn new<F: Fn(serde_json::Value) -> FieldResult<Data> + Send + Sync + 'static>(
init_context_data: F,
) -> Self {
WebSocketTransport {
@ -43,7 +43,7 @@ impl WebSocketTransport {
#[async_trait::async_trait]
impl SubscriptionTransport for WebSocketTransport {
type Error = String;
type Error = FieldError;
async fn handle_request<Query, Mutation, Subscription>(
&mut self,
@ -61,7 +61,7 @@ impl SubscriptionTransport for WebSocketTransport {
"connection_init" => {
if let Some(payload) = msg.payload {
if let Some(init_context_data) = &self.init_context_data {
self.data = Arc::new(init_context_data(payload));
self.data = Arc::new(init_context_data(payload)?);
}
}
Ok(Some(
@ -125,10 +125,10 @@ impl SubscriptionTransport for WebSocketTransport {
}
Ok(None)
}
"connection_terminate" => Err("connection_terminate".to_string()),
_ => Err("Unknown op".to_string()),
"connection_terminate" => Err("connection_terminate".into()),
_ => Err("Unknown op".into()),
},
Err(err) => Err(err.to_string()),
Err(err) => Err(err.into()),
}
}

View File

@ -355,7 +355,7 @@ pub async fn test_subscription_ws_transport_with_token() {
let payload: Payload = serde_json::from_value(value).unwrap();
let mut data = Data::default();
data.insert(Token(payload.token));
data
Ok(data)
}));
sink.send(