No longer assumes that a subscription stream that failed to resolve has ended. #744

This commit is contained in:
Sunli 2021-12-07 11:00:53 +08:00
parent c2083625f3
commit eb27b0856e
3 changed files with 25 additions and 16 deletions

View File

@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [3.0.13] 2021-12-06
- No longer assumes that a subscription stream that failed to resolve has ended. [#744](https://github.com/async-graphql/async-graphql/issues/744)
## [3.0.12] 2021-12-05
- Fix possible deadlock in dataloader. [#555](https://github.com/async-graphql/async-graphql/issues/555)

View File

@ -391,21 +391,7 @@ pub fn generate(
}
}
});
#crate_name::ServerResult::Ok(#crate_name::futures_util::stream::StreamExt::scan(
stream,
false,
|errored, item| {
if *errored {
return #crate_name::futures_util::future::ready(::std::option::Option::None);
}
match &item {
::std::result::Result::Err(_) => *errored = true,
::std::result::Result::Ok(resp) if resp.is_err() => *errored = true,
_ => {}
}
#crate_name::futures_util::future::ready(::std::option::Option::Some(item))
},
))
#crate_name::ServerResult::Ok(stream)
};
create_stream.push(quote! {

View File

@ -313,7 +313,7 @@ pub async fn test_subscription_error() {
#[Object]
impl Event {
async fn value(&self) -> Result<i32> {
if self.value < 5 {
if self.value != 5 {
Ok(self.value)
} else {
Err("TestError".into())
@ -335,12 +335,14 @@ pub async fn test_subscription_error() {
.execute_stream("subscription { events { value } }")
.map(|resp| resp.into_result())
.map_ok(|resp| resp.data);
for i in 0i32..5 {
assert_eq!(
value!({ "events": { "value": i } }),
stream.next().await.unwrap().unwrap()
);
}
assert_eq!(
stream.next().await,
Some(Err(vec![ServerError {
@ -358,6 +360,13 @@ pub async fn test_subscription_error() {
}]))
);
for i in 6i32..10 {
assert_eq!(
value!({ "events": { "value": i } }),
stream.next().await.unwrap().unwrap()
);
}
assert!(stream.next().await.is_none());
}
@ -373,17 +382,20 @@ pub async fn test_subscription_fieldresult() {
.chain(futures_util::stream::once(async move {
Err("StreamErr".into())
}))
.chain(futures_util::stream::iter(5..10).map(Result::Ok))
}
}
let schema = Schema::new(Query, EmptyMutation, Subscription);
let mut stream = schema.execute_stream("subscription { values }");
for i in 0i32..5 {
assert_eq!(
Response::new(value!({ "values": i })),
stream.next().await.unwrap()
);
}
assert_eq!(
Response {
data: Value::Null,
@ -404,5 +416,12 @@ pub async fn test_subscription_fieldresult() {
stream.next().await.unwrap(),
);
for i in 5i32..10 {
assert_eq!(
Response::new(value!({ "values": i })),
stream.next().await.unwrap()
);
}
assert!(stream.next().await.is_none());
}