Remove StreamDataSource mapping closure

This commit is contained in:
Samuel Hurel 2020-05-21 09:55:40 +02:00
parent cc5bfa8350
commit 2e3cea7b74
2 changed files with 35 additions and 22 deletions

View File

@ -1,23 +1,25 @@
use crate::types::connection::{EmptyEdgeFields, QueryOperation};
use crate::{Connection, Context, Cursor, DataSource, FieldResult};
use futures::{Future, FutureExt, Stream, StreamExt};
use crate::{Connection, Context, Cursor, DataSource, FieldResult, ObjectType};
use futures::{Stream, StreamExt};
use std::pin::Pin;
pub struct StreamDataSource<'a, T: Send + Sync + 'a> {
stream: Pin<Box<dyn Stream<Item = (Cursor, EmptyEdgeFields, T)> + Send + Sync + 'a>>,
pub struct StreamDataSource<'a, T, E = EmptyEdgeFields>
where
T: Send + Sync + 'a,
E: ObjectType + Send + Sync + 'a,
{
stream: Pin<Box<dyn Stream<Item = (Cursor, E, T)> + Send + Sync + 'a>>,
}
impl<'a, T: Send + Sync> StreamDataSource<'a, T> {
pub fn new<S, F, Fut>(stream: S, mut f: F) -> Self
impl<'a, T, E> StreamDataSource<'a, T, E>
where
T: Send + Sync + 'a,
E: ObjectType + Send + Sync + 'a,
{
pub fn new<S>(stream: S) -> Self
where
S: Stream<Item = T> + Send + Sync + 'a,
F: FnMut(&T) -> Fut + Send + Sync + 'a,
Fut: Future<Output = Cursor> + Send + Sync + 'a,
S: Stream<Item = (Cursor, E, T)> + Send + Sync + 'a,
{
let stream = stream.then(move |element: T| {
f(&element).map(move |cursor| (cursor, EmptyEdgeFields, element))
});
StreamDataSource {
stream: Box::pin(stream),
}
@ -25,17 +27,25 @@ impl<'a, T: Send + Sync> StreamDataSource<'a, T> {
}
use std::collections::VecDeque;
struct State<E: Send + Sync> {
struct State<T, E = EmptyEdgeFields>
where
T: Send + Sync,
E: ObjectType + Send + Sync,
{
has_seen_before: bool,
has_prev_page: bool,
has_next_page: bool,
edges: VecDeque<(Cursor, EmptyEdgeFields, E)>,
edges: VecDeque<(Cursor, E, T)>,
}
#[async_trait::async_trait]
impl<'a, T: Send + Sync + 'a> DataSource for StreamDataSource<'a, T> {
impl<'a, T, E> DataSource for StreamDataSource<'a, T, E>
where
T: Send + Sync + 'a,
E: ObjectType + Send + Sync + 'a,
{
type Element = T;
type EdgeFieldsObj = EmptyEdgeFields;
type EdgeFieldsObj = E;
async fn query_operation(
&mut self,
@ -44,7 +54,7 @@ impl<'a, T: Send + Sync + 'a> DataSource for StreamDataSource<'a, T> {
) -> FieldResult<Connection<Self::Element, Self::EdgeFieldsObj>> {
let stream = std::mem::replace(&mut self.stream, Box::pin(futures::stream::empty()));
let state = State::<Self::Element> {
let state = State::<Self::Element, Self::EdgeFieldsObj> {
has_seen_before: false,
has_prev_page: false,
has_next_page: false,

View File

@ -1,5 +1,6 @@
pub mod stream {
use async_graphql::*;
use futures::StreamExt;
use serde_json::json;
struct Root;
@ -21,11 +22,13 @@ pub mod stream {
"d".to_owned(),
"e".to_owned(),
"f".to_owned(),
]);
let mut source = StreamDataSource::new(stream, |s: &String| {
futures::future::ready(s.clone().into())
])
.map(|node| {
let cursor: Cursor = node.clone().into();
(cursor, EmptyEdgeFields, node)
});
let mut source = StreamDataSource::new(stream);
source.query(ctx, after, before, first, last).await
}
}