Fix tests

This commit is contained in:
Sunli 2020-09-11 16:05:21 +08:00
parent 049b652dd6
commit c129079f69
7 changed files with 18 additions and 20 deletions

View File

@ -1,9 +1,6 @@
//mod connection;
mod simple_broker;
mod subscription_type;
// mod ws_transport;
pub mod transports;
//pub use connection::{create_connection, ConnectionTransport, SubscriptionStreams};
pub use simple_broker::SimpleBroker;
pub use subscription_type::{create_subscription_stream, SubscriptionType};
pub mod transports;

View File

@ -1 +1,3 @@
//! Transports for subscription
pub mod websocket;

View File

@ -1,6 +1,8 @@
//! WebSocket transport for subscription
use crate::{http, Data, FieldResult, ObjectType, Response, Schema, SubscriptionType};
use futures::channel::mpsc;
use futures::task::{AtomicWaker, Context, Poll};
use futures::task::{Context, Poll};
use futures::{Future, Stream, StreamExt};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
@ -57,7 +59,6 @@ where
initializer: Arc::new(initializer),
rx_bytes,
handle_request_fut: None,
waker: AtomicWaker::new(),
ctx: Some(WSContext {
streams: &mut streams,
send_buf: &mut send_buf,
@ -90,7 +91,6 @@ struct SubscriptionStream<'a, Query, Mutation, Subscription> {
initializer: InitializerFn,
rx_bytes: mpsc::UnboundedReceiver<Vec<u8>>,
handle_request_fut: Option<HandleRequestBoxFut<'a>>,
waker: AtomicWaker,
ctx: Option<WSContext<'a>>,
}
@ -134,7 +134,6 @@ where
ctx,
data,
)));
this.waker.wake();
continue;
}
Poll::Ready(None) => return Poll::Ready(None),
@ -191,15 +190,10 @@ where
for id in closed {
ctx.streams.remove(&id);
}
this.waker.register(cx.waker());
return Poll::Pending;
} else {
this.waker.register(cx.waker());
return Poll::Pending;
}
} else {
return Poll::Pending;
}
return Poll::Pending;
}
}
}

View File

@ -124,7 +124,7 @@ pub async fn test_field_features() {
}
);
let mut stream = schema.execute_stream("subscription { values }");
let mut stream = schema.execute_stream("subscription { values }").boxed();
assert_eq!(
stream.next().await.map(|resp| resp.data),
Some(serde_json::json!({
@ -132,7 +132,7 @@ pub async fn test_field_features() {
}))
);
let mut stream = schema.execute_stream("subscription { valuesBson }");
let mut stream = schema.execute_stream("subscription { valuesBson }").boxed();
assert_eq!(
stream.next().await.map(|resp| resp.data),
Some(serde_json::json!({
@ -143,6 +143,7 @@ pub async fn test_field_features() {
assert_eq!(
schema
.execute_stream("subscription { valuesAbc }")
.boxed()
.next()
.await
.unwrap()

View File

@ -146,6 +146,7 @@ pub async fn test_guard() {
assert_eq!(
schema
.execute_stream(Request::new("subscription { values }").data(Role::Guest))
.boxed()
.next()
.await
.unwrap()

View File

@ -193,7 +193,8 @@ pub async fn test_merged_subscription() {
{
let mut stream = schema
.execute_stream("subscription { events1 }")
.map(|resp| resp.into_result().unwrap().data);
.map(|resp| resp.into_result().unwrap().data)
.boxed();
for i in 0i32..10 {
assert_eq!(
Some(serde_json::json!({
@ -208,7 +209,8 @@ pub async fn test_merged_subscription() {
{
let mut stream = schema
.execute_stream("subscription { events2 }")
.map(|resp| resp.into_result().unwrap().data);
.map(|resp| resp.into_result().unwrap().data)
.boxed();
for i in 10i32..20 {
assert_eq!(
Some(serde_json::json!({

View File

@ -64,7 +64,8 @@ pub async fn test_input_value_custom_error() {
let mut stream = schema
.execute_stream("subscription { type }")
.map(|resp| resp.into_result())
.map_ok(|resp| resp.data);
.map_ok(|resp| resp.data)
.boxed();
for i in 0..10 {
assert_eq!(
Some(Ok(serde_json::json!({ "type": i }))),