2020-04-06 10:30:38 +00:00
|
|
|
use async_graphql::*;
|
2020-09-10 11:35:48 +00:00
|
|
|
use futures::{Stream, StreamExt, TryStreamExt};
|
2020-04-06 10:30:38 +00:00
|
|
|
|
|
|
|
#[async_std::test]
|
|
|
|
pub async fn test_subscription() {
|
|
|
|
struct QueryRoot;
|
|
|
|
|
2020-09-18 00:52:13 +00:00
|
|
|
#[derive(SimpleObject)]
|
2020-04-06 10:30:38 +00:00
|
|
|
struct Event {
|
|
|
|
a: i32,
|
|
|
|
b: i32,
|
|
|
|
}
|
|
|
|
|
2020-09-18 00:52:13 +00:00
|
|
|
#[Object]
|
2020-04-06 10:30:38 +00:00
|
|
|
impl QueryRoot {}
|
|
|
|
|
|
|
|
struct SubscriptionRoot;
|
|
|
|
|
2020-09-18 00:52:13 +00:00
|
|
|
#[Subscription]
|
2020-04-06 10:30:38 +00:00
|
|
|
impl SubscriptionRoot {
|
2020-04-07 06:30:46 +00:00
|
|
|
async fn values(&self, start: i32, end: i32) -> impl Stream<Item = i32> {
|
2020-04-06 10:30:38 +00:00
|
|
|
futures::stream::iter(start..end)
|
|
|
|
}
|
|
|
|
|
2020-04-07 06:30:46 +00:00
|
|
|
async fn events(&self, start: i32, end: i32) -> impl Stream<Item = Event> {
|
2020-04-06 10:30:38 +00:00
|
|
|
futures::stream::iter((start..end).map(|n| Event { a: n, b: n * 10 }))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
let schema = Schema::new(QueryRoot, EmptyMutation, SubscriptionRoot);
|
|
|
|
|
|
|
|
{
|
|
|
|
let mut stream = schema
|
2020-09-10 11:35:48 +00:00
|
|
|
.execute_stream("subscription { values(start: 10, end: 20) }")
|
2020-09-11 07:54:56 +00:00
|
|
|
.map(|resp| resp.into_result().unwrap().data)
|
|
|
|
.boxed();
|
2020-04-06 10:30:38 +00:00
|
|
|
for i in 10..20 {
|
|
|
|
assert_eq!(
|
2020-09-10 11:35:48 +00:00
|
|
|
Some(serde_json::json!({ "values": i })),
|
2020-04-06 10:30:38 +00:00
|
|
|
stream.next().await
|
|
|
|
);
|
|
|
|
}
|
|
|
|
assert!(stream.next().await.is_none());
|
|
|
|
}
|
|
|
|
|
|
|
|
{
|
|
|
|
let mut stream = schema
|
2020-09-10 11:35:48 +00:00
|
|
|
.execute_stream("subscription { events(start: 10, end: 20) { a b } }")
|
2020-09-11 07:54:56 +00:00
|
|
|
.map(|resp| resp.into_result().unwrap().data)
|
|
|
|
.boxed();
|
2020-04-06 10:30:38 +00:00
|
|
|
for i in 10..20 {
|
|
|
|
assert_eq!(
|
2020-09-10 11:35:48 +00:00
|
|
|
Some(serde_json::json!({ "events": {"a": i, "b": i * 10} })),
|
2020-04-06 10:30:38 +00:00
|
|
|
stream.next().await
|
|
|
|
);
|
|
|
|
}
|
|
|
|
assert!(stream.next().await.is_none());
|
|
|
|
}
|
|
|
|
}
|
2020-04-08 01:05:54 +00:00
|
|
|
|
2020-04-23 02:26:16 +00:00
|
|
|
#[async_std::test]
|
|
|
|
pub async fn test_subscription_with_ctx_data() {
|
|
|
|
struct QueryRoot;
|
|
|
|
|
2020-09-18 00:52:13 +00:00
|
|
|
#[Object]
|
2020-04-23 02:26:16 +00:00
|
|
|
impl QueryRoot {}
|
|
|
|
|
2020-04-23 14:29:38 +00:00
|
|
|
struct MyObject;
|
|
|
|
|
2020-09-18 00:52:13 +00:00
|
|
|
#[Object]
|
2020-04-23 14:29:38 +00:00
|
|
|
impl MyObject {
|
|
|
|
async fn value(&self, ctx: &Context<'_>) -> i32 {
|
2020-07-06 23:54:57 +00:00
|
|
|
*ctx.data_unchecked::<i32>()
|
2020-04-23 14:29:38 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-04-23 02:26:16 +00:00
|
|
|
struct SubscriptionRoot;
|
|
|
|
|
2020-09-18 00:52:13 +00:00
|
|
|
#[Subscription]
|
2020-04-23 02:26:16 +00:00
|
|
|
impl SubscriptionRoot {
|
|
|
|
async fn values(&self, ctx: &Context<'_>) -> impl Stream<Item = i32> {
|
2020-07-06 23:54:57 +00:00
|
|
|
let value = *ctx.data_unchecked::<i32>();
|
2020-04-23 02:26:16 +00:00
|
|
|
futures::stream::once(async move { value })
|
|
|
|
}
|
2020-04-23 14:29:38 +00:00
|
|
|
|
|
|
|
async fn objects(&self) -> impl Stream<Item = MyObject> {
|
|
|
|
futures::stream::once(async move { MyObject })
|
|
|
|
}
|
2020-04-23 02:26:16 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
let schema = Schema::new(QueryRoot, EmptyMutation, SubscriptionRoot);
|
|
|
|
|
|
|
|
{
|
|
|
|
let mut stream = schema
|
2020-09-10 11:35:48 +00:00
|
|
|
.execute_stream(Request::new("subscription { values objects { value } }").data(100i32))
|
2020-09-11 07:54:56 +00:00
|
|
|
.map(|resp| resp.data)
|
|
|
|
.boxed();
|
2020-04-23 02:26:16 +00:00
|
|
|
assert_eq!(
|
2020-09-10 11:35:48 +00:00
|
|
|
Some(serde_json::json!({ "values": 100 })),
|
2020-04-23 02:26:16 +00:00
|
|
|
stream.next().await
|
|
|
|
);
|
2020-04-23 14:29:38 +00:00
|
|
|
assert_eq!(
|
2020-09-10 11:35:48 +00:00
|
|
|
Some(serde_json::json!({ "objects": { "value": 100 } })),
|
2020-04-23 14:29:38 +00:00
|
|
|
stream.next().await
|
|
|
|
);
|
2020-04-23 02:26:16 +00:00
|
|
|
assert!(stream.next().await.is_none());
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-04-23 03:26:36 +00:00
|
|
|
#[async_std::test]
|
|
|
|
pub async fn test_subscription_with_token() {
|
|
|
|
struct QueryRoot;
|
|
|
|
|
2020-09-18 00:52:13 +00:00
|
|
|
#[Object]
|
2020-04-23 03:26:36 +00:00
|
|
|
impl QueryRoot {}
|
|
|
|
|
|
|
|
struct SubscriptionRoot;
|
|
|
|
|
|
|
|
struct Token(String);
|
|
|
|
|
2020-09-18 00:52:13 +00:00
|
|
|
#[Subscription]
|
2020-04-23 03:26:36 +00:00
|
|
|
impl SubscriptionRoot {
|
|
|
|
async fn values(&self, ctx: &Context<'_>) -> FieldResult<impl Stream<Item = i32>> {
|
2020-07-06 23:54:57 +00:00
|
|
|
if ctx.data_unchecked::<Token>().0 != "123456" {
|
2020-04-23 03:26:36 +00:00
|
|
|
return Err("forbidden".into());
|
|
|
|
}
|
|
|
|
Ok(futures::stream::once(async move { 100 }))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
let schema = Schema::new(QueryRoot, EmptyMutation, SubscriptionRoot);
|
|
|
|
|
|
|
|
{
|
|
|
|
let mut stream = schema
|
2020-09-10 11:35:48 +00:00
|
|
|
.execute_stream(
|
|
|
|
Request::new("subscription { values }").data(Token("123456".to_string())),
|
2020-04-23 03:26:36 +00:00
|
|
|
)
|
2020-09-11 07:54:56 +00:00
|
|
|
.map(|resp| resp.into_result().unwrap().data)
|
|
|
|
.boxed();
|
2020-04-23 03:26:36 +00:00
|
|
|
assert_eq!(
|
2020-09-10 11:35:48 +00:00
|
|
|
Some(serde_json::json!({ "values": 100 })),
|
2020-04-23 03:26:36 +00:00
|
|
|
stream.next().await
|
|
|
|
);
|
|
|
|
assert!(stream.next().await.is_none());
|
|
|
|
}
|
|
|
|
|
|
|
|
{
|
|
|
|
assert!(schema
|
2020-09-10 11:35:48 +00:00
|
|
|
.execute_stream(
|
|
|
|
Request::new("subscription { values }").data(Token("654321".to_string()))
|
2020-04-23 03:26:36 +00:00
|
|
|
)
|
2020-09-11 07:54:56 +00:00
|
|
|
.boxed()
|
2020-09-10 23:58:02 +00:00
|
|
|
.next()
|
2020-04-23 03:26:36 +00:00
|
|
|
.await
|
2020-09-10 23:58:02 +00:00
|
|
|
.unwrap()
|
2020-04-23 03:26:36 +00:00
|
|
|
.is_err());
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-05-03 01:12:14 +00:00
|
|
|
#[async_std::test]
|
|
|
|
pub async fn test_subscription_inline_fragment() {
|
|
|
|
struct QueryRoot;
|
|
|
|
|
2020-09-18 00:52:13 +00:00
|
|
|
#[derive(SimpleObject)]
|
2020-05-03 01:12:14 +00:00
|
|
|
struct Event {
|
|
|
|
a: i32,
|
|
|
|
b: i32,
|
|
|
|
}
|
|
|
|
|
2020-09-18 00:52:13 +00:00
|
|
|
#[Object]
|
2020-05-03 01:12:14 +00:00
|
|
|
impl QueryRoot {}
|
|
|
|
|
|
|
|
struct SubscriptionRoot;
|
|
|
|
|
2020-09-18 00:52:13 +00:00
|
|
|
#[Subscription]
|
2020-05-03 01:12:14 +00:00
|
|
|
impl SubscriptionRoot {
|
|
|
|
async fn events(&self, start: i32, end: i32) -> impl Stream<Item = Event> {
|
|
|
|
futures::stream::iter((start..end).map(|n| Event { a: n, b: n * 10 }))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
let schema = Schema::new(QueryRoot, EmptyMutation, SubscriptionRoot);
|
|
|
|
let mut stream = schema
|
2020-09-10 11:35:48 +00:00
|
|
|
.execute_stream(
|
2020-05-03 01:12:14 +00:00
|
|
|
r#"
|
|
|
|
subscription {
|
|
|
|
events(start: 10, end: 20) {
|
|
|
|
a
|
|
|
|
... {
|
|
|
|
b
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
"#,
|
|
|
|
)
|
2020-09-11 07:54:56 +00:00
|
|
|
.map(|resp| resp.data)
|
|
|
|
.boxed();
|
2020-05-03 01:12:14 +00:00
|
|
|
for i in 10..20 {
|
|
|
|
assert_eq!(
|
2020-09-10 11:35:48 +00:00
|
|
|
Some(serde_json::json!({ "events": {"a": i, "b": i * 10} })),
|
2020-05-03 01:12:14 +00:00
|
|
|
stream.next().await
|
|
|
|
);
|
|
|
|
}
|
|
|
|
assert!(stream.next().await.is_none());
|
|
|
|
}
|
|
|
|
|
|
|
|
#[async_std::test]
|
|
|
|
pub async fn test_subscription_fragment() {
|
|
|
|
struct QueryRoot;
|
|
|
|
|
2020-09-18 00:52:13 +00:00
|
|
|
#[derive(SimpleObject)]
|
2020-05-03 01:12:14 +00:00
|
|
|
struct Event {
|
|
|
|
a: i32,
|
|
|
|
b: i32,
|
|
|
|
}
|
|
|
|
|
2020-09-18 00:52:13 +00:00
|
|
|
#[derive(Interface)]
|
2020-09-13 03:41:15 +00:00
|
|
|
#[graphql(field(name = "a", type = "&i32"))]
|
2020-05-11 03:25:49 +00:00
|
|
|
enum MyInterface {
|
|
|
|
Event(Event),
|
|
|
|
}
|
2020-05-03 01:12:14 +00:00
|
|
|
|
2020-09-18 00:52:13 +00:00
|
|
|
#[Object]
|
2020-05-03 01:12:14 +00:00
|
|
|
impl QueryRoot {}
|
|
|
|
|
|
|
|
struct SubscriptionRoot;
|
|
|
|
|
2020-09-18 00:52:13 +00:00
|
|
|
#[Subscription]
|
2020-05-03 01:12:14 +00:00
|
|
|
impl SubscriptionRoot {
|
|
|
|
async fn events(&self, start: i32, end: i32) -> impl Stream<Item = Event> {
|
|
|
|
futures::stream::iter((start..end).map(|n| Event { a: n, b: n * 10 }))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
let schema = Schema::build(QueryRoot, EmptyMutation, SubscriptionRoot)
|
|
|
|
.register_type::<MyInterface>()
|
|
|
|
.finish();
|
|
|
|
let mut stream = schema
|
2020-09-10 11:35:48 +00:00
|
|
|
.execute_stream(
|
2020-05-03 01:12:14 +00:00
|
|
|
r#"
|
|
|
|
subscription s {
|
|
|
|
events(start: 10, end: 20) {
|
|
|
|
... on MyInterface {
|
|
|
|
a
|
|
|
|
}
|
|
|
|
b
|
|
|
|
}
|
|
|
|
}
|
|
|
|
"#,
|
|
|
|
)
|
2020-09-11 07:54:56 +00:00
|
|
|
.map(|resp| resp.data)
|
|
|
|
.boxed();
|
|
|
|
for i in 10i32..20 {
|
2020-05-03 01:12:14 +00:00
|
|
|
assert_eq!(
|
2020-09-10 11:35:48 +00:00
|
|
|
Some(serde_json::json!({ "events": {"a": i, "b": i * 10} })),
|
2020-05-03 01:12:14 +00:00
|
|
|
stream.next().await
|
|
|
|
);
|
|
|
|
}
|
|
|
|
assert!(stream.next().await.is_none());
|
|
|
|
}
|
2020-05-03 02:06:17 +00:00
|
|
|
|
|
|
|
#[async_std::test]
|
|
|
|
pub async fn test_subscription_fragment2() {
|
|
|
|
struct QueryRoot;
|
|
|
|
|
2020-09-18 00:52:13 +00:00
|
|
|
#[derive(SimpleObject)]
|
2020-05-03 02:06:17 +00:00
|
|
|
struct Event {
|
|
|
|
a: i32,
|
|
|
|
b: i32,
|
|
|
|
}
|
|
|
|
|
2020-09-18 00:52:13 +00:00
|
|
|
#[derive(Interface)]
|
2020-09-13 03:41:15 +00:00
|
|
|
#[graphql(field(name = "a", type = "&i32"))]
|
2020-05-11 03:25:49 +00:00
|
|
|
enum MyInterface {
|
|
|
|
Event(Event),
|
|
|
|
}
|
2020-05-03 02:06:17 +00:00
|
|
|
|
2020-09-18 00:52:13 +00:00
|
|
|
#[Object]
|
2020-05-03 02:06:17 +00:00
|
|
|
impl QueryRoot {}
|
|
|
|
|
|
|
|
struct SubscriptionRoot;
|
|
|
|
|
2020-09-18 00:52:13 +00:00
|
|
|
#[Subscription]
|
2020-05-03 02:06:17 +00:00
|
|
|
impl SubscriptionRoot {
|
|
|
|
async fn events(&self, start: i32, end: i32) -> impl Stream<Item = Event> {
|
|
|
|
futures::stream::iter((start..end).map(|n| Event { a: n, b: n * 10 }))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
let schema = Schema::build(QueryRoot, EmptyMutation, SubscriptionRoot)
|
|
|
|
.register_type::<MyInterface>()
|
|
|
|
.finish();
|
|
|
|
let mut stream = schema
|
2020-09-10 11:35:48 +00:00
|
|
|
.execute_stream(
|
2020-05-03 02:06:17 +00:00
|
|
|
r#"
|
|
|
|
subscription s {
|
|
|
|
events(start: 10, end: 20) {
|
|
|
|
... Frag
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
fragment Frag on Event {
|
|
|
|
a b
|
|
|
|
}
|
|
|
|
"#,
|
|
|
|
)
|
2020-09-11 07:54:56 +00:00
|
|
|
.map(|resp| resp.data)
|
|
|
|
.boxed();
|
2020-05-03 02:06:17 +00:00
|
|
|
for i in 10..20 {
|
|
|
|
assert_eq!(
|
2020-09-10 11:35:48 +00:00
|
|
|
Some(serde_json::json!({ "events": {"a": i, "b": i * 10} })),
|
2020-05-03 02:06:17 +00:00
|
|
|
stream.next().await
|
|
|
|
);
|
|
|
|
}
|
|
|
|
assert!(stream.next().await.is_none());
|
|
|
|
}
|
2020-05-03 08:02:46 +00:00
|
|
|
|
|
|
|
#[async_std::test]
|
|
|
|
pub async fn test_subscription_error() {
|
|
|
|
struct QueryRoot;
|
|
|
|
|
|
|
|
struct Event {
|
|
|
|
value: i32,
|
|
|
|
}
|
|
|
|
|
2020-09-18 00:52:13 +00:00
|
|
|
#[Object]
|
2020-05-03 08:02:46 +00:00
|
|
|
impl Event {
|
|
|
|
async fn value(&self) -> FieldResult<i32> {
|
|
|
|
if self.value < 5 {
|
|
|
|
Ok(self.value)
|
|
|
|
} else {
|
|
|
|
Err("TestError".into())
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-09-18 00:52:13 +00:00
|
|
|
#[Object]
|
2020-05-03 08:02:46 +00:00
|
|
|
impl QueryRoot {}
|
|
|
|
|
|
|
|
struct SubscriptionRoot;
|
|
|
|
|
2020-09-18 00:52:13 +00:00
|
|
|
#[Subscription]
|
2020-05-03 08:02:46 +00:00
|
|
|
impl SubscriptionRoot {
|
|
|
|
async fn events(&self) -> impl Stream<Item = Event> {
|
|
|
|
futures::stream::iter((0..10).map(|n| Event { value: n }))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
let schema = Schema::new(QueryRoot, EmptyMutation, SubscriptionRoot);
|
|
|
|
let mut stream = schema
|
2020-09-10 11:35:48 +00:00
|
|
|
.execute_stream("subscription { events { value } }")
|
|
|
|
.map(|resp| resp.into_result())
|
2020-09-11 07:54:56 +00:00
|
|
|
.map_ok(|resp| resp.data)
|
|
|
|
.boxed();
|
2020-05-03 08:02:46 +00:00
|
|
|
for i in 0i32..5 {
|
|
|
|
assert_eq!(
|
|
|
|
Some(Ok(serde_json::json!({ "events": { "value": i } }))),
|
|
|
|
stream.next().await
|
|
|
|
);
|
|
|
|
}
|
|
|
|
assert_eq!(
|
|
|
|
stream.next().await,
|
|
|
|
Some(Err(Error::Query {
|
|
|
|
pos: Pos {
|
|
|
|
line: 1,
|
|
|
|
column: 25
|
|
|
|
},
|
|
|
|
path: Some(serde_json::json!(["events", "value"])),
|
|
|
|
err: QueryError::FieldError {
|
|
|
|
err: "TestError".to_string(),
|
|
|
|
extended_error: None,
|
|
|
|
},
|
|
|
|
}))
|
|
|
|
);
|
2020-05-03 13:21:54 +00:00
|
|
|
|
|
|
|
assert!(stream.next().await.is_none());
|
2020-05-03 08:02:46 +00:00
|
|
|
}
|
2020-05-03 14:32:37 +00:00
|
|
|
|
|
|
|
#[async_std::test]
|
|
|
|
pub async fn test_subscription_fieldresult() {
|
|
|
|
struct QueryRoot;
|
|
|
|
|
2020-09-18 00:52:13 +00:00
|
|
|
#[Object]
|
2020-05-03 14:32:37 +00:00
|
|
|
impl QueryRoot {}
|
|
|
|
|
|
|
|
struct SubscriptionRoot;
|
|
|
|
|
2020-09-18 00:52:13 +00:00
|
|
|
#[Subscription]
|
2020-05-03 14:32:37 +00:00
|
|
|
impl SubscriptionRoot {
|
|
|
|
async fn values(&self) -> impl Stream<Item = FieldResult<i32>> {
|
|
|
|
futures::stream::iter(0..5)
|
|
|
|
.map(FieldResult::Ok)
|
|
|
|
.chain(futures::stream::once(
|
|
|
|
async move { Err("StreamErr".into()) },
|
|
|
|
))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
let schema = Schema::new(QueryRoot, EmptyMutation, SubscriptionRoot);
|
|
|
|
let mut stream = schema
|
2020-09-10 11:35:48 +00:00
|
|
|
.execute_stream("subscription { values }")
|
|
|
|
.map(|resp| resp.into_result())
|
2020-09-11 07:54:56 +00:00
|
|
|
.map_ok(|resp| resp.data)
|
|
|
|
.boxed();
|
2020-05-03 14:32:37 +00:00
|
|
|
for i in 0i32..5 {
|
|
|
|
assert_eq!(
|
|
|
|
Some(Ok(serde_json::json!({ "values": i }))),
|
|
|
|
stream.next().await
|
|
|
|
);
|
|
|
|
}
|
|
|
|
assert_eq!(
|
|
|
|
stream.next().await,
|
|
|
|
Some(Err(Error::Query {
|
|
|
|
pos: Pos {
|
|
|
|
line: 1,
|
|
|
|
column: 16
|
|
|
|
},
|
|
|
|
path: Some(serde_json::json!(["values"])),
|
|
|
|
err: QueryError::FieldError {
|
|
|
|
err: "StreamErr".to_string(),
|
|
|
|
extended_error: None,
|
|
|
|
},
|
|
|
|
}))
|
|
|
|
);
|
|
|
|
|
|
|
|
assert!(stream.next().await.is_none());
|
|
|
|
}
|