Re-enable the test on websocket.

This commit is contained in:
Sunli 2020-09-11 16:41:56 +08:00
parent 3e01ef0be7
commit a93b91ae2d
3 changed files with 333 additions and 297 deletions

View File

@ -453,8 +453,11 @@ where
resp resp
} }
/// Execute an GraphQL subscription. pub(crate) fn execute_stream_with_ctx_data(
pub fn execute_stream(&self, request: impl Into<Request>) -> impl Stream<Item = Response> { &self,
request: impl Into<Request>,
ctx_data: Arc<Data>,
) -> impl Stream<Item = Response> {
let schema = self.clone(); let schema = self.clone();
async_stream::stream! { async_stream::stream! {
let request = request.into(); let request = request.into();
@ -480,7 +483,7 @@ where
extensions, extensions,
request.variables, request.variables,
document, document,
Arc::new(request.ctx_data), ctx_data,
); );
let ctx = env.create_context( let ctx = env.create_context(
@ -507,4 +510,11 @@ where
} }
} }
} }
/// Execute an GraphQL subscription.
pub fn execute_stream(&self, request: impl Into<Request>) -> impl Stream<Item = Response> {
let mut request = request.into();
let ctx_data = std::mem::replace(&mut request.ctx_data, Default::default());
self.execute_stream_with_ctx_data(request, Arc::new(ctx_data))
}
} }

View File

