From db312a372db9f90cec0f33a1b62f16b510a98976 Mon Sep 17 00:00:00 2001 From: Koxiaet <38139193+Koxiaet@users.noreply.github.com> Date: Fri, 16 Oct 2020 07:49:22 +0100 Subject: [PATCH] Replace futures with futures_util --- Cargo.toml | 36 ++++++++++++++++------ derive/src/merged_subscription.rs | 2 +- derive/src/subscription.rs | 18 +++++------ integrations/warp/src/subscription.rs | 2 +- src/context.rs | 2 +- src/extensions/apollo_persisted_queries.rs | 2 +- src/http/mod.rs | 5 ++- src/http/multipart.rs | 6 ++-- src/http/websocket.rs | 2 +- src/lib.rs | 12 ++++---- src/resolver_utils/container.rs | 2 +- src/resolver_utils/list.rs | 2 +- src/schema.rs | 4 +-- src/subscription.rs | 2 +- src/types/connection/connection_type.rs | 2 +- src/types/empty_subscription.rs | 2 +- src/types/upload.rs | 2 +- src/validation/suggestion.rs | 2 +- tests/field_features.rs | 10 +++--- tests/guard.rs | 4 +-- tests/merged_object.rs | 6 ++-- tests/mutation.rs | 2 +- tests/raw_ident.rs | 4 +-- tests/rename.rs | 4 +-- tests/subscription.rs | 24 +++++++-------- tests/subscription_websocket.rs | 17 +++++----- 26 files changed, 95 insertions(+), 81 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index c461a5c2..50b749f8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,10 +14,23 @@ categories = ["network-programming", "asynchronous"] readme = "README.md" [features] -default = ["apollo_tracing", "apollo_persisted_queries", "uuid", "bson", "chrono", "chrono-tz", "log", "multipart", "tracing", "url", "unblock", "string_number"] +default = [ + "apollo_persisted_queries", + "apollo_tracing", + "bson", + "chrono", + "chrono-tz", + "log", + "multipart", + "string_number", + "tracing", + "unblock", + "url", + "uuid", +] apollo_tracing = ["chrono"] -apollo_persisted_queries = ["lru"] -multipart = ["multer", "bytes", "tempfile"] +apollo_persisted_queries = ["async-mutex", "lru"] +multipart = ["bytes", "multer", "tempfile"] unblock = ["blocking"] string_number = ["num-traits"] # Used for doc(cfg()) @@ -31,7 +44,7 @@ async-graphql-parser = { path = "parser", version = "=2.0.3" } async-stream = "0.3" async-trait = "0.1.41" fnv = "1.0.6" -futures = "0.3.6" +futures-util = { version = "0.3.6", default-features = false, features = ["io"] } indexmap = "1.6.0" once_cell = "1.3.1" pin-project-lite = "0.1.10" @@ -43,23 +56,26 @@ thiserror = "1.0.21" static_assertions = "1.1.0" # Feature optional dependencies -uuid = { version = "0.8.1", optional = true, features = ["v4", "serde"] } bson = { version = "1.0.0", optional = true } chrono = { version = "0.4.15", optional = true } chrono-tz = { version = "0.5.1", optional = true } log = { version = "0.4.11", optional = true } tracing = { version = "0.1.21", optional = true } url = { version = "2.1.1", optional = true } -num-traits = { version = "0.2.12", optional = true } -lru = { version = "0.6.0", optional = true } +uuid = { version = "0.8.1", optional = true, features = ["v4", "serde"] } -bytes = { version = "0.5.4", optional = true } -multer = { version = "1.2.2", optional = true } -tempfile = { version = "3.1.0", optional = true } +# Non-feature optional dependencies +async-mutex = { version = "1.4.0", optional = true } blocking = { version = "1.0.0", optional = true } +bytes = { version = "0.5.4", optional = true } +lru = { version = "0.6.0", optional = true } +multer = { version = "1.2.2", optional = true } +num-traits = { version = "0.2.12", optional = true } +tempfile = { version = "3.1.0", optional = true } [dev-dependencies] async-std = { version = "1.6.5", features = ["attributes"] } +async-channel = "1.5.1" [package.metadata.docs.rs] features = ["nightly"] diff --git a/derive/src/merged_subscription.rs b/derive/src/merged_subscription.rs index 3a12b0b9..af3d82f5 100644 --- a/derive/src/merged_subscription.rs +++ b/derive/src/merged_subscription.rs @@ -81,7 +81,7 @@ pub fn generate(object_args: &args::MergedSubscription) -> GeneratorResult( &'__life self, ctx: &'__life #crate_name::Context<'__life> - ) -> Option<::std::pin::Pin<::std::boxed::Box> + ::std::marker::Send + '__life>>> { + ) -> Option<::std::pin::Pin<::std::boxed::Box> + ::std::marker::Send + '__life>>> { None #create_field_stream } } diff --git a/derive/src/subscription.rs b/derive/src/subscription.rs index 5e4c7930..1d8feed0 100644 --- a/derive/src/subscription.rs +++ b/derive/src/subscription.rs @@ -247,7 +247,7 @@ pub fn generate( #(#schema_args)* args }, - ty: <<#stream_ty as #crate_name::futures::stream::Stream>::Item as #crate_name::Type>::create_type_info(registry), + ty: <<#stream_ty as #crate_name::futures_util::stream::Stream>::Item as #crate_name::Type>::create_type_info(registry), deprecation: #field_deprecation, cache_control: Default::default(), external: false, @@ -281,7 +281,7 @@ pub fn generate( let pos = ctx.item.pos; let schema_env = ctx.schema_env.clone(); let query_env = ctx.query_env.clone(); - let stream = #crate_name::futures::StreamExt::then(#create_field_stream, { + let stream = #crate_name::futures_util::stream::StreamExt::then(#create_field_stream, { let field_name = field_name.clone(); move |msg| { let schema_env = schema_env.clone(); @@ -316,7 +316,7 @@ pub fn generate( resolve_id, path_node: ctx_selection_set.path_node.as_ref().unwrap(), parent_type: #gql_typename, - return_type: &<<#stream_ty as #crate_name::futures::stream::Stream>::Item as #crate_name::Type>::qualified_type_name(), + return_type: &<<#stream_ty as #crate_name::futures_util::stream::Stream>::Item as #crate_name::Type>::qualified_type_name(), }; query_env.extensions.resolve_start(&ctx_extension, &ri); @@ -330,17 +330,17 @@ pub fn generate( } } }); - #crate_name::ServerResult::Ok(#crate_name::futures::StreamExt::scan( + #crate_name::ServerResult::Ok(#crate_name::futures_util::stream::StreamExt::scan( stream, false, |errored, item| { if *errored { - return #crate_name::futures::future::ready(None); + return #crate_name::futures_util::future::ready(None); } if item.is_err() { *errored = true; } - #crate_name::futures::future::ready(Some(item)) + #crate_name::futures_util::future::ready(Some(item)) }, )) }; @@ -349,8 +349,8 @@ pub fn generate( #(#cfg_attrs)* if ctx.item.node.name.node == #field_name { return ::std::option::Option::Some(::std::boxed::Box::pin( - #crate_name::futures::TryStreamExt::try_flatten( - #crate_name::futures::stream::once((move || async move { #stream_fn })()) + #crate_name::futures_util::stream::TryStreamExt::try_flatten( + #crate_name::futures_util::stream::once((move || async move { #stream_fn })()) ) )); } @@ -392,7 +392,7 @@ pub fn generate( fn create_field_stream<'__life>( &'__life self, ctx: &'__life #crate_name::Context<'__life>, - ) -> ::std::option::Option<::std::pin::Pin<::std::boxed::Box> + Send + '__life>>> { + ) -> ::std::option::Option<::std::pin::Pin<::std::boxed::Box> + Send + '__life>>> { #(#create_stream)* None } diff --git a/integrations/warp/src/subscription.rs b/integrations/warp/src/subscription.rs index 25b7e74c..92490dee 100644 --- a/integrations/warp/src/subscription.rs +++ b/integrations/warp/src/subscription.rs @@ -11,7 +11,7 @@ use warp::{Filter, Rejection, Reply}; /// use async_graphql::*; /// use async_graphql_warp::*; /// use warp::Filter; -/// use futures::{Stream, StreamExt}; +/// use futures_util::stream::{Stream, StreamExt}; /// use std::time::Duration; /// /// struct QueryRoot; diff --git a/src/context.rs b/src/context.rs index 016631ec..d513f6d8 100644 --- a/src/context.rs +++ b/src/context.rs @@ -299,7 +299,7 @@ impl Display for ResolveId { /// **This type is not stable and should not be used directly.** #[derive(Clone)] pub struct ContextBase<'a, T> { - #[allow(missing_docs)] + /// The current path node being resolved. pub path_node: Option>, pub(crate) resolve_id: ResolveId, pub(crate) inc_resolve_id: &'a AtomicUsize, diff --git a/src/extensions/apollo_persisted_queries.rs b/src/extensions/apollo_persisted_queries.rs index 4087934e..6148de33 100644 --- a/src/extensions/apollo_persisted_queries.rs +++ b/src/extensions/apollo_persisted_queries.rs @@ -2,7 +2,7 @@ use std::sync::Arc; -use futures::lock::Mutex; +use async_mutex::Mutex; use serde::Deserialize; use crate::extensions::{Extension, ExtensionContext, ExtensionFactory}; diff --git a/src/http/mod.rs b/src/http/mod.rs index 18857721..312a61c0 100644 --- a/src/http/mod.rs +++ b/src/http/mod.rs @@ -6,8 +6,7 @@ mod multipart; mod playground_source; mod websocket; -use futures::io::AsyncRead; -use futures::AsyncReadExt; +use futures_util::io::{AsyncRead, AsyncReadExt}; use crate::{BatchRequest, ParseRequestError, Request}; @@ -59,7 +58,7 @@ pub async fn receive_json(body: impl AsyncRead) -> Result Result { let mut data = Vec::new(); - futures::pin_mut!(body); + futures_util::pin_mut!(body); body.read_to_end(&mut data) .await .map_err(ParseRequestError::Io)?; diff --git a/src/http/multipart.rs b/src/http/multipart.rs index 1b1a5278..22d34a0d 100644 --- a/src/http/multipart.rs +++ b/src/http/multipart.rs @@ -4,8 +4,8 @@ use std::pin::Pin; use std::task::{Context, Poll}; use bytes::Bytes; -use futures::io::AsyncRead; -use futures::stream::Stream; +use futures_util::io::AsyncRead; +use futures_util::stream::Stream; use multer::{Constraints, Multipart, SizeLimit}; use pin_project_lite::pin_project; @@ -161,7 +161,7 @@ impl Stream for ReaderStream { let this = self.project(); Poll::Ready( - match futures::ready!(this.reader.poll_read(cx, this.buf)?) { + match futures_util::ready!(this.reader.poll_read(cx, this.buf)?) { 0 => None, size => Some(Ok(Bytes::copy_from_slice(&this.buf[..size]))), }, diff --git a/src/http/websocket.rs b/src/http/websocket.rs index 360c2870..025f60cb 100644 --- a/src/http/websocket.rs +++ b/src/http/websocket.rs @@ -5,7 +5,7 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; -use futures::Stream; +use futures_util::stream::Stream; use pin_project_lite::pin_project; use serde::{Deserialize, Serialize}; diff --git a/src/lib.rs b/src/lib.rs index 910a2ded..4b2787f3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -147,7 +147,7 @@ pub use async_trait; #[doc(hidden)] pub use context::ContextSelectionSet; #[doc(hidden)] -pub use futures; +pub use futures_util; #[doc(hidden)] pub use indexmap; #[doc(hidden)] @@ -712,7 +712,7 @@ pub use async_graphql_derive::Union; /// /// ```rust /// use async_graphql::*; -/// use futures::{Stream, StreamExt}; +/// use futures_util::stream::{Stream, StreamExt}; /// /// struct SubscriptionRoot; /// @@ -720,7 +720,7 @@ pub use async_graphql_derive::Union; /// impl SubscriptionRoot { /// async fn value(&self, condition: i32) -> impl Stream { /// // Returns the number from 0 to `condition`. -/// futures::stream::iter(0..condition) +/// futures_util::stream::iter(0..condition) /// } /// } /// ``` @@ -789,7 +789,7 @@ pub use async_graphql_derive::MergedObject; /// /// ```rust /// use async_graphql::*; -/// use futures::Stream; +/// use futures_util::stream::Stream; /// /// #[derive(Default)] /// struct Subscription1; @@ -797,7 +797,7 @@ pub use async_graphql_derive::MergedObject; /// #[Subscription] /// impl Subscription1 { /// async fn events1(&self) -> impl Stream { -/// futures::stream::iter(0..10) +/// futures_util::stream::iter(0..10) /// } /// } /// @@ -807,7 +807,7 @@ pub use async_graphql_derive::MergedObject; /// #[Subscription] /// impl Subscription2 { /// async fn events2(&self) -> impl Stream { -/// futures::stream::iter(10..20) +/// futures_util::stream::iter(10..20) /// } /// } /// diff --git a/src/resolver_utils/container.rs b/src/resolver_utils/container.rs index d74742dd..8ade8cfb 100644 --- a/src/resolver_utils/container.rs +++ b/src/resolver_utils/container.rs @@ -82,7 +82,7 @@ async fn resolve_container_inner<'a, T: ContainerType + Send + Sync>( fields.add_set(ctx, root)?; let res = if parallel { - futures::future::try_join_all(fields.0).await? + futures_util::future::try_join_all(fields.0).await? } else { let mut results = Vec::with_capacity(fields.0.len()); for field in fields.0 { diff --git a/src/resolver_utils/list.rs b/src/resolver_utils/list.rs index f3d9b6a5..bf31508f 100644 --- a/src/resolver_utils/list.rs +++ b/src/resolver_utils/list.rs @@ -54,5 +54,5 @@ pub async fn resolve_list<'a, T: OutputValueType + Send + Sync + 'a>( }); } - Ok(Value::List(futures::future::try_join_all(futures).await?)) + Ok(Value::List(futures_util::future::try_join_all(futures).await?)) } diff --git a/src/schema.rs b/src/schema.rs index 0a294747..5c454141 100644 --- a/src/schema.rs +++ b/src/schema.rs @@ -4,7 +4,7 @@ use std::ops::Deref; use std::sync::atomic::AtomicUsize; use std::sync::Arc; -use futures::stream::{self, Stream, StreamExt}; +use futures_util::stream::{self, Stream, StreamExt}; use indexmap::map::IndexMap; use crate::context::{Data, QueryEnvInner, ResolveId}; @@ -487,7 +487,7 @@ where match batch_request { BatchRequest::Single(request) => BatchResponse::Single(self.execute(request).await), BatchRequest::Batch(requests) => BatchResponse::Batch( - futures::stream::iter(requests.into_iter()) + futures_util::stream::iter(requests.into_iter()) .then(|request| self.execute(request)) .collect() .await, diff --git a/src/subscription.rs b/src/subscription.rs index 227ee1ba..8ad1c104 100644 --- a/src/subscription.rs +++ b/src/subscription.rs @@ -1,6 +1,6 @@ use std::pin::Pin; -use futures::{Stream, StreamExt}; +use futures_util::stream::{Stream, StreamExt}; use crate::parser::types::{Selection, TypeCondition}; use crate::{ diff --git a/src/types/connection/connection_type.rs b/src/types/connection/connection_type.rs index f961d52f..e837b50b 100644 --- a/src/types/connection/connection_type.rs +++ b/src/types/connection/connection_type.rs @@ -1,6 +1,6 @@ use std::borrow::Cow; -use futures::{Stream, StreamExt, TryStreamExt}; +use futures_util::stream::{Stream, StreamExt, TryStreamExt}; use indexmap::map::IndexMap; use crate::connection::edge::Edge; diff --git a/src/types/empty_subscription.rs b/src/types/empty_subscription.rs index 3d05f9f7..f0082f46 100644 --- a/src/types/empty_subscription.rs +++ b/src/types/empty_subscription.rs @@ -1,7 +1,7 @@ use std::borrow::Cow; use std::pin::Pin; -use futures::{stream, Stream}; +use futures_util::stream::{self, Stream}; use crate::{registry, Context, ServerError, ServerResult, SubscriptionType, Type, Value}; diff --git a/src/types/upload.rs b/src/types/upload.rs index 401ca458..1bf557d6 100644 --- a/src/types/upload.rs +++ b/src/types/upload.rs @@ -2,7 +2,7 @@ use std::borrow::Cow; use std::fs::File; use std::io::Read; -use futures::AsyncRead; +use futures_util::io::AsyncRead; use crate::{registry, Context, InputValueError, InputValueResult, InputValueType, Type, Value}; diff --git a/src/validation/suggestion.rs b/src/validation/suggestion.rs index 5da60ddd..a935d898 100644 --- a/src/validation/suggestion.rs +++ b/src/validation/suggestion.rs @@ -46,7 +46,7 @@ where for (i, s) in selected.iter().enumerate() { if i != 0 { - suggestion.push_str(" , "); + suggestion.push_str(", "); } write!(suggestion, "\"{}\"", s).unwrap(); } diff --git a/tests/field_features.rs b/tests/field_features.rs index 4a1197ca..e833949b 100644 --- a/tests/field_features.rs +++ b/tests/field_features.rs @@ -1,7 +1,7 @@ #![allow(dead_code)] use async_graphql::*; -use futures::{Stream, StreamExt}; +use futures_util::stream::{Stream, StreamExt}; #[async_std::test] pub async fn test_field_features() { @@ -19,19 +19,19 @@ pub async fn test_field_features() { #[Subscription] impl SubscriptionRoot { async fn values(&self) -> impl Stream { - futures::stream::once(async move { 10 }) + futures_util::stream::once(async move { 10 }) } #[cfg(feature = "bson")] async fn values_bson(&self) -> impl Stream { - futures::stream::once(async move { 10 }) + futures_util::stream::once(async move { 10 }) } #[cfg(feature = "abc")] async fn values_abc( &self, - ) -> Pin + Send + 'static>> { - Box::pin(futures::stream::once(async move { 10 })) + ) -> Pin + Send + 'static>> { + Box::pin(futures_util::stream::once(async move { 10 })) } } diff --git a/tests/guard.rs b/tests/guard.rs index 0d53f4e3..389303e0 100644 --- a/tests/guard.rs +++ b/tests/guard.rs @@ -1,6 +1,6 @@ use async_graphql::guard::Guard; use async_graphql::*; -use futures::{Stream, StreamExt}; +use futures_util::stream::{Stream, StreamExt}; #[derive(Eq, PartialEq, Copy, Clone)] enum Role { @@ -71,7 +71,7 @@ pub async fn test_guard_simple_rule() { impl Subscription { #[graphql(guard(RoleGuard(role = "Role::Admin")))] async fn values(&self) -> impl Stream { - futures::stream::iter(vec![1, 2, 3]) + futures_util::stream::iter(vec![1, 2, 3]) } } diff --git a/tests/merged_object.rs b/tests/merged_object.rs index a38c08aa..cd6a2100 100644 --- a/tests/merged_object.rs +++ b/tests/merged_object.rs @@ -1,5 +1,5 @@ use async_graphql::*; -use futures::{Stream, StreamExt}; +use futures_util::stream::{Stream, StreamExt}; #[derive(SimpleObject)] struct Object1 { @@ -165,7 +165,7 @@ pub async fn test_merged_subscription() { #[Subscription] impl Subscription1 { async fn events1(&self) -> impl Stream { - futures::stream::iter(0..10) + futures_util::stream::iter(0..10) } } @@ -175,7 +175,7 @@ pub async fn test_merged_subscription() { #[Subscription] impl Subscription2 { async fn events2(&self) -> impl Stream { - futures::stream::iter(10..20) + futures_util::stream::iter(10..20) } } diff --git a/tests/mutation.rs b/tests/mutation.rs index a751b305..1b7285eb 100644 --- a/tests/mutation.rs +++ b/tests/mutation.rs @@ -1,5 +1,5 @@ use async_graphql::*; -use futures::lock::Mutex; +use async_std::sync::Mutex; use std::sync::Arc; use std::time::Duration; diff --git a/tests/raw_ident.rs b/tests/raw_ident.rs index 3f741bf7..359b7dfe 100644 --- a/tests/raw_ident.rs +++ b/tests/raw_ident.rs @@ -1,5 +1,5 @@ use async_graphql::*; -use futures::{Stream, StreamExt, TryStreamExt}; +use futures_util::stream::{Stream, StreamExt, TryStreamExt}; #[async_std::test] pub async fn test_input_value_custom_error() { @@ -41,7 +41,7 @@ pub async fn test_input_value_custom_error() { #[Subscription] impl SubscriptionRoot { async fn r#type(&self) -> impl Stream { - futures::stream::iter(0..10) + futures_util::stream::iter(0..10) } } diff --git a/tests/rename.rs b/tests/rename.rs index c39f2c13..94b11744 100644 --- a/tests/rename.rs +++ b/tests/rename.rs @@ -1,5 +1,5 @@ use async_graphql::*; -use futures::{Stream, StreamExt}; +use futures_util::stream::{Stream, StreamExt}; #[async_std::test] pub async fn test_enum() { @@ -114,7 +114,7 @@ pub async fn test_subscription() { #[allow(non_snake_case)] impl Subscription { async fn create_object(&self, ObjectId: i32) -> impl Stream { - futures::stream::once(async move { ObjectId }) + futures_util::stream::once(async move { ObjectId }) } } diff --git a/tests/subscription.rs b/tests/subscription.rs index 5190b6a6..13410837 100644 --- a/tests/subscription.rs +++ b/tests/subscription.rs @@ -1,5 +1,5 @@ use async_graphql::*; -use futures::{Stream, StreamExt, TryStreamExt}; +use futures_util::stream::{Stream, StreamExt, TryStreamExt}; #[async_std::test] pub async fn test_subscription() { @@ -19,11 +19,11 @@ pub async fn test_subscription() { #[Subscription] impl SubscriptionRoot { async fn values(&self, start: i32, end: i32) -> impl Stream { - futures::stream::iter(start..end) + futures_util::stream::iter(start..end) } async fn events(&self, start: i32, end: i32) -> impl Stream { - futures::stream::iter((start..end).map(|n| Event { a: n, b: n * 10 })) + futures_util::stream::iter((start..end).map(|n| Event { a: n, b: n * 10 })) } } @@ -77,11 +77,11 @@ pub async fn test_subscription_with_ctx_data() { impl SubscriptionRoot { async fn values(&self, ctx: &Context<'_>) -> impl Stream { let value = *ctx.data_unchecked::(); - futures::stream::once(async move { value }) + futures_util::stream::once(async move { value }) } async fn objects(&self) -> impl Stream { - futures::stream::once(async move { MyObject }) + futures_util::stream::once(async move { MyObject }) } } @@ -118,7 +118,7 @@ pub async fn test_subscription_with_token() { if ctx.data_unchecked::().0 != "123456" { return Err("forbidden".into()); } - Ok(futures::stream::once(async move { 100 })) + Ok(futures_util::stream::once(async move { 100 })) } } @@ -166,7 +166,7 @@ pub async fn test_subscription_inline_fragment() { #[Subscription] impl SubscriptionRoot { async fn events(&self, start: i32, end: i32) -> impl Stream { - futures::stream::iter((start..end).map(|n| Event { a: n, b: n * 10 })) + futures_util::stream::iter((start..end).map(|n| Event { a: n, b: n * 10 })) } } @@ -219,7 +219,7 @@ pub async fn test_subscription_fragment() { #[Subscription] impl SubscriptionRoot { async fn events(&self, start: i32, end: i32) -> impl Stream { - futures::stream::iter((start..end).map(|n| Event { a: n, b: n * 10 })) + futures_util::stream::iter((start..end).map(|n| Event { a: n, b: n * 10 })) } } @@ -274,7 +274,7 @@ pub async fn test_subscription_fragment2() { #[Subscription] impl SubscriptionRoot { async fn events(&self, start: i32, end: i32) -> impl Stream { - futures::stream::iter((start..end).map(|n| Event { a: n, b: n * 10 })) + futures_util::stream::iter((start..end).map(|n| Event { a: n, b: n * 10 })) } } @@ -333,7 +333,7 @@ pub async fn test_subscription_error() { #[Subscription] impl SubscriptionRoot { async fn events(&self) -> impl Stream { - futures::stream::iter((0..10).map(|n| Event { value: n })) + futures_util::stream::iter((0..10).map(|n| Event { value: n })) } } @@ -380,9 +380,9 @@ pub async fn test_subscription_fieldresult() { #[Subscription] impl SubscriptionRoot { async fn values(&self) -> impl Stream> { - futures::stream::iter(0..5) + futures_util::stream::iter(0..5) .map(Result::Ok) - .chain(futures::stream::once( + .chain(futures_util::stream::once( async move { Err("StreamErr".into()) }, )) } diff --git a/tests/subscription_websocket.rs b/tests/subscription_websocket.rs index 53b6599c..2da5351e 100644 --- a/tests/subscription_websocket.rs +++ b/tests/subscription_websocket.rs @@ -1,6 +1,5 @@ use async_graphql::*; -use futures::channel::mpsc; -use futures::{SinkExt, Stream, StreamExt}; +use futures_util::stream::{Stream, StreamExt}; #[async_std::test] pub async fn test_subscription_ws_transport() { @@ -14,12 +13,12 @@ pub async fn test_subscription_ws_transport() { #[Subscription] impl SubscriptionRoot { async fn values(&self) -> impl Stream { - futures::stream::iter(0..10) + futures_util::stream::iter(0..10) } } let schema = Schema::new(QueryRoot, EmptyMutation, SubscriptionRoot); - let (mut tx, rx) = mpsc::unbounded(); + let (tx, rx) = async_channel::unbounded(); let mut stream = http::WebSocket::new(schema, rx); tx.send( @@ -88,12 +87,12 @@ pub async fn test_subscription_ws_transport_with_token() { if ctx.data_unchecked::().0 != "123456" { return Err("forbidden".into()); } - Ok(futures::stream::iter(0..10)) + Ok(futures_util::stream::iter(0..10)) } } let schema = Schema::new(QueryRoot, EmptyMutation, SubscriptionRoot); - let (mut tx, rx) = mpsc::unbounded(); + let (tx, rx) = async_channel::unbounded(); let mut stream = http::WebSocket::with_data( schema, rx, @@ -187,12 +186,12 @@ pub async fn test_subscription_ws_transport_error() { #[Subscription] impl SubscriptionRoot { async fn events(&self) -> impl Stream { - futures::stream::iter((0..10).map(|n| Event { value: n })) + futures_util::stream::iter((0..10).map(|n| Event { value: n })) } } let schema = Schema::new(QueryRoot, EmptyMutation, SubscriptionRoot); - let (mut tx, rx) = mpsc::unbounded(); + let (tx, rx) = async_channel::unbounded(); let mut stream = http::WebSocket::new(schema, rx); tx.send( @@ -264,7 +263,7 @@ pub async fn test_query_over_websocket() { } let schema = Schema::new(QueryRoot, EmptyMutation, EmptySubscription); - let (mut tx, rx) = mpsc::unbounded(); + let (tx, rx) = async_channel::unbounded(); let mut stream = http::WebSocket::new(schema, rx); tx.send(