Replace futures with futures_util

This commit is contained in:
Koxiaet 2020-10-16 07:49:22 +01:00
parent 1b38ec46e0
commit db312a372d
26 changed files with 95 additions and 81 deletions

View File

@ -14,10 +14,23 @@ categories = ["network-programming", "asynchronous"]
readme = "README.md" readme = "README.md"
[features] [features]
default = ["apollo_tracing", "apollo_persisted_queries", "uuid", "bson", "chrono", "chrono-tz", "log", "multipart", "tracing", "url", "unblock", "string_number"] default = [
"apollo_persisted_queries",
"apollo_tracing",
"bson",
"chrono",
"chrono-tz",
"log",
"multipart",
"string_number",
"tracing",
"unblock",
"url",
"uuid",
]
apollo_tracing = ["chrono"] apollo_tracing = ["chrono"]
apollo_persisted_queries = ["lru"] apollo_persisted_queries = ["async-mutex", "lru"]
multipart = ["multer", "bytes", "tempfile"] multipart = ["bytes", "multer", "tempfile"]
unblock = ["blocking"] unblock = ["blocking"]
string_number = ["num-traits"] string_number = ["num-traits"]
# Used for doc(cfg()) # Used for doc(cfg())
@ -31,7 +44,7 @@ async-graphql-parser = { path = "parser", version = "=2.0.3" }
async-stream = "0.3" async-stream = "0.3"
async-trait = "0.1.41" async-trait = "0.1.41"
fnv = "1.0.6" fnv = "1.0.6"
futures = "0.3.6" futures-util = { version = "0.3.6", default-features = false, features = ["io"] }
indexmap = "1.6.0" indexmap = "1.6.0"
once_cell = "1.3.1" once_cell = "1.3.1"
pin-project-lite = "0.1.10" pin-project-lite = "0.1.10"
@ -43,23 +56,26 @@ thiserror = "1.0.21"
static_assertions = "1.1.0" static_assertions = "1.1.0"
# Feature optional dependencies # Feature optional dependencies
uuid = { version = "0.8.1", optional = true, features = ["v4", "serde"] }
bson = { version = "1.0.0", optional = true } bson = { version = "1.0.0", optional = true }
chrono = { version = "0.4.15", optional = true } chrono = { version = "0.4.15", optional = true }
chrono-tz = { version = "0.5.1", optional = true } chrono-tz = { version = "0.5.1", optional = true }
log = { version = "0.4.11", optional = true } log = { version = "0.4.11", optional = true }
tracing = { version = "0.1.21", optional = true } tracing = { version = "0.1.21", optional = true }
url = { version = "2.1.1", optional = true } url = { version = "2.1.1", optional = true }
num-traits = { version = "0.2.12", optional = true } uuid = { version = "0.8.1", optional = true, features = ["v4", "serde"] }
lru = { version = "0.6.0", optional = true }
bytes = { version = "0.5.4", optional = true } # Non-feature optional dependencies
multer = { version = "1.2.2", optional = true } async-mutex = { version = "1.4.0", optional = true }
tempfile = { version = "3.1.0", optional = true }
blocking = { version = "1.0.0", optional = true } blocking = { version = "1.0.0", optional = true }
bytes = { version = "0.5.4", optional = true }
lru = { version = "0.6.0", optional = true }
multer = { version = "1.2.2", optional = true }
num-traits = { version = "0.2.12", optional = true }
tempfile = { version = "3.1.0", optional = true }
[dev-dependencies] [dev-dependencies]
async-std = { version = "1.6.5", features = ["attributes"] } async-std = { version = "1.6.5", features = ["attributes"] }
async-channel = "1.5.1"
[package.metadata.docs.rs] [package.metadata.docs.rs]
features = ["nightly"] features = ["nightly"]

View File

