Remove StreamDataSource wrapper

This commit is contained in:
Samuel Hurel 2020-05-21 12:51:01 +02:00
parent a7a17a43ad
commit d58fd6e942
6 changed files with 62 additions and 87 deletions

View File

@ -154,7 +154,7 @@ pub use subscription::{
}; };
pub use types::{ pub use types::{
Connection, Cursor, DataSource, Deferred, EmptyEdgeFields, EmptyMutation, EmptySubscription, Connection, Cursor, DataSource, Deferred, EmptyEdgeFields, EmptyMutation, EmptySubscription,
PageInfo, QueryOperation, StreamDataSource, Streamed, Upload, PageInfo, QueryOperation, Streamed, Upload,
}; };
pub use validation::ValidationMode; pub use validation::ValidationMode;

View File

@ -17,7 +17,7 @@ use std::borrow::Cow;
/// If the `T` type is `OutputValueType`, you can return the value as a field function directly, /// If the `T` type is `OutputValueType`, you can return the value as a field function directly,
/// otherwise you can use the `Connection::map` function to convert to a type that implements `OutputValueType`. /// otherwise you can use the `Connection::map` function to convert to a type that implements `OutputValueType`.
/// `E` is an extension object type that extends the edge fields. /// `E` is an extension object type that extends the edge fields.
pub struct Connection<T, E: ObjectType + Sync + Send = EmptyEdgeFields> { pub struct Connection<T, E: ObjectType + Send = EmptyEdgeFields> {
/// The total number of records. /// The total number of records.
pub total_count: Option<usize>, pub total_count: Option<usize>,
@ -28,7 +28,7 @@ pub struct Connection<T, E: ObjectType + Sync + Send = EmptyEdgeFields> {
pub nodes: Vec<(Cursor, E, T)>, pub nodes: Vec<(Cursor, E, T)>,
} }
impl<T, E: ObjectType + Sync + Send> Connection<T, E> { impl<T, E: ObjectType + Send> Connection<T, E> {
/// Create a connection object. /// Create a connection object.
pub fn new( pub fn new(
total_count: Option<usize>, total_count: Option<usize>,

View File

@ -10,7 +10,6 @@ use crate::{Context, FieldResult, ObjectType};
pub use connection_type::Connection; pub use connection_type::Connection;
pub use cursor::Cursor; pub use cursor::Cursor;
pub use page_info::PageInfo; pub use page_info::PageInfo;
pub use stream::StreamDataSource;
/// Connection query operation /// Connection query operation
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
@ -207,14 +206,14 @@ struct Pagination {
/// } /// }
/// ``` /// ```
#[async_trait::async_trait] #[async_trait::async_trait]
pub trait DataSource: Sync + Send { pub trait DataSource: Send {
/// Record type /// Record type
type Element; type Element;
/// Fields for Edge /// Fields for Edge
/// ///
/// Is a type that implements `ObjectType` and can be defined by the procedure macro `#[Object]`. /// Is a type that implements `ObjectType` and can be defined by the procedure macro `#[Object]`.
type EdgeFieldsObj: ObjectType + Send + Sync; type EdgeFieldsObj: ObjectType + Send;
/// Execute the query. /// Execute the query.
async fn query( async fn query(

View File

@ -1,10 +1,20 @@
use crate::types::connection::{EmptyEdgeFields, QueryOperation}; use crate::types::connection::QueryOperation;
use crate::{Connection, Context, Cursor, DataSource, FieldResult, ObjectType}; use crate::{Connection, Context, Cursor, DataSource, FieldResult, ObjectType};
use futures::{Stream, StreamExt}; use futures::{stream::BoxStream, StreamExt};
use std::collections::VecDeque; use std::collections::VecDeque;
use std::pin::Pin;
/// Utility struct to convert a Stream to a Connection struct State<T, E>
where
T: Send,
E: ObjectType + Send,
{
has_seen_before: bool,
has_prev_page: bool,
has_next_page: bool,
edges: VecDeque<(Cursor, E, T)>,
}
/// You can use a Pin<Box<Stream<Item = (Cursor, E, T)>>> as a datasource
/// ///
/// # Examples /// # Examples
/// ///
@ -23,14 +33,14 @@ use std::pin::Pin;
/// first: Option<i32>, /// first: Option<i32>,
/// last: Option<i32> /// last: Option<i32>
/// ) -> FieldResult<Connection<&str>> { /// ) -> FieldResult<Connection<&str>> {
/// let edges_stream = futures::stream::iter(vec!["a", "b", "c", "d", "e", "f"]) /// let mut edges_stream = futures::stream::iter(vec!["a", "b", "c", "d", "e", "f"])
/// .map(|node| { /// .map(|node| {
/// let cursor: Cursor = node.to_owned().into(); /// let cursor: Cursor = node.to_owned().into();
/// (cursor, EmptyEdgeFields, node) /// (cursor, EmptyEdgeFields, node)
/// }); /// })
/// .boxed();
/// ///
/// let mut source: StreamDataSource<_> = edges_stream.into(); /// edges_stream.query(ctx, after, before, first, last).await
/// source.query(ctx, after, before, first, last).await
/// } /// }
/// } /// }
/// ///
@ -38,76 +48,44 @@ use std::pin::Pin;
/// async fn main() { /// async fn main() {
/// let schema = Schema::new(QueryRoot, EmptyMutation, EmptySubscription); /// let schema = Schema::new(QueryRoot, EmptyMutation, EmptySubscription);
/// ///
/// assert_eq!(schema.execute("{ streamConnection(first: 2) { edges { node } } }").await.unwrap().data, serde_json::json!({ /// assert_eq!(
/// schema
/// .execute("{ streamConnection(first: 2) { edges { node } } }")
/// .await
/// .unwrap()
/// .data,
/// serde_json::json!({
/// "streamConnection": { /// "streamConnection": {
/// "edges": [ /// "edges": [
/// { "node": "a" }, /// { "node": "a" },
/// { "node": "b" } /// { "node": "b" }
/// ] /// ]
/// }, /// },
/// })); /// })
/// );
/// ///
/// assert_eq!(schema.execute("{ streamConnection(last: 2) { edges { node } } }").await.unwrap().data, serde_json::json!({ /// assert_eq!(
/// schema
/// .execute("{ streamConnection(last: 2) { edges { node } } }")
/// .await
/// .unwrap()
/// .data,
/// serde_json::json!({
/// "streamConnection": { /// "streamConnection": {
/// "edges": [ /// "edges": [
/// { "node": "e" }, /// { "node": "e" },
/// { "node": "f" } /// { "node": "f" }
/// ] /// ]
/// }, /// },
/// })); /// })
/// );
/// } /// }
/// ``` /// ```
pub struct StreamDataSource<'a, T, E = EmptyEdgeFields>
where
T: Send + Sync + 'a,
E: ObjectType + Send + Sync + 'a,
{
stream: Pin<Box<dyn Stream<Item = (Cursor, E, T)> + Send + Sync + 'a>>,
}
impl<'a, T, E> StreamDataSource<'a, T, E>
where
T: Send + Sync + 'a,
E: ObjectType + Send + Sync + 'a,
{
/// Creates a StreamDataSource from an `Iterator` of edges.
pub fn from_iter<I>(iter: I) -> Self
where
I: Iterator<Item = (Cursor, E, T)> + Send + Sync + 'a,
{
futures::stream::iter(iter).into()
}
}
impl<'a, S: Stream, T, E> From<S> for StreamDataSource<'a, T, E>
where
S: Stream<Item = (Cursor, E, T)> + Send + Sync + 'a,
T: Send + Sync + 'a,
E: ObjectType + Send + Sync + 'a,
{
fn from(stream: S) -> Self {
StreamDataSource {
stream: Box::pin(stream),
}
}
}
struct State<T, E>
where
T: Send + Sync,
E: ObjectType + Send + Sync,
{
has_seen_before: bool,
has_prev_page: bool,
has_next_page: bool,
edges: VecDeque<(Cursor, E, T)>,
}
#[async_trait::async_trait] #[async_trait::async_trait]
impl<'a, T, E> DataSource for StreamDataSource<'a, T, E> impl<'a, T, E> DataSource for BoxStream<'a, (Cursor, E, T)>
where where
T: Send + Sync + 'a, T: Send + 'a,
E: ObjectType + Send + Sync + 'a, E: ObjectType + Send + 'a,
{ {
type Element = T; type Element = T;
type EdgeFieldsObj = E; type EdgeFieldsObj = E;
@ -117,7 +95,7 @@ where
_ctx: &Context<'_>, _ctx: &Context<'_>,
operation: &QueryOperation, operation: &QueryOperation,
) -> FieldResult<Connection<Self::Element, Self::EdgeFieldsObj>> { ) -> FieldResult<Connection<Self::Element, Self::EdgeFieldsObj>> {
let stream = std::mem::replace(&mut self.stream, Box::pin(futures::stream::empty())); let stream = std::mem::replace(self, futures::stream::empty().boxed());
let state = State::<Self::Element, Self::EdgeFieldsObj> { let state = State::<Self::Element, Self::EdgeFieldsObj> {
has_seen_before: false, has_seen_before: false,

View File

@ -9,9 +9,7 @@ mod query_root;
mod streamed; mod streamed;
mod upload; mod upload;
pub use connection::{ pub use connection::{Connection, Cursor, DataSource, EmptyEdgeFields, PageInfo, QueryOperation};
Connection, Cursor, DataSource, EmptyEdgeFields, PageInfo, QueryOperation, StreamDataSource,
};
pub use deferred::Deferred; pub use deferred::Deferred;
pub use empty_mutation::EmptyMutation; pub use empty_mutation::EmptyMutation;
pub use empty_subscription::EmptySubscription; pub use empty_subscription::EmptySubscription;

View File

@ -15,7 +15,7 @@ pub mod stream {
first: Option<i32>, first: Option<i32>,
last: Option<i32>, last: Option<i32>,
) -> FieldResult<Connection<String>> { ) -> FieldResult<Connection<String>> {
let mut source: StreamDataSource<_> = futures::stream::iter(vec![ futures::stream::iter(vec![
"a".to_owned(), "a".to_owned(),
"b".to_owned(), "b".to_owned(),
"c".to_owned(), "c".to_owned(),
@ -27,9 +27,9 @@ pub mod stream {
let cursor: Cursor = node.clone().into(); let cursor: Cursor = node.clone().into();
(cursor, EmptyEdgeFields, node) (cursor, EmptyEdgeFields, node)
}) })
.into(); .boxed()
.query(ctx, after, before, first, last)
source.query(ctx, after, before, first, last).await .await
} }
} }