Remove SimpleBroker

This commit is contained in:
Koxiaet 2020-09-13 10:47:28 +01:00
parent 6cf7e78ea9
commit 367076cd66
5 changed files with 1 additions and 137 deletions

View File

@ -146,7 +146,6 @@ pub use request::Request;
pub use response::Response;
pub use schema::{Schema, SchemaBuilder, SchemaEnv};
pub use serde_json::Number;
pub use subscription::SimpleBroker;
pub use validation::ValidationMode;
pub use types::*;

View File

@ -24,7 +24,7 @@ pub trait SubscriptionType: Type {
type BoxCreateStreamFuture<'a> = Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>>;
pub fn create_subscription_stream<'a, Query, Mutation, Subscription>(
pub(crate) fn create_subscription_stream<'a, Query, Mutation, Subscription>(
schema: &'a Schema<Query, Mutation, Subscription>,
environment: QueryEnv,
ctx: &'a ContextSelectionSet<'_>,

View File

@ -1,5 +0,0 @@
mod simple_broker;
mod subscription_type;
pub use simple_broker::SimpleBroker;
pub use subscription_type::{create_subscription_stream, SubscriptionType};

View File

@ -1,65 +0,0 @@
use futures::channel::mpsc::{self, UnboundedReceiver, UnboundedSender};
use futures::task::{Context, Poll};
use futures::{Stream, StreamExt};
use once_cell::sync::Lazy;
use serde::export::PhantomData;
use slab::Slab;
use std::any::{Any, TypeId};
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Mutex;
static SUBSCRIBERS: Lazy<Mutex<HashMap<TypeId, Box<dyn Any + Send>>>> = Lazy::new(Default::default);
struct Senders<T>(Slab<UnboundedSender<T>>);
struct BrokerStream<T: Sync + Send + Clone + 'static>(usize, UnboundedReceiver<T>);
fn with_senders<T, F, R>(f: F) -> R
where
T: Sync + Send + Clone + 'static,
F: FnOnce(&mut Senders<T>) -> R,
{
let mut map = SUBSCRIBERS.lock().unwrap();
let senders = map
.entry(TypeId::of::<Senders<T>>())
.or_insert_with(|| Box::new(Senders::<T>(Default::default())));
f(senders.downcast_mut::<Senders<T>>().unwrap())
}
impl<T: Sync + Send + Clone + 'static> Drop for BrokerStream<T> {
fn drop(&mut self) {
with_senders::<T, _, _>(|senders| senders.0.remove(self.0));
}
}
impl<T: Sync + Send + Clone + 'static> Stream for BrokerStream<T> {
type Item = T;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.1.poll_next_unpin(cx)
}
}
/// A simple broker based on memory
pub struct SimpleBroker<T>(PhantomData<T>);
impl<T: Sync + Send + Clone + 'static> SimpleBroker<T> {
/// Publish a message that all subscription streams can receive.
pub fn publish(msg: T) {
with_senders::<T, _, _>(|senders| {
for (_, sender) in senders.0.iter_mut() {
sender.start_send(msg.clone()).ok();
}
});
}
/// Subscribe to the message of the specified type and returns a `Stream`.
pub fn subscribe() -> impl Stream<Item = T> {
with_senders::<T, _, _>(|senders| {
let (tx, rx) = mpsc::unbounded();
let id = senders.0.insert(tx);
BrokerStream(id, rx)
})
}
}

View File

@ -58,71 +58,6 @@ pub async fn test_subscription() {
}
}
#[async_std::test]
pub async fn test_simple_broker() {
struct QueryRoot;
#[derive(Clone, GQLSimpleObject)]
struct Event1 {
value: i32,
}
#[derive(Clone, GQLSimpleObject)]
struct Event2 {
value: i32,
}
#[GQLObject]
impl QueryRoot {}
struct SubscriptionRoot;
#[GQLSubscription]
impl SubscriptionRoot {
async fn events1(&self) -> impl Stream<Item = Event1> {
let stream = SimpleBroker::<Event1>::subscribe();
SimpleBroker::publish(Event1 { value: 10 });
SimpleBroker::publish(Event1 { value: 15 });
stream
}
async fn events2(&self) -> impl Stream<Item = Event2> {
let stream = SimpleBroker::<Event2>::subscribe();
SimpleBroker::publish(Event2 { value: 88 });
SimpleBroker::publish(Event2 { value: 99 });
stream
}
}
let schema = Schema::new(QueryRoot, EmptyMutation, SubscriptionRoot);
let mut stream1 = schema
.execute_stream("subscription { events1 { value } }")
.map(|resp| resp.into_result().unwrap().data)
.boxed();
let mut stream2 = schema
.execute_stream("subscription { events2 { value } }")
.map(|resp| resp.into_result().unwrap().data)
.boxed();
assert_eq!(
stream1.next().await,
Some(serde_json::json!({ "events1": {"value": 10} }))
);
assert_eq!(
stream1.next().await,
Some(serde_json::json!({ "events1": {"value": 15} }))
);
assert_eq!(
stream2.next().await,
Some(serde_json::json!({ "events2": {"value": 88} }))
);
assert_eq!(
stream2.next().await,
Some(serde_json::json!({ "events2": {"value": 99} }))
);
}
#[async_std::test]
pub async fn test_subscription_with_ctx_data() {
struct QueryRoot;