@ -81,7 +81,7 @@ pub fn generate(object_args: &args::MergedSubscription) -> GeneratorResult<Token
fn create_field_stream<'__life>( fn create_field_stream<'__life>(
&'__life self, &'__life self,
ctx: &'__life #crate_name::Context<'__life> ctx: &'__life #crate_name::Context<'__life>
) -> Option<::std::pin::Pin<::std::boxed::Box<dyn #crate_name::futures::Stream<Item = #crate_name::ServerResult<#crate_name::Value>> + ::std::marker::Send + '__life>>> { ) -> Option<::std::pin::Pin<::std::boxed::Box<dyn #crate_name::futures_util::stream::Stream<Item = #crate_name::ServerResult<#crate_name::Value>> + ::std::marker::Send + '__life>>> {
None #create_field_stream None #create_field_stream
} }
} }

View File

@ -247,7 +247,7 @@ pub fn generate(
#(#schema_args)* #(#schema_args)*
args args
}, },
ty: <<#stream_ty as #crate_name::futures::stream::Stream>::Item as #crate_name::Type>::create_type_info(registry), ty: <<#stream_ty as #crate_name::futures_util::stream::Stream>::Item as #crate_name::Type>::create_type_info(registry),
deprecation: #field_deprecation, deprecation: #field_deprecation,
cache_control: Default::default(), cache_control: Default::default(),
external: false, external: false,
@ -281,7 +281,7 @@ pub fn generate(
let pos = ctx.item.pos; let pos = ctx.item.pos;
let schema_env = ctx.schema_env.clone(); let schema_env = ctx.schema_env.clone();
let query_env = ctx.query_env.clone(); let query_env = ctx.query_env.clone();
let stream = #crate_name::futures::StreamExt::then(#create_field_stream, { let stream = #crate_name::futures_util::stream::StreamExt::then(#create_field_stream, {
let field_name = field_name.clone(); let field_name = field_name.clone();
move |msg| { move |msg| {
let schema_env = schema_env.clone(); let schema_env = schema_env.clone();
@ -316,7 +316,7 @@ pub fn generate(
resolve_id, resolve_id,
path_node: ctx_selection_set.path_node.as_ref().unwrap(), path_node: ctx_selection_set.path_node.as_ref().unwrap(),
parent_type: #gql_typename, parent_type: #gql_typename,
return_type: &<<#stream_ty as #crate_name::futures::stream::Stream>::Item as #crate_name::Type>::qualified_type_name(), return_type: &<<#stream_ty as #crate_name::futures_util::stream::Stream>::Item as #crate_name::Type>::qualified_type_name(),
}; };
query_env.extensions.resolve_start(&ctx_extension, &ri); query_env.extensions.resolve_start(&ctx_extension, &ri);
@ -330,17 +330,17 @@ pub fn generate(
} }
} }
}); });
#crate_name::ServerResult::Ok(#crate_name::futures::StreamExt::scan( #crate_name::ServerResult::Ok(#crate_name::futures_util::stream::StreamExt::scan(
stream, stream,
false, false,
|errored, item| { |errored, item| {
if *errored { if *errored {
return #crate_name::futures::future::ready(None); return #crate_name::futures_util::future::ready(None);
} }
if item.is_err() { if item.is_err() {
*errored = true; *errored = true;
} }
#crate_name::futures::future::ready(Some(item)) #crate_name::futures_util::future::ready(Some(item))
}, },
)) ))
}; };
@ -349,8 +349,8 @@ pub fn generate(
#(#cfg_attrs)* #(#cfg_attrs)*
if ctx.item.node.name.node == #field_name { if ctx.item.node.name.node == #field_name {
return ::std::option::Option::Some(::std::boxed::Box::pin( return ::std::option::Option::Some(::std::boxed::Box::pin(
#crate_name::futures::TryStreamExt::try_flatten( #crate_name::futures_util::stream::TryStreamExt::try_flatten(
#crate_name::futures::stream::once((move || async move { #stream_fn })()) #crate_name::futures_util::stream::once((move || async move { #stream_fn })())
) )
)); ));
} }
@ -392,7 +392,7 @@ pub fn generate(
fn create_field_stream<'__life>( fn create_field_stream<'__life>(
&'__life self, &'__life self,
ctx: &'__life #crate_name::Context<'__life>, ctx: &'__life #crate_name::Context<'__life>,
) -> ::std::option::Option<::std::pin::Pin<::std::boxed::Box<dyn #crate_name::futures::Stream<Item = #crate_name::ServerResult<#crate_name::Value>> + Send + '__life>>> { ) -> ::std::option::Option<::std::pin::Pin<::std::boxed::Box<dyn #crate_name::futures_util::stream::Stream<Item = #crate_name::ServerResult<#crate_name::Value>> + Send + '__life>>> {
#(#create_stream)* #(#create_stream)*
None None
} }