@ -1,6 +1,6 @@
//! WebSocket transport for subscription //! WebSocket transport for subscription
use crate::{http, Data, FieldResult, ObjectType, Response, Schema, SubscriptionType}; use crate::{http, Data, FieldResult, ObjectType, Request, Response, Schema, SubscriptionType};
use futures::channel::mpsc; use futures::channel::mpsc;
use futures::task::{Context, Poll}; use futures::task::{Context, Poll};
use futures::{Future, Stream, StreamExt}; use futures::{Future, Stream, StreamExt};
@ -181,8 +181,11 @@ where
payload: None, payload: None,
}, },
); );
break;
}
Poll::Pending => {
break;
} }
Poll::Pending => break,
} }
} }
} }
@ -191,6 +194,10 @@ where
ctx.streams.remove(&id); ctx.streams.remove(&id);
} }
} }
if !ctx.send_buf.is_empty() {
continue;
}
} }
return Poll::Pending; return Poll::Pending;
@ -227,7 +234,10 @@ where
"start" => { "start" => {
if let (Some(id), Some(payload)) = (msg.id, msg.payload) { if let (Some(id), Some(payload)) = (msg.id, msg.payload) {
if let Ok(request) = serde_json::from_value::<http::GQLRequest>(payload) { if let Ok(request) = serde_json::from_value::<http::GQLRequest>(payload) {
let stream = schema.execute_stream(request).boxed(); let request = Request::from(request);
let stream = schema
.execute_stream_with_ctx_data(request, ctx.ctx_data.clone())
.boxed();
ctx.streams.insert(id, stream); ctx.streams.insert(id, stream);
} }
} }

View File

@ -1,291 +1,307 @@
// use async_graphql::*; use async_graphql::*;
// use futures::{SinkExt, Stream, StreamExt}; use futures::{SinkExt, Stream, StreamExt};
//
// #[async_std::test] #[async_std::test]
// pub async fn test_subscription_ws_transport() { pub async fn test_subscription_ws_transport() {
// struct QueryRoot; struct QueryRoot;
//
// #[Object] #[Object]
// impl QueryRoot {} impl QueryRoot {}
//
// struct SubscriptionRoot; struct SubscriptionRoot;
//
// #[Subscription] #[Subscription]
// impl SubscriptionRoot { impl SubscriptionRoot {
// async fn values(&self) -> impl Stream<Item = i32> { async fn values(&self) -> impl Stream<Item = i32> {
// futures::stream::iter(0..10) futures::stream::iter(0..10)
// } }
// } }
//
// let schema = Schema::new(QueryRoot, EmptyMutation, SubscriptionRoot); let schema = Schema::new(QueryRoot, EmptyMutation, SubscriptionRoot);
// let (mut sink, mut stream) = schema.subscription_connection(WebSocketTransport::default()); let (mut sink, stream) = transports::websocket::create(&schema);
// futures::pin_mut!(stream);
// sink.send(
// serde_json::to_vec(&serde_json::json!({ sink.send(
// "type": "connection_init", serde_json::to_vec(&serde_json::json!({
// "payload": { "token": "123456" } "type": "connection_init",
// })) }))
// .unwrap(), .unwrap(),
// ) )
// .await .await
// .unwrap(); .unwrap();
//
// assert_eq!( assert_eq!(
// Some(serde_json::json!({ Some(serde_json::json!({
// "type": "connection_ack", "type": "connection_ack",
// })), })),
// serde_json::from_slice(&stream.next().await.unwrap()).unwrap() serde_json::from_slice(&stream.next().await.unwrap()).unwrap()
// ); );
//
// sink.send( sink.send(
// serde_json::to_vec(&serde_json::json!({ serde_json::to_vec(&serde_json::json!({
// "type": "start", "type": "start",
// "id": "1", "id": "1",
// "payload": { "payload": {
// "query": "subscription { values }" "query": "subscription { values }"
// }, },
// })) }))
// .unwrap(), .unwrap(),
// ) )
// .await .await
// .unwrap(); .unwrap();
//
// for i in 0..10 { for i in 0..10 {
// assert_eq!( assert_eq!(
// Some(serde_json::json!({ Some(serde_json::json!({
// "type": "data", "type": "data",
// "id": "1", "id": "1",
// "payload": { "data": { "values": i } }, "payload": { "data": { "values": i } },
// })), })),
// serde_json::from_slice(&stream.next().await.unwrap()).unwrap() serde_json::from_slice(&stream.next().await.unwrap()).unwrap()
// ); );
// } }
// }
// assert_eq!(
// #[async_std::test] Some(serde_json::json!({
// pub async fn test_subscription_ws_transport_with_token() { "type": "complete",
// struct Token(String); "id": "1",
// })),
// struct QueryRoot; serde_json::from_slice(&stream.next().await.unwrap()).unwrap()
// );
// #[Object] }
// impl QueryRoot {}
// #[async_std::test]
// struct SubscriptionRoot; pub async fn test_subscription_ws_transport_with_token() {
// struct Token(String);
// #[Subscription]
// impl SubscriptionRoot { struct QueryRoot;
// async fn values(&self, ctx: &Context<'_>) -> FieldResult<impl Stream<Item = i32>> {
// if ctx.data_unchecked::<Token>().0 != "123456" { #[Object]
// return Err("forbidden".into()); impl QueryRoot {}
// }
// Ok(futures::stream::iter(0..10)) struct SubscriptionRoot;
// }
// } #[Subscription]
// impl SubscriptionRoot {
// let schema = Schema::new(QueryRoot, EmptyMutation, SubscriptionRoot); async fn values(&self, ctx: &Context<'_>) -> FieldResult<impl Stream<Item = i32>> {
// if ctx.data_unchecked::<Token>().0 != "123456" {
// let (mut sink, mut stream) = schema.subscription_connection(WebSocketTransport::new(|value| { return Err("forbidden".into());
// #[derive(serde::Deserialize)] }
// struct Payload { Ok(futures::stream::iter(0..10))
// token: String, }
// } }
//
// let payload: Payload = serde_json::from_value(value).unwrap(); let schema = Schema::new(QueryRoot, EmptyMutation, SubscriptionRoot);
// let mut data = Data::default(); let (mut sink, stream) = transports::websocket::create_with_initializer(&schema, |value| {
// data.insert(Token(payload.token)); #[derive(serde::Deserialize)]
// Ok(data) struct Payload {
// })); token: String,
// }
// sink.send(
// serde_json::to_vec(&serde_json::json!({ let payload: Payload = serde_json::from_value(value).unwrap();
// "type": "connection_init", let mut data = Data::default();
// "payload": { "token": "123456" } data.insert(Token(payload.token));
// })) Ok(data)
// .unwrap(), });
// ) futures::pin_mut!(stream);
// .await
// .unwrap(); sink.send(
// serde_json::to_vec(&serde_json::json!({
// assert_eq!( "type": "connection_init",
// Some(serde_json::json!({ "payload": { "token": "123456" }
// "type": "connection_ack", }))
// })), .unwrap(),
// serde_json::from_slice(&stream.next().await.unwrap()).unwrap() )
// ); .await
// .unwrap();
// sink.send(
// serde_json::to_vec(&serde_json::json!({ assert_eq!(
// "type": "start", Some(serde_json::json!({
// "id": "1", "type": "connection_ack",
// "payload": { })),
// "query": "subscription { values }" serde_json::from_slice(&stream.next().await.unwrap()).unwrap()
// }, );
// }))
// .unwrap(), sink.send(
// ) serde_json::to_vec(&serde_json::json!({
// .await "type": "start",
// .unwrap(); "id": "1",
// "payload": {
// for i in 0..10 { "query": "subscription { values }"
// assert_eq!( },
// Some(serde_json::json!({ }))
// "type": "data", .unwrap(),
// "id": "1", )
// "payload": { "data": { "values": i } }, .await
// })), .unwrap();
// serde_json::from_slice(&stream.next().await.unwrap()).unwrap()
// ); for i in 0..10 {
// } assert_eq!(
// } Some(serde_json::json!({
// "type": "data",
// #[async_std::test] "id": "1",
// pub async fn test_subscription_ws_transport_error() { "payload": { "data": { "values": i } },
// struct QueryRoot; })),
// serde_json::from_slice(&stream.next().await.unwrap()).unwrap()
// struct Event { );
// value: i32, }
// }
// assert_eq!(
// #[Object] Some(serde_json::json!({
// impl Event { "type": "complete",
// async fn value(&self) -> FieldResult<i32> { "id": "1",
// if self.value < 5 { })),
// Ok(self.value) serde_json::from_slice(&stream.next().await.unwrap()).unwrap()
// } else { );
// Err("TestError".into()) }
// }
// } #[async_std::test]
// } pub async fn test_subscription_ws_transport_error() {
// struct QueryRoot;
// #[Object]
// impl QueryRoot {} struct Event {
// value: i32,
// struct SubscriptionRoot; }
//
// #[Subscription] #[Object]
// impl SubscriptionRoot { impl Event {
// async fn events(&self) -> impl Stream<Item = Event> { async fn value(&self) -> FieldResult<i32> {
// futures::stream::iter((0..10).map(|n| Event { value: n })) if self.value < 5 {
// } Ok(self.value)
// } } else {
// Err("TestError".into())
// let schema = Schema::new(QueryRoot, EmptyMutation, SubscriptionRoot); }
// }
// let (mut sink, mut stream) = }
// schema.subscription_connection(WebSocketTransport::new(|_| Ok(Data::default())));
// #[Object]
// sink.send( impl QueryRoot {}
// serde_json::to_vec(&serde_json::json!({
// "type": "connection_init" struct SubscriptionRoot;
// }))
// .unwrap(), #[Subscription]
// ) impl SubscriptionRoot {
// .await async fn events(&self) -> impl Stream<Item = Event> {
// .unwrap(); futures::stream::iter((0..10).map(|n| Event { value: n }))
// }
// assert_eq!( }
// Some(serde_json::json!({
// "type": "connection_ack", let schema = Schema::new(QueryRoot, EmptyMutation, SubscriptionRoot);
// })), let (mut sink, stream) = transports::websocket::create(&schema);
// serde_json::from_slice(&stream.next().await.unwrap()).unwrap() futures::pin_mut!(stream);
// );
// sink.send(
// sink.send( serde_json::to_vec(&serde_json::json!({
// serde_json::to_vec(&serde_json::json!({ "type": "connection_init"
// "type": "start", }))
// "id": "1", .unwrap(),
// "payload": { )
// "query": "subscription { events { value } }" .await
// }, .unwrap();
// }))
// .unwrap(), assert_eq!(
// ) Some(serde_json::json!({
// .await "type": "connection_ack",
// .unwrap(); })),
// serde_json::from_slice(&stream.next().await.unwrap()).unwrap()
// for i in 0i32..5 { );
// assert_eq!(
// Some(serde_json::json!({ sink.send(
// "type": "data", serde_json::to_vec(&serde_json::json!({
// "id": "1", "type": "start",
// "payload": { "data": { "events": { "value": i } } }, "id": "1",
// })), "payload": {
// serde_json::from_slice(&stream.next().await.unwrap()).unwrap() "query": "subscription { events { value } }"
// ); },
// } }))
// .unwrap(),
// assert_eq!( )
// Some(serde_json::json!({ .await
// "type": "error", .unwrap();
// "id": "1",
// "payload": [{ for i in 0i32..5 {
// "message": "TestError", assert_eq!(
// "locations": [{"line": 1, "column": 25}], Some(serde_json::json!({
// "path": ["events", "value"], "type": "data",
// }], "id": "1",
// })), "payload": { "data": { "events": { "value": i } } },
// serde_json::from_slice(&stream.next().await.unwrap()).unwrap() })),
// ); serde_json::from_slice(&stream.next().await.unwrap()).unwrap()
// } );
// }
// #[async_std::test]
// pub async fn test_query_over_websocket() { assert_eq!(
// struct QueryRoot; Some(serde_json::json!({
// "type": "error",
// #[Object] "id": "1",
// impl QueryRoot { "payload": [{
// async fn value(&self) -> i32 { "message": "TestError",
// 999 "locations": [{"line": 1, "column": 25}],
// } "path": ["events", "value"],
// } }],
// })),
// let schema = Schema::new(QueryRoot, EmptyMutation, EmptySubscription); serde_json::from_slice(&stream.next().await.unwrap()).unwrap()
// let (mut sink, mut stream) = schema.subscription_connection(WebSocketTransport::default()); );
// }
// sink.send(
// serde_json::to_vec(&serde_json::json!({ #[async_std::test]
// "type": "connection_init", pub async fn test_query_over_websocket() {
// })) struct QueryRoot;
// .unwrap(),
// ) #[Object]
// .await impl QueryRoot {
// .unwrap(); async fn value(&self) -> i32 {
// 999
// assert_eq!( }
// Some(serde_json::json!({ }
// "type": "connection_ack",
// })), let schema = Schema::new(QueryRoot, EmptyMutation, EmptySubscription);
// serde_json::from_slice(&stream.next().await.unwrap()).unwrap() let (mut sink, stream) = transports::websocket::create(&schema);
// ); futures::pin_mut!(stream);
//
// sink.send( sink.send(
// serde_json::to_vec(&serde_json::json!({ serde_json::to_vec(&serde_json::json!({
// "type": "start", "type": "connection_init",
// "id": "1", }))
// "payload": { .unwrap(),
// "query": "query { value }" )
// }, .await
// })) .unwrap();
// .unwrap(),
// ) assert_eq!(
// .await Some(serde_json::json!({
// .unwrap(); "type": "connection_ack",
// })),
// assert_eq!( serde_json::from_slice(&stream.next().await.unwrap()).unwrap()
// Some(serde_json::json!({ );
// "type": "data",
// "id": "1", sink.send(
// "payload": { "data": { "value": 999 } }, serde_json::to_vec(&serde_json::json!({
// })), "type": "start",
// serde_json::from_slice(&stream.next().await.unwrap()).unwrap() "id": "1",
// ); "payload": {
// "query": "query { value }"
// assert_eq!( },
// Some(serde_json::json!({ }))
// "type": "complete", .unwrap(),
// "id": "1", )
// })), .await
// serde_json::from_slice(&stream.next().await.unwrap()).unwrap() .unwrap();
// );
// } assert_eq!(
Some(serde_json::json!({
"type": "data",
"id": "1",
"payload": { "data": { "value": 999 } },
})),
serde_json::from_slice(&stream.next().await.unwrap()).unwrap()
);
assert_eq!(
Some(serde_json::json!({
"type": "complete",
"id": "1",
})),
serde_json::from_slice(&stream.next().await.unwrap()).unwrap()
);
}