use async_graphql::*; use futures::{SinkExt, Stream, StreamExt}; use std::sync::Arc; #[async_std::test] pub async fn test_subscription() { struct QueryRoot; #[SimpleObject] struct Event { #[field] a: i32, #[field] b: i32, } #[Object] impl QueryRoot {} struct SubscriptionRoot; #[Subscription] impl SubscriptionRoot { #[field] async fn values(&self, start: i32, end: i32) -> impl Stream { futures::stream::iter(start..end) } #[field] async fn events(&self, start: i32, end: i32) -> impl Stream { futures::stream::iter((start..end).map(|n| Event { a: n, b: n * 10 })) } } let schema = Schema::new(QueryRoot, EmptyMutation, SubscriptionRoot); { let mut stream = schema .create_subscription_stream( "subscription { values(start: 10, end: 20) }", None, Default::default(), None, ) .await .unwrap(); for i in 10..20 { assert_eq!( Some(serde_json::json!({ "values": i })), stream.next().await ); } assert!(stream.next().await.is_none()); } { let mut stream = schema .create_subscription_stream( "subscription { events(start: 10, end: 20) { a b } }", None, Default::default(), None, ) .await .unwrap(); for i in 10..20 { assert_eq!( Some(serde_json::json!({ "events": {"a": i, "b": i * 10} })), stream.next().await ); } assert!(stream.next().await.is_none()); } } #[async_std::test] pub async fn test_simple_broker() { struct QueryRoot; #[SimpleObject] #[derive(Clone)] struct Event1 { #[field] value: i32, } #[SimpleObject] #[derive(Clone)] struct Event2 { #[field] value: i32, } #[Object] impl QueryRoot {} struct SubscriptionRoot; #[Subscription] impl SubscriptionRoot { #[field] async fn events1(&self) -> impl Stream { SimpleBroker::::subscribe() } #[field] async fn events2(&self) -> impl Stream { SimpleBroker::::subscribe() } } let schema = Schema::new(QueryRoot, EmptyMutation, SubscriptionRoot); let mut stream1 = schema .create_subscription_stream( "subscription { events1 { value } }", None, Default::default(), None, ) .await .unwrap(); let mut stream2 = schema .create_subscription_stream( "subscription { events2 { value } }", None, Default::default(), None, ) .await .unwrap(); SimpleBroker::publish(Event1 { value: 10 }); SimpleBroker::publish(Event2 { value: 88 }); SimpleBroker::publish(Event1 { value: 15 }); SimpleBroker::publish(Event2 { value: 99 }); assert_eq!( stream1.next().await, Some(serde_json::json!({ "events1": {"value": 10} })) ); assert_eq!( stream1.next().await, Some(serde_json::json!({ "events1": {"value": 15} })) ); assert_eq!( stream2.next().await, Some(serde_json::json!({ "events2": {"value": 88} })) ); assert_eq!( stream2.next().await, Some(serde_json::json!({ "events2": {"value": 99} })) ); } #[async_std::test] pub async fn test_subscription_with_ctx_data() { struct QueryRoot; #[Object] impl QueryRoot {} struct SubscriptionRoot; #[Subscription] impl SubscriptionRoot { #[field] async fn values(&self, ctx: &Context<'_>) -> impl Stream { let value = *ctx.data::(); futures::stream::once(async move { value }) } } let schema = Schema::new(QueryRoot, EmptyMutation, SubscriptionRoot); { let mut stream = schema .create_subscription_stream( "subscription { values }", None, Default::default(), Some(Arc::new({ let mut data = Data::default(); data.insert(100i32); data })), ) .await .unwrap(); assert_eq!( Some(serde_json::json!({ "values": 100 })), stream.next().await ); assert!(stream.next().await.is_none()); } } #[async_std::test] pub async fn test_subscription_ws_transport() { struct QueryRoot; #[Object] impl QueryRoot {} struct SubscriptionRoot; #[Subscription] impl SubscriptionRoot { #[field] async fn values(&self, ctx: &Context<'_>) -> impl Stream { let step = *ctx.data::(); futures::stream::iter((0..10).map(move |n| n * step)) } } let schema = Schema::new(QueryRoot, EmptyMutation, SubscriptionRoot); let (mut sink, mut stream) = schema.subscription_connection( WebSocketTransport::default(), Some(Arc::new({ let mut data = Data::default(); data.insert(5); data })), ); sink.send( serde_json::to_vec(&serde_json::json!({ "type": "connection_init", })) .unwrap() .into(), ) .await .unwrap(); assert_eq!( Some(serde_json::json!({ "type": "connection_ack", })), serde_json::from_slice(&stream.next().await.unwrap()).unwrap() ); sink.send( serde_json::to_vec(&serde_json::json!({ "type": "start", "id": "1", "payload": { "query": "subscription { values }" }, })) .unwrap() .into(), ) .await .unwrap(); for i in 0..10 { assert_eq!( Some(serde_json::json!({ "type": "data", "id": "1", "payload": { "data": { "values": i * 5 } }, })), serde_json::from_slice(&stream.next().await.unwrap()).unwrap() ); } }