diff --git a/src/lib.rs b/src/lib.rs index 03fdf8d6..df6e25a7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -146,7 +146,6 @@ pub use request::Request; pub use response::Response; pub use schema::{Schema, SchemaBuilder, SchemaEnv}; pub use serde_json::Number; -pub use subscription::SimpleBroker; pub use validation::ValidationMode; pub use types::*; diff --git a/src/subscription/subscription_type.rs b/src/subscription.rs similarity index 98% rename from src/subscription/subscription_type.rs rename to src/subscription.rs index 0488c0dd..2a2eb821 100644 --- a/src/subscription/subscription_type.rs +++ b/src/subscription.rs @@ -24,7 +24,7 @@ pub trait SubscriptionType: Type { type BoxCreateStreamFuture<'a> = Pin> + Send + 'a>>; -pub fn create_subscription_stream<'a, Query, Mutation, Subscription>( +pub(crate) fn create_subscription_stream<'a, Query, Mutation, Subscription>( schema: &'a Schema, environment: QueryEnv, ctx: &'a ContextSelectionSet<'_>, diff --git a/src/subscription/mod.rs b/src/subscription/mod.rs deleted file mode 100644 index 7e4bc436..00000000 --- a/src/subscription/mod.rs +++ /dev/null @@ -1,5 +0,0 @@ -mod simple_broker; -mod subscription_type; - -pub use simple_broker::SimpleBroker; -pub use subscription_type::{create_subscription_stream, SubscriptionType}; diff --git a/src/subscription/simple_broker.rs b/src/subscription/simple_broker.rs deleted file mode 100644 index 753399ad..00000000 --- a/src/subscription/simple_broker.rs +++ /dev/null @@ -1,65 +0,0 @@ -use futures::channel::mpsc::{self, UnboundedReceiver, UnboundedSender}; -use futures::task::{Context, Poll}; -use futures::{Stream, StreamExt}; -use once_cell::sync::Lazy; -use serde::export::PhantomData; -use slab::Slab; -use std::any::{Any, TypeId}; -use std::collections::HashMap; -use std::pin::Pin; -use std::sync::Mutex; - -static SUBSCRIBERS: Lazy>>> = Lazy::new(Default::default); - -struct Senders(Slab>); - -struct BrokerStream(usize, UnboundedReceiver); - -fn with_senders(f: F) -> R -where - T: Sync + Send + Clone + 'static, - F: FnOnce(&mut Senders) -> R, -{ - let mut map = SUBSCRIBERS.lock().unwrap(); - let senders = map - .entry(TypeId::of::>()) - .or_insert_with(|| Box::new(Senders::(Default::default()))); - f(senders.downcast_mut::>().unwrap()) -} - -impl Drop for BrokerStream { - fn drop(&mut self) { - with_senders::(|senders| senders.0.remove(self.0)); - } -} - -impl Stream for BrokerStream { - type Item = T; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.1.poll_next_unpin(cx) - } -} - -/// A simple broker based on memory -pub struct SimpleBroker(PhantomData); - -impl SimpleBroker { - /// Publish a message that all subscription streams can receive. - pub fn publish(msg: T) { - with_senders::(|senders| { - for (_, sender) in senders.0.iter_mut() { - sender.start_send(msg.clone()).ok(); - } - }); - } - - /// Subscribe to the message of the specified type and returns a `Stream`. - pub fn subscribe() -> impl Stream { - with_senders::(|senders| { - let (tx, rx) = mpsc::unbounded(); - let id = senders.0.insert(tx); - BrokerStream(id, rx) - }) - } -} diff --git a/tests/subscription.rs b/tests/subscription.rs index 9bd1e851..71f7e521 100644 --- a/tests/subscription.rs +++ b/tests/subscription.rs @@ -58,71 +58,6 @@ pub async fn test_subscription() { } } -#[async_std::test] -pub async fn test_simple_broker() { - struct QueryRoot; - - #[derive(Clone, GQLSimpleObject)] - struct Event1 { - value: i32, - } - - #[derive(Clone, GQLSimpleObject)] - struct Event2 { - value: i32, - } - - #[GQLObject] - impl QueryRoot {} - - struct SubscriptionRoot; - - #[GQLSubscription] - impl SubscriptionRoot { - async fn events1(&self) -> impl Stream { - let stream = SimpleBroker::::subscribe(); - SimpleBroker::publish(Event1 { value: 10 }); - SimpleBroker::publish(Event1 { value: 15 }); - stream - } - - async fn events2(&self) -> impl Stream { - let stream = SimpleBroker::::subscribe(); - SimpleBroker::publish(Event2 { value: 88 }); - SimpleBroker::publish(Event2 { value: 99 }); - stream - } - } - - let schema = Schema::new(QueryRoot, EmptyMutation, SubscriptionRoot); - let mut stream1 = schema - .execute_stream("subscription { events1 { value } }") - .map(|resp| resp.into_result().unwrap().data) - .boxed(); - let mut stream2 = schema - .execute_stream("subscription { events2 { value } }") - .map(|resp| resp.into_result().unwrap().data) - .boxed(); - - assert_eq!( - stream1.next().await, - Some(serde_json::json!({ "events1": {"value": 10} })) - ); - assert_eq!( - stream1.next().await, - Some(serde_json::json!({ "events1": {"value": 15} })) - ); - - assert_eq!( - stream2.next().await, - Some(serde_json::json!({ "events2": {"value": 88} })) - ); - assert_eq!( - stream2.next().await, - Some(serde_json::json!({ "events2": {"value": 99} })) - ); -} - #[async_std::test] pub async fn test_subscription_with_ctx_data() { struct QueryRoot;