This commit is contained in:
Samuel Hurel 2020-05-21 11:45:15 +02:00
parent 36c05dc5a7
commit 217aa34787

View File

@ -1,8 +1,62 @@
use crate::types::connection::{EmptyEdgeFields, QueryOperation};
use crate::{Connection, Context, Cursor, DataSource, FieldResult, ObjectType};
use futures::{Stream, StreamExt};
use std::collections::VecDeque;
use std::pin::Pin;
/// Utility struct to convert a Stream to a Connection
///
/// # Examples
///
/// ```rust
/// use async_graphql::*;
/// use byteorder::{ReadBytesExt, BE};
/// use futures::StreamExt;
///
/// struct QueryRoot;
///
/// #[Object]
/// impl QueryRoot {
/// async fn stream_connection(&self, ctx: &Context<'_>,
/// after: Option<Cursor>,
/// before: Option<Cursor>,
/// first: Option<i32>,
/// last: Option<i32>
/// ) -> FieldResult<Connection<&str>> {
/// let edges_stream = futures::stream::iter(vec!["a", "b", "c", "d", "e", "f"])
/// .map(|node| {
/// let cursor: Cursor = node.to_owned().into();
/// (cursor, EmptyEdgeFields, node)
/// });
///
/// let mut source: StreamDataSource<_> = edges_stream.into();
/// source.query(ctx, after, before, first, last).await
/// }
/// }
///
/// #[async_std::main]
/// async fn main() {
/// let schema = Schema::new(QueryRoot, EmptyMutation, EmptySubscription);
///
/// assert_eq!(schema.execute("{ streamConnection(first: 2) { edges { node } } }").await.unwrap().data, serde_json::json!({
/// "streamConnection": {
/// "edges": [
/// { "node": "a" },
/// { "node": "b" }
/// ]
/// },
/// }));
///
/// assert_eq!(schema.execute("{ streamConnection(last: 2) { edges { node } } }").await.unwrap().data, serde_json::json!({
/// "streamConnection": {
/// "edges": [
/// { "node": "e" },
/// { "node": "f" }
/// ]
/// },
/// }));
/// }
/// ```
pub struct StreamDataSource<'a, T, E = EmptyEdgeFields>
where
T: Send + Sync + 'a,
@ -11,7 +65,21 @@ where
stream: Pin<Box<dyn Stream<Item = (Cursor, E, T)> + Send + Sync + 'a>>,
}
impl<'a, S, T, E> From<S> for StreamDataSource<'a, T, E>
impl<'a, T, E> StreamDataSource<'a, T, E>
where
T: Send + Sync + 'a,
E: ObjectType + Send + Sync + 'a,
{
/// Creates a StreamDataSource from an `Iterator` of edges.
pub fn from_iter<I>(iter: I) -> Self
where
I: Iterator<Item = (Cursor, E, T)> + Send + Sync + 'a,
{
futures::stream::iter(iter).into()
}
}
impl<'a, S: Stream, T, E> From<S> for StreamDataSource<'a, T, E>
where
S: Stream<Item = (Cursor, E, T)> + Send + Sync + 'a,
T: Send + Sync + 'a,
@ -24,8 +92,7 @@ where
}
}
use std::collections::VecDeque;
struct State<T, E = EmptyEdgeFields>
struct State<T, E>
where
T: Send + Sync,
E: ObjectType + Send + Sync,