2020-05-19 08:24:15 +00:00
|
|
|
use crate::context::QueryEnv;
|
2020-09-06 05:38:31 +00:00
|
|
|
use crate::parser::types::{Selection, TypeCondition};
|
2020-09-12 09:29:52 +00:00
|
|
|
use crate::{Context, ContextSelectionSet, Response, Result, Schema, SchemaEnv, Type};
|
2020-04-07 06:30:46 +00:00
|
|
|
use futures::{Future, Stream};
|
2020-04-06 05:49:39 +00:00
|
|
|
use std::pin::Pin;
|
2020-03-29 12:02:52 +00:00
|
|
|
|
|
|
|
/// Represents a GraphQL subscription object
|
|
|
|
#[async_trait::async_trait]
|
|
|
|
pub trait SubscriptionType: Type {
|
|
|
|
/// This function returns true of type `EmptySubscription` only
|
|
|
|
#[doc(hidden)]
|
|
|
|
fn is_empty() -> bool {
|
|
|
|
false
|
|
|
|
}
|
|
|
|
|
|
|
|
#[doc(hidden)]
|
2020-05-19 08:24:15 +00:00
|
|
|
async fn create_field_stream(
|
2020-03-29 12:02:52 +00:00
|
|
|
&self,
|
2020-05-12 08:27:06 +00:00
|
|
|
idx: usize,
|
2020-04-06 05:49:39 +00:00
|
|
|
ctx: &Context<'_>,
|
2020-05-19 08:24:15 +00:00
|
|
|
schema_env: SchemaEnv,
|
|
|
|
query_env: QueryEnv,
|
2020-09-10 08:39:43 +00:00
|
|
|
) -> Result<Pin<Box<dyn Stream<Item = Response> + Send>>>;
|
2020-04-06 05:49:39 +00:00
|
|
|
}
|
|
|
|
|
2020-04-07 06:30:46 +00:00
|
|
|
type BoxCreateStreamFuture<'a> = Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>>;
|
|
|
|
|
|
|
|
pub fn create_subscription_stream<'a, Query, Mutation, Subscription>(
|
|
|
|
schema: &'a Schema<Query, Mutation, Subscription>,
|
2020-05-19 08:24:15 +00:00
|
|
|
environment: QueryEnv,
|
2020-04-07 06:30:46 +00:00
|
|
|
ctx: &'a ContextSelectionSet<'_>,
|
2020-09-10 08:39:43 +00:00
|
|
|
streams: &'a mut Vec<Pin<Box<dyn Stream<Item = Response> + Send>>>,
|
2020-04-07 06:30:46 +00:00
|
|
|
) -> BoxCreateStreamFuture<'a>
|
2020-04-06 05:49:39 +00:00
|
|
|
where
|
2020-09-12 09:29:52 +00:00
|
|
|
Query: Send + Sync,
|
|
|
|
Mutation: Send + Sync,
|
2020-04-06 05:49:39 +00:00
|
|
|
Subscription: SubscriptionType + Send + Sync + 'static + Sized,
|
|
|
|
{
|
2020-04-07 06:30:46 +00:00
|
|
|
Box::pin(async move {
|
2020-09-06 05:38:31 +00:00
|
|
|
for (idx, selection) in ctx.item.node.items.iter().enumerate() {
|
|
|
|
if ctx.is_skip(selection.node.directives())? {
|
|
|
|
continue;
|
|
|
|
}
|
2020-05-09 09:55:04 +00:00
|
|
|
match &selection.node {
|
2020-09-06 06:16:36 +00:00
|
|
|
Selection::Field(field) => streams.push(
|
|
|
|
schema
|
|
|
|
.subscription
|
|
|
|
.create_field_stream(
|
|
|
|
idx,
|
|
|
|
&ctx.with_field(field),
|
|
|
|
schema.env.clone(),
|
|
|
|
environment.clone(),
|
|
|
|
)
|
|
|
|
.await?,
|
|
|
|
),
|
2020-04-07 06:30:46 +00:00
|
|
|
Selection::FragmentSpread(fragment_spread) => {
|
2020-05-12 08:27:06 +00:00
|
|
|
if let Some(fragment) = ctx
|
2020-05-19 08:24:15 +00:00
|
|
|
.query_env
|
2020-05-12 08:27:06 +00:00
|
|
|
.document
|
2020-09-06 05:38:31 +00:00
|
|
|
.fragments
|
|
|
|
.get(&fragment_spread.node.fragment_name.node)
|
2020-04-07 06:30:46 +00:00
|
|
|
{
|
|
|
|
create_subscription_stream(
|
|
|
|
schema,
|
|
|
|
environment.clone(),
|
2020-09-06 05:38:31 +00:00
|
|
|
&ctx.with_selection_set(&fragment.node.selection_set),
|
2020-04-07 06:30:46 +00:00
|
|
|
streams,
|
|
|
|
)
|
|
|
|
.await?;
|
|
|
|
}
|
2020-04-06 05:49:39 +00:00
|
|
|
}
|
2020-04-07 06:30:46 +00:00
|
|
|
Selection::InlineFragment(inline_fragment) => {
|
2020-09-06 06:16:36 +00:00
|
|
|
if let Some(TypeCondition { on: name }) = inline_fragment
|
|
|
|
.node
|
|
|
|
.type_condition
|
|
|
|
.as_ref()
|
|
|
|
.map(|v| &v.node)
|
2020-05-09 09:55:04 +00:00
|
|
|
{
|
2020-09-08 08:21:27 +00:00
|
|
|
if name.node.as_str() == Subscription::type_name() {
|
2020-04-07 06:30:46 +00:00
|
|
|
create_subscription_stream(
|
|
|
|
schema,
|
|
|
|
environment.clone(),
|
2020-09-06 05:38:31 +00:00
|
|
|
&ctx.with_selection_set(&inline_fragment.node.selection_set),
|
2020-04-07 06:30:46 +00:00
|
|
|
streams,
|
|
|
|
)
|
|
|
|
.await?;
|
|
|
|
}
|
|
|
|
} else {
|
2020-04-06 05:49:39 +00:00
|
|
|
create_subscription_stream(
|
|
|
|
schema,
|
|
|
|
environment.clone(),
|
2020-09-06 05:38:31 +00:00
|
|
|
&ctx.with_selection_set(&inline_fragment.node.selection_set),
|
2020-04-06 05:49:39 +00:00
|
|
|
streams,
|
2020-04-07 06:30:46 +00:00
|
|
|
)
|
|
|
|
.await?;
|
2020-04-06 05:49:39 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2020-04-07 06:30:46 +00:00
|
|
|
Ok(())
|
|
|
|
})
|
2020-03-29 12:02:52 +00:00
|
|
|
}
|
2020-08-27 07:35:48 +00:00
|
|
|
|
|
|
|
#[async_trait::async_trait]
|
|
|
|
impl<T: SubscriptionType + Send + Sync> SubscriptionType for &T {
|
|
|
|
async fn create_field_stream(
|
|
|
|
&self,
|
|
|
|
idx: usize,
|
|
|
|
ctx: &Context<'_>,
|
|
|
|
schema_env: SchemaEnv,
|
|
|
|
query_env: QueryEnv,
|
2020-09-10 08:39:43 +00:00
|
|
|
) -> Result<Pin<Box<dyn Stream<Item = Response> + Send>>> {
|
2020-08-27 07:35:48 +00:00
|
|
|
T::create_field_stream(*self, idx, ctx, schema_env, query_env).await
|
|
|
|
}
|
|
|
|
}
|