diff --git a/src/lib.rs b/src/lib.rs index 5a293be2..b7ef4923 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -154,7 +154,7 @@ pub use subscription::{ }; pub use types::{ Connection, Cursor, DataSource, Deferred, EmptyEdgeFields, EmptyMutation, EmptySubscription, - PageInfo, QueryOperation, StreamDataSource, Streamed, Upload, + PageInfo, QueryOperation, Streamed, Upload, }; pub use validation::ValidationMode; diff --git a/src/types/connection/connection_type.rs b/src/types/connection/connection_type.rs index 04338269..9ed4e32c 100644 --- a/src/types/connection/connection_type.rs +++ b/src/types/connection/connection_type.rs @@ -17,7 +17,7 @@ use std::borrow::Cow; /// If the `T` type is `OutputValueType`, you can return the value as a field function directly, /// otherwise you can use the `Connection::map` function to convert to a type that implements `OutputValueType`. /// `E` is an extension object type that extends the edge fields. -pub struct Connection { +pub struct Connection { /// The total number of records. pub total_count: Option, @@ -28,7 +28,7 @@ pub struct Connection { pub nodes: Vec<(Cursor, E, T)>, } -impl Connection { +impl Connection { /// Create a connection object. pub fn new( total_count: Option, diff --git a/src/types/connection/mod.rs b/src/types/connection/mod.rs index e1f50c5e..f75ba15d 100644 --- a/src/types/connection/mod.rs +++ b/src/types/connection/mod.rs @@ -10,7 +10,6 @@ use crate::{Context, FieldResult, ObjectType}; pub use connection_type::Connection; pub use cursor::Cursor; pub use page_info::PageInfo; -pub use stream::StreamDataSource; /// Connection query operation #[derive(Debug, Clone)] @@ -207,14 +206,14 @@ struct Pagination { /// } /// ``` #[async_trait::async_trait] -pub trait DataSource: Sync + Send { +pub trait DataSource: Send { /// Record type type Element; /// Fields for Edge /// /// Is a type that implements `ObjectType` and can be defined by the procedure macro `#[Object]`. - type EdgeFieldsObj: ObjectType + Send + Sync; + type EdgeFieldsObj: ObjectType + Send; /// Execute the query. async fn query( diff --git a/src/types/connection/stream.rs b/src/types/connection/stream.rs index a99d4558..7e42ee07 100644 --- a/src/types/connection/stream.rs +++ b/src/types/connection/stream.rs @@ -1,10 +1,20 @@ -use crate::types::connection::{EmptyEdgeFields, QueryOperation}; +use crate::types::connection::QueryOperation; use crate::{Connection, Context, Cursor, DataSource, FieldResult, ObjectType}; -use futures::{Stream, StreamExt}; +use futures::{stream::BoxStream, StreamExt}; use std::collections::VecDeque; -use std::pin::Pin; -/// Utility struct to convert a Stream to a Connection +struct State +where + T: Send, + E: ObjectType + Send, +{ + has_seen_before: bool, + has_prev_page: bool, + has_next_page: bool, + edges: VecDeque<(Cursor, E, T)>, +} + +/// You can use a Pin>> as a datasource /// /// # Examples /// @@ -23,14 +33,14 @@ use std::pin::Pin; /// first: Option, /// last: Option /// ) -> FieldResult> { -/// let edges_stream = futures::stream::iter(vec!["a", "b", "c", "d", "e", "f"]) +/// let mut edges_stream = futures::stream::iter(vec!["a", "b", "c", "d", "e", "f"]) /// .map(|node| { /// let cursor: Cursor = node.to_owned().into(); /// (cursor, EmptyEdgeFields, node) -/// }); +/// }) +/// .boxed(); /// -/// let mut source: StreamDataSource<_> = edges_stream.into(); -/// source.query(ctx, after, before, first, last).await +/// edges_stream.query(ctx, after, before, first, last).await /// } /// } /// @@ -38,76 +48,44 @@ use std::pin::Pin; /// async fn main() { /// let schema = Schema::new(QueryRoot, EmptyMutation, EmptySubscription); /// -/// assert_eq!(schema.execute("{ streamConnection(first: 2) { edges { node } } }").await.unwrap().data, serde_json::json!({ -/// "streamConnection": { -/// "edges": [ -/// { "node": "a" }, -/// { "node": "b" } -/// ] -/// }, -/// })); +/// assert_eq!( +/// schema +/// .execute("{ streamConnection(first: 2) { edges { node } } }") +/// .await +/// .unwrap() +/// .data, +/// serde_json::json!({ +/// "streamConnection": { +/// "edges": [ +/// { "node": "a" }, +/// { "node": "b" } +/// ] +/// }, +/// }) +/// ); /// -/// assert_eq!(schema.execute("{ streamConnection(last: 2) { edges { node } } }").await.unwrap().data, serde_json::json!({ -/// "streamConnection": { -/// "edges": [ -/// { "node": "e" }, -/// { "node": "f" } -/// ] -/// }, -/// })); +/// assert_eq!( +/// schema +/// .execute("{ streamConnection(last: 2) { edges { node } } }") +/// .await +/// .unwrap() +/// .data, +/// serde_json::json!({ +/// "streamConnection": { +/// "edges": [ +/// { "node": "e" }, +/// { "node": "f" } +/// ] +/// }, +/// }) +/// ); /// } /// ``` -pub struct StreamDataSource<'a, T, E = EmptyEdgeFields> -where - T: Send + Sync + 'a, - E: ObjectType + Send + Sync + 'a, -{ - stream: Pin + Send + Sync + 'a>>, -} - -impl<'a, T, E> StreamDataSource<'a, T, E> -where - T: Send + Sync + 'a, - E: ObjectType + Send + Sync + 'a, -{ - /// Creates a StreamDataSource from an `Iterator` of edges. - pub fn from_iter(iter: I) -> Self - where - I: Iterator + Send + Sync + 'a, - { - futures::stream::iter(iter).into() - } -} - -impl<'a, S: Stream, T, E> From for StreamDataSource<'a, T, E> -where - S: Stream + Send + Sync + 'a, - T: Send + Sync + 'a, - E: ObjectType + Send + Sync + 'a, -{ - fn from(stream: S) -> Self { - StreamDataSource { - stream: Box::pin(stream), - } - } -} - -struct State -where - T: Send + Sync, - E: ObjectType + Send + Sync, -{ - has_seen_before: bool, - has_prev_page: bool, - has_next_page: bool, - edges: VecDeque<(Cursor, E, T)>, -} - #[async_trait::async_trait] -impl<'a, T, E> DataSource for StreamDataSource<'a, T, E> +impl<'a, T, E> DataSource for BoxStream<'a, (Cursor, E, T)> where - T: Send + Sync + 'a, - E: ObjectType + Send + Sync + 'a, + T: Send + 'a, + E: ObjectType + Send + 'a, { type Element = T; type EdgeFieldsObj = E; @@ -117,7 +95,7 @@ where _ctx: &Context<'_>, operation: &QueryOperation, ) -> FieldResult> { - let stream = std::mem::replace(&mut self.stream, Box::pin(futures::stream::empty())); + let stream = std::mem::replace(self, futures::stream::empty().boxed()); let state = State:: { has_seen_before: false, diff --git a/src/types/mod.rs b/src/types/mod.rs index c2af34fd..6be58540 100644 --- a/src/types/mod.rs +++ b/src/types/mod.rs @@ -9,9 +9,7 @@ mod query_root; mod streamed; mod upload; -pub use connection::{ - Connection, Cursor, DataSource, EmptyEdgeFields, PageInfo, QueryOperation, StreamDataSource, -}; +pub use connection::{Connection, Cursor, DataSource, EmptyEdgeFields, PageInfo, QueryOperation}; pub use deferred::Deferred; pub use empty_mutation::EmptyMutation; pub use empty_subscription::EmptySubscription; diff --git a/tests/connection.rs b/tests/connection.rs index e9e2ae14..b7e15fe9 100644 --- a/tests/connection.rs +++ b/tests/connection.rs @@ -15,7 +15,7 @@ pub mod stream { first: Option, last: Option, ) -> FieldResult> { - let mut source: StreamDataSource<_> = futures::stream::iter(vec![ + futures::stream::iter(vec![ "a".to_owned(), "b".to_owned(), "c".to_owned(), @@ -27,9 +27,9 @@ pub mod stream { let cursor: Cursor = node.clone().into(); (cursor, EmptyEdgeFields, node) }) - .into(); - - source.query(ctx, after, before, first, last).await + .boxed() + .query(ctx, after, before, first, last) + .await } }