diff --git a/src/types/connection/stream.rs b/src/types/connection/stream.rs index cf0ca23f..a99d4558 100644 --- a/src/types/connection/stream.rs +++ b/src/types/connection/stream.rs @@ -1,8 +1,62 @@ use crate::types::connection::{EmptyEdgeFields, QueryOperation}; use crate::{Connection, Context, Cursor, DataSource, FieldResult, ObjectType}; use futures::{Stream, StreamExt}; +use std::collections::VecDeque; use std::pin::Pin; +/// Utility struct to convert a Stream to a Connection +/// +/// # Examples +/// +/// ```rust +/// use async_graphql::*; +/// use byteorder::{ReadBytesExt, BE}; +/// use futures::StreamExt; +/// +/// struct QueryRoot; +/// +/// #[Object] +/// impl QueryRoot { +/// async fn stream_connection(&self, ctx: &Context<'_>, +/// after: Option, +/// before: Option, +/// first: Option, +/// last: Option +/// ) -> FieldResult> { +/// let edges_stream = futures::stream::iter(vec!["a", "b", "c", "d", "e", "f"]) +/// .map(|node| { +/// let cursor: Cursor = node.to_owned().into(); +/// (cursor, EmptyEdgeFields, node) +/// }); +/// +/// let mut source: StreamDataSource<_> = edges_stream.into(); +/// source.query(ctx, after, before, first, last).await +/// } +/// } +/// +/// #[async_std::main] +/// async fn main() { +/// let schema = Schema::new(QueryRoot, EmptyMutation, EmptySubscription); +/// +/// assert_eq!(schema.execute("{ streamConnection(first: 2) { edges { node } } }").await.unwrap().data, serde_json::json!({ +/// "streamConnection": { +/// "edges": [ +/// { "node": "a" }, +/// { "node": "b" } +/// ] +/// }, +/// })); +/// +/// assert_eq!(schema.execute("{ streamConnection(last: 2) { edges { node } } }").await.unwrap().data, serde_json::json!({ +/// "streamConnection": { +/// "edges": [ +/// { "node": "e" }, +/// { "node": "f" } +/// ] +/// }, +/// })); +/// } +/// ``` pub struct StreamDataSource<'a, T, E = EmptyEdgeFields> where T: Send + Sync + 'a, @@ -11,7 +65,21 @@ where stream: Pin + Send + Sync + 'a>>, } -impl<'a, S, T, E> From for StreamDataSource<'a, T, E> +impl<'a, T, E> StreamDataSource<'a, T, E> +where + T: Send + Sync + 'a, + E: ObjectType + Send + Sync + 'a, +{ + /// Creates a StreamDataSource from an `Iterator` of edges. + pub fn from_iter(iter: I) -> Self + where + I: Iterator + Send + Sync + 'a, + { + futures::stream::iter(iter).into() + } +} + +impl<'a, S: Stream, T, E> From for StreamDataSource<'a, T, E> where S: Stream + Send + Sync + 'a, T: Send + Sync + 'a, @@ -24,8 +92,7 @@ where } } -use std::collections::VecDeque; -struct State +struct State where T: Send + Sync, E: ObjectType + Send + Sync,