View File

@ -11,7 +11,7 @@ use warp::{Filter, Rejection, Reply};
/// use async_graphql::*; /// use async_graphql::*;
/// use async_graphql_warp::*; /// use async_graphql_warp::*;
/// use warp::Filter; /// use warp::Filter;
/// use futures::{Stream, StreamExt}; /// use futures_util::stream::{Stream, StreamExt};
/// use std::time::Duration; /// use std::time::Duration;
/// ///
/// struct QueryRoot; /// struct QueryRoot;

View File

@ -299,7 +299,7 @@ impl Display for ResolveId {
/// **This type is not stable and should not be used directly.** /// **This type is not stable and should not be used directly.**
#[derive(Clone)] #[derive(Clone)]
pub struct ContextBase<'a, T> { pub struct ContextBase<'a, T> {
#[allow(missing_docs)] /// The current path node being resolved.
pub path_node: Option<QueryPathNode<'a>>, pub path_node: Option<QueryPathNode<'a>>,
pub(crate) resolve_id: ResolveId, pub(crate) resolve_id: ResolveId,
pub(crate) inc_resolve_id: &'a AtomicUsize, pub(crate) inc_resolve_id: &'a AtomicUsize,

View File

@ -2,7 +2,7 @@
use std::sync::Arc; use std::sync::Arc;
use futures::lock::Mutex; use async_mutex::Mutex;
use serde::Deserialize; use serde::Deserialize;
use crate::extensions::{Extension, ExtensionContext, ExtensionFactory}; use crate::extensions::{Extension, ExtensionContext, ExtensionFactory};

View File

@ -6,8 +6,7 @@ mod multipart;
mod playground_source; mod playground_source;
mod websocket; mod websocket;
use futures::io::AsyncRead; use futures_util::io::{AsyncRead, AsyncReadExt};
use futures::AsyncReadExt;
use crate::{BatchRequest, ParseRequestError, Request}; use crate::{BatchRequest, ParseRequestError, Request};
@ -59,7 +58,7 @@ pub async fn receive_json(body: impl AsyncRead) -> Result<Request, ParseRequestE
/// Receive a GraphQL batch request from a body as JSON. /// Receive a GraphQL batch request from a body as JSON.
pub async fn receive_batch_json(body: impl AsyncRead) -> Result<BatchRequest, ParseRequestError> { pub async fn receive_batch_json(body: impl AsyncRead) -> Result<BatchRequest, ParseRequestError> {
let mut data = Vec::new(); let mut data = Vec::new();
futures::pin_mut!(body); futures_util::pin_mut!(body);
body.read_to_end(&mut data) body.read_to_end(&mut data)
.await .await
.map_err(ParseRequestError::Io)?; .map_err(ParseRequestError::Io)?;

View File

@ -4,8 +4,8 @@ use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use bytes::Bytes; use bytes::Bytes;
use futures::io::AsyncRead; use futures_util::io::AsyncRead;
use futures::stream::Stream; use futures_util::stream::Stream;
use multer::{Constraints, Multipart, SizeLimit}; use multer::{Constraints, Multipart, SizeLimit};
use pin_project_lite::pin_project; use pin_project_lite::pin_project;
@ -161,7 +161,7 @@ impl<T: AsyncRead> Stream for ReaderStream<T> {
let this = self.project(); let this = self.project();
Poll::Ready( Poll::Ready(
match futures::ready!(this.reader.poll_read(cx, this.buf)?) { match futures_util::ready!(this.reader.poll_read(cx, this.buf)?) {
0 => None, 0 => None,
size => Some(Ok(Bytes::copy_from_slice(&this.buf[..size]))), size => Some(Ok(Bytes::copy_from_slice(&this.buf[..size]))),
}, },

View File

@ -5,7 +5,7 @@ use std::pin::Pin;
use std::sync::Arc; use std::sync::Arc;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use futures::Stream; use futures_util::stream::Stream;
use pin_project_lite::pin_project; use pin_project_lite::pin_project;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};

View File

@ -147,7 +147,7 @@ pub use async_trait;
#[doc(hidden)] #[doc(hidden)]
pub use context::ContextSelectionSet; pub use context::ContextSelectionSet;
#[doc(hidden)] #[doc(hidden)]
pub use futures; pub use futures_util;
#[doc(hidden)] #[doc(hidden)]
pub use indexmap; pub use indexmap;
#[doc(hidden)] #[doc(hidden)]
@ -712,7 +712,7 @@ pub use async_graphql_derive::Union;
/// ///
/// ```rust /// ```rust
/// use async_graphql::*; /// use async_graphql::*;
/// use futures::{Stream, StreamExt}; /// use futures_util::stream::{Stream, StreamExt};
/// ///
/// struct SubscriptionRoot; /// struct SubscriptionRoot;
/// ///
@ -720,7 +720,7 @@ pub use async_graphql_derive::Union;
/// impl SubscriptionRoot { /// impl SubscriptionRoot {
/// async fn value(&self, condition: i32) -> impl Stream<Item = i32> { /// async fn value(&self, condition: i32) -> impl Stream<Item = i32> {
/// // Returns the number from 0 to `condition`. /// // Returns the number from 0 to `condition`.
/// futures::stream::iter(0..condition) /// futures_util::stream::iter(0..condition)
/// } /// }
/// } /// }
/// ``` /// ```
@ -789,7 +789,7 @@ pub use async_graphql_derive::MergedObject;
/// ///
/// ```rust /// ```rust
/// use async_graphql::*; /// use async_graphql::*;
/// use futures::Stream; /// use futures_util::stream::Stream;
/// ///
/// #[derive(Default)] /// #[derive(Default)]
/// struct Subscription1; /// struct Subscription1;
@ -797,7 +797,7 @@ pub use async_graphql_derive::MergedObject;
/// #[Subscription] /// #[Subscription]
/// impl Subscription1 { /// impl Subscription1 {
/// async fn events1(&self) -> impl Stream<Item = i32> { /// async fn events1(&self) -> impl Stream<Item = i32> {
/// futures::stream::iter(0..10) /// futures_util::stream::iter(0..10)
/// } /// }
/// } /// }
/// ///
@ -807,7 +807,7 @@ pub use async_graphql_derive::MergedObject;
/// #[Subscription] /// #[Subscription]
/// impl Subscription2 { /// impl Subscription2 {
/// async fn events2(&self) -> impl Stream<Item = i32> { /// async fn events2(&self) -> impl Stream<Item = i32> {
/// futures::stream::iter(10..20) /// futures_util::stream::iter(10..20)
/// } /// }
/// } /// }
/// ///

View File

@ -82,7 +82,7 @@ async fn resolve_container_inner<'a, T: ContainerType + Send + Sync>(
fields.add_set(ctx, root)?; fields.add_set(ctx, root)?;
let res = if parallel { let res = if parallel {
futures::future::try_join_all(fields.0).await? futures_util::future::try_join_all(fields.0).await?
} else { } else {
let mut results = Vec::with_capacity(fields.0.len()); let mut results = Vec::with_capacity(fields.0.len());
for field in fields.0 { for field in fields.0 {

View File

@ -54,5 +54,5 @@ pub async fn resolve_list<'a, T: OutputValueType + Send + Sync + 'a>(
}); });
} }
Ok(Value::List(futures::future::try_join_all(futures).await?)) Ok(Value::List(futures_util::future::try_join_all(futures).await?))
} }

View File

@ -4,7 +4,7 @@ use std::ops::Deref;
use std::sync::atomic::AtomicUsize; use std::sync::atomic::AtomicUsize;
use std::sync::Arc; use std::sync::Arc;
use futures::stream::{self, Stream, StreamExt}; use futures_util::stream::{self, Stream, StreamExt};
use indexmap::map::IndexMap; use indexmap::map::IndexMap;
use crate::context::{Data, QueryEnvInner, ResolveId}; use crate::context::{Data, QueryEnvInner, ResolveId};
@ -487,7 +487,7 @@ where
match batch_request { match batch_request {
BatchRequest::Single(request) => BatchResponse::Single(self.execute(request).await), BatchRequest::Single(request) => BatchResponse::Single(self.execute(request).await),
BatchRequest::Batch(requests) => BatchResponse::Batch( BatchRequest::Batch(requests) => BatchResponse::Batch(
futures::stream::iter(requests.into_iter()) futures_util::stream::iter(requests.into_iter())
.then(|request| self.execute(request)) .then(|request| self.execute(request))
.collect() .collect()
.await, .await,

View File

@ -1,6 +1,6 @@
use std::pin::Pin; use std::pin::Pin;
use futures::{Stream, StreamExt}; use futures_util::stream::{Stream, StreamExt};
use crate::parser::types::{Selection, TypeCondition}; use crate::parser::types::{Selection, TypeCondition};
use crate::{ use crate::{

View File

@ -1,6 +1,6 @@
use std::borrow::Cow; use std::borrow::Cow;
use futures::{Stream, StreamExt, TryStreamExt}; use futures_util::stream::{Stream, StreamExt, TryStreamExt};
use indexmap::map::IndexMap; use indexmap::map::IndexMap;
use crate::connection::edge::Edge; use crate::connection::edge::Edge;

View File

@ -1,7 +1,7 @@
use std::borrow::Cow; use std::borrow::Cow;
use std::pin::Pin; use std::pin::Pin;
use futures::{stream, Stream}; use futures_util::stream::{self, Stream};
use crate::{registry, Context, ServerError, ServerResult, SubscriptionType, Type, Value}; use crate::{registry, Context, ServerError, ServerResult, SubscriptionType, Type, Value};

View File

@ -2,7 +2,7 @@ use std::borrow::Cow;
use std::fs::File; use std::fs::File;
use std::io::Read; use std::io::Read;
use futures::AsyncRead; use futures_util::io::AsyncRead;
use crate::{registry, Context, InputValueError, InputValueResult, InputValueType, Type, Value}; use crate::{registry, Context, InputValueError, InputValueResult, InputValueType, Type, Value};

View File

@ -46,7 +46,7 @@ where
for (i, s) in selected.iter().enumerate() { for (i, s) in selected.iter().enumerate() {
if i != 0 { if i != 0 {
suggestion.push_str(" , "); suggestion.push_str(", ");
} }
write!(suggestion, "\"{}\"", s).unwrap(); write!(suggestion, "\"{}\"", s).unwrap();
} }

View File

@ -1,7 +1,7 @@
#![allow(dead_code)] #![allow(dead_code)]
use async_graphql::*; use async_graphql::*;
use futures::{Stream, StreamExt}; use futures_util::stream::{Stream, StreamExt};
#[async_std::test] #[async_std::test]
pub async fn test_field_features() { pub async fn test_field_features() {
@ -19,19 +19,19 @@ pub async fn test_field_features() {
#[Subscription] #[Subscription]
impl SubscriptionRoot { impl SubscriptionRoot {
async fn values(&self) -> impl Stream<Item = i32> { async fn values(&self) -> impl Stream<Item = i32> {
futures::stream::once(async move { 10 }) futures_util::stream::once(async move { 10 })
} }
#[cfg(feature = "bson")] #[cfg(feature = "bson")]
async fn values_bson(&self) -> impl Stream<Item = i32> { async fn values_bson(&self) -> impl Stream<Item = i32> {
futures::stream::once(async move { 10 }) futures_util::stream::once(async move { 10 })
} }
#[cfg(feature = "abc")] #[cfg(feature = "abc")]
async fn values_abc( async fn values_abc(
&self, &self,
) -> Pin<Box<dyn async_graphql::futures::Stream<Item = i32> + Send + 'static>> { ) -> Pin<Box<dyn Stream<Item = i32> + Send + 'static>> {
Box::pin(futures::stream::once(async move { 10 })) Box::pin(futures_util::stream::once(async move { 10 }))
} }
} }

View File

@ -1,6 +1,6 @@
use async_graphql::guard::Guard; use async_graphql::guard::Guard;
use async_graphql::*; use async_graphql::*;
use futures::{Stream, StreamExt}; use futures_util::stream::{Stream, StreamExt};
#[derive(Eq, PartialEq, Copy, Clone)] #[derive(Eq, PartialEq, Copy, Clone)]
enum Role { enum Role {
@ -71,7 +71,7 @@ pub async fn test_guard_simple_rule() {
impl Subscription { impl Subscription {
#[graphql(guard(RoleGuard(role = "Role::Admin")))] #[graphql(guard(RoleGuard(role = "Role::Admin")))]
async fn values(&self) -> impl Stream<Item = i32> { async fn values(&self) -> impl Stream<Item = i32> {
futures::stream::iter(vec![1, 2, 3]) futures_util::stream::iter(vec![1, 2, 3])
} }
} }

View File

@ -1,5 +1,5 @@
use async_graphql::*; use async_graphql::*;
use futures::{Stream, StreamExt}; use futures_util::stream::{Stream, StreamExt};
#[derive(SimpleObject)] #[derive(SimpleObject)]
struct Object1 { struct Object1 {
@ -165,7 +165,7 @@ pub async fn test_merged_subscription() {
#[Subscription] #[Subscription]
impl Subscription1 { impl Subscription1 {
async fn events1(&self) -> impl Stream<Item = i32> { async fn events1(&self) -> impl Stream<Item = i32> {
futures::stream::iter(0..10) futures_util::stream::iter(0..10)
} }
} }
@ -175,7 +175,7 @@ pub async fn test_merged_subscription() {
#[Subscription] #[Subscription]
impl Subscription2 { impl Subscription2 {
async fn events2(&self) -> impl Stream<Item = i32> { async fn events2(&self) -> impl Stream<Item = i32> {
futures::stream::iter(10..20) futures_util::stream::iter(10..20)
} }
} }

View File

@ -1,5 +1,5 @@
use async_graphql::*; use async_graphql::*;
use futures::lock::Mutex; use async_std::sync::Mutex;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;

View File

@ -1,5 +1,5 @@
use async_graphql::*; use async_graphql::*;
use futures::{Stream, StreamExt, TryStreamExt}; use futures_util::stream::{Stream, StreamExt, TryStreamExt};
#[async_std::test] #[async_std::test]
pub async fn test_input_value_custom_error() { pub async fn test_input_value_custom_error() {
@ -41,7 +41,7 @@ pub async fn test_input_value_custom_error() {
#[Subscription] #[Subscription]
impl SubscriptionRoot { impl SubscriptionRoot {
async fn r#type(&self) -> impl Stream<Item = i32> { async fn r#type(&self) -> impl Stream<Item = i32> {
futures::stream::iter(0..10) futures_util::stream::iter(0..10)
} }
} }

View File

@ -1,5 +1,5 @@
use async_graphql::*; use async_graphql::*;
use futures::{Stream, StreamExt}; use futures_util::stream::{Stream, StreamExt};
#[async_std::test] #[async_std::test]
pub async fn test_enum() { pub async fn test_enum() {
@ -114,7 +114,7 @@ pub async fn test_subscription() {
#[allow(non_snake_case)] #[allow(non_snake_case)]
impl Subscription { impl Subscription {
async fn create_object(&self, ObjectId: i32) -> impl Stream<Item = i32> { async fn create_object(&self, ObjectId: i32) -> impl Stream<Item = i32> {
futures::stream::once(async move { ObjectId }) futures_util::stream::once(async move { ObjectId })
} }
} }

View File

@ -1,5 +1,5 @@
use async_graphql::*; use async_graphql::*;
use futures::{Stream, StreamExt, TryStreamExt}; use futures_util::stream::{Stream, StreamExt, TryStreamExt};
#[async_std::test] #[async_std::test]
pub async fn test_subscription() { pub async fn test_subscription() {
@ -19,11 +19,11 @@ pub async fn test_subscription() {
#[Subscription] #[Subscription]
impl SubscriptionRoot { impl SubscriptionRoot {
async fn values(&self, start: i32, end: i32) -> impl Stream<Item = i32> { async fn values(&self, start: i32, end: i32) -> impl Stream<Item = i32> {
futures::stream::iter(start..end) futures_util::stream::iter(start..end)
} }
async fn events(&self, start: i32, end: i32) -> impl Stream<Item = Event> { async fn events(&self, start: i32, end: i32) -> impl Stream<Item = Event> {
futures::stream::iter((start..end).map(|n| Event { a: n, b: n * 10 })) futures_util::stream::iter((start..end).map(|n| Event { a: n, b: n * 10 }))
} }
} }
@ -77,11 +77,11 @@ pub async fn test_subscription_with_ctx_data() {
impl SubscriptionRoot { impl SubscriptionRoot {
async fn values(&self, ctx: &Context<'_>) -> impl Stream<Item = i32> { async fn values(&self, ctx: &Context<'_>) -> impl Stream<Item = i32> {
let value = *ctx.data_unchecked::<i32>(); let value = *ctx.data_unchecked::<i32>();
futures::stream::once(async move { value }) futures_util::stream::once(async move { value })
} }
async fn objects(&self) -> impl Stream<Item = MyObject> { async fn objects(&self) -> impl Stream<Item = MyObject> {
futures::stream::once(async move { MyObject }) futures_util::stream::once(async move { MyObject })
} }
} }
@ -118,7 +118,7 @@ pub async fn test_subscription_with_token() {
if ctx.data_unchecked::<Token>().0 != "123456" { if ctx.data_unchecked::<Token>().0 != "123456" {
return Err("forbidden".into()); return Err("forbidden".into());
} }
Ok(futures::stream::once(async move { 100 })) Ok(futures_util::stream::once(async move { 100 }))
} }
} }
@ -166,7 +166,7 @@ pub async fn test_subscription_inline_fragment() {
#[Subscription] #[Subscription]
impl SubscriptionRoot { impl SubscriptionRoot {
async fn events(&self, start: i32, end: i32) -> impl Stream<Item = Event> { async fn events(&self, start: i32, end: i32) -> impl Stream<Item = Event> {
futures::stream::iter((start..end).map(|n| Event { a: n, b: n * 10 })) futures_util::stream::iter((start..end).map(|n| Event { a: n, b: n * 10 }))
} }
} }
@ -219,7 +219,7 @@ pub async fn test_subscription_fragment() {
#[Subscription] #[Subscription]
impl SubscriptionRoot { impl SubscriptionRoot {
async fn events(&self, start: i32, end: i32) -> impl Stream<Item = Event> { async fn events(&self, start: i32, end: i32) -> impl Stream<Item = Event> {
futures::stream::iter((start..end).map(|n| Event { a: n, b: n * 10 })) futures_util::stream::iter((start..end).map(|n| Event { a: n, b: n * 10 }))
} }
} }
@ -274,7 +274,7 @@ pub async fn test_subscription_fragment2() {
#[Subscription] #[Subscription]
impl SubscriptionRoot { impl SubscriptionRoot {
async fn events(&self, start: i32, end: i32) -> impl Stream<Item = Event> { async fn events(&self, start: i32, end: i32) -> impl Stream<Item = Event> {
futures::stream::iter((start..end).map(|n| Event { a: n, b: n * 10 })) futures_util::stream::iter((start..end).map(|n| Event { a: n, b: n * 10 }))
} }
} }
@ -333,7 +333,7 @@ pub async fn test_subscription_error() {
#[Subscription] #[Subscription]
impl SubscriptionRoot { impl SubscriptionRoot {
async fn events(&self) -> impl Stream<Item = Event> { async fn events(&self) -> impl Stream<Item = Event> {
futures::stream::iter((0..10).map(|n| Event { value: n })) futures_util::stream::iter((0..10).map(|n| Event { value: n }))
} }
} }
@ -380,9 +380,9 @@ pub async fn test_subscription_fieldresult() {
#[Subscription] #[Subscription]
impl SubscriptionRoot { impl SubscriptionRoot {
async fn values(&self) -> impl Stream<Item = Result<i32>> { async fn values(&self) -> impl Stream<Item = Result<i32>> {
futures::stream::iter(0..5) futures_util::stream::iter(0..5)
.map(Result::Ok) .map(Result::Ok)
.chain(futures::stream::once( .chain(futures_util::stream::once(
async move { Err("StreamErr".into()) }, async move { Err("StreamErr".into()) },
)) ))
} }

View File

@ -1,6 +1,5 @@
use async_graphql::*; use async_graphql::*;
use futures::channel::mpsc; use futures_util::stream::{Stream, StreamExt};
use futures::{SinkExt, Stream, StreamExt};
#[async_std::test] #[async_std::test]
pub async fn test_subscription_ws_transport() { pub async fn test_subscription_ws_transport() {
@ -14,12 +13,12 @@ pub async fn test_subscription_ws_transport() {
#[Subscription] #[Subscription]
impl SubscriptionRoot { impl SubscriptionRoot {
async fn values(&self) -> impl Stream<Item = i32> { async fn values(&self) -> impl Stream<Item = i32> {
futures::stream::iter(0..10) futures_util::stream::iter(0..10)
} }
} }
let schema = Schema::new(QueryRoot, EmptyMutation, SubscriptionRoot); let schema = Schema::new(QueryRoot, EmptyMutation, SubscriptionRoot);
let (mut tx, rx) = mpsc::unbounded(); let (tx, rx) = async_channel::unbounded();
let mut stream = http::WebSocket::new(schema, rx); let mut stream = http::WebSocket::new(schema, rx);
tx.send( tx.send(
@ -88,12 +87,12 @@ pub async fn test_subscription_ws_transport_with_token() {
if ctx.data_unchecked::<Token>().0 != "123456" { if ctx.data_unchecked::<Token>().0 != "123456" {
return Err("forbidden".into()); return Err("forbidden".into());
} }
Ok(futures::stream::iter(0..10)) Ok(futures_util::stream::iter(0..10))
} }
} }
let schema = Schema::new(QueryRoot, EmptyMutation, SubscriptionRoot); let schema = Schema::new(QueryRoot, EmptyMutation, SubscriptionRoot);
let (mut tx, rx) = mpsc::unbounded(); let (tx, rx) = async_channel::unbounded();
let mut stream = http::WebSocket::with_data( let mut stream = http::WebSocket::with_data(
schema, schema,
rx, rx,
@ -187,12 +186,12 @@ pub async fn test_subscription_ws_transport_error() {
#[Subscription] #[Subscription]
impl SubscriptionRoot { impl SubscriptionRoot {
async fn events(&self) -> impl Stream<Item = Event> { async fn events(&self) -> impl Stream<Item = Event> {
futures::stream::iter((0..10).map(|n| Event { value: n })) futures_util::stream::iter((0..10).map(|n| Event { value: n }))
} }
} }
let schema = Schema::new(QueryRoot, EmptyMutation, SubscriptionRoot); let schema = Schema::new(QueryRoot, EmptyMutation, SubscriptionRoot);
let (mut tx, rx) = mpsc::unbounded(); let (tx, rx) = async_channel::unbounded();
let mut stream = http::WebSocket::new(schema, rx); let mut stream = http::WebSocket::new(schema, rx);
tx.send( tx.send(
@ -264,7 +263,7 @@ pub async fn test_query_over_websocket() {
} }
let schema = Schema::new(QueryRoot, EmptyMutation, EmptySubscription); let schema = Schema::new(QueryRoot, EmptyMutation, EmptySubscription);
let (mut tx, rx) = mpsc::unbounded(); let (tx, rx) = async_channel::unbounded();
let mut stream = http::WebSocket::new(schema, rx); let mut stream = http::WebSocket::new(schema, rx);
tx.send( tx.send(