Remove Sync for StreamBody

This commit is contained in:
Sunli 2020-05-21 16:12:18 +08:00
parent 906bcce932
commit da3cf3ae2f
3 changed files with 18 additions and 11 deletions

View File

@ -18,8 +18,8 @@ tide = "0.8.1"
async-trait = "0.1.30"
serde_json = "1.0.51"
futures = "0.3.4"
async-std = "1.5.0"
[dev-dependencies]
async-std = "1.5.0"
smol = { version = "0.1", features = ["tokio02"] }
reqwest = "0.10.4"

View File

@ -10,8 +10,9 @@ use async_graphql::{
QueryResponse, Schema, StreamResponse, SubscriptionType,
};
use async_trait::async_trait;
use futures::channel::mpsc;
use futures::io::BufReader;
use futures::StreamExt;
use futures::{SinkExt, StreamExt};
use tide::{
http::{headers, Method},
Body, Request, Response, Status, StatusCode,
@ -149,10 +150,22 @@ impl ResponseExt for Response {
match res {
StreamResponse::Single(res) => self.body_graphql(res),
StreamResponse::Stream(stream) => {
let r = BufReader::new(StreamBody::new(Box::pin(
// Body::from_reader required Sync, however StreamResponse does not have Sync.
// I created an issue and got a reply that this might be fixed in the future.
// https://github.com/http-rs/http-types/pull/144
// Now I can only use forwarding to solve the problem.
let mut stream = StreamBody::new(Box::pin(
multipart_stream(stream).map(Result::Ok::<_, std::io::Error>),
)));
self.set_body(Body::from_reader(r, None));
));
let (mut tx, rx) = mpsc::channel(0);
async_std::task::spawn(async move {
while let Some(item) = stream.next().await {
if tx.send(item).await.is_err() {
return;
}
}
});
self.set_body(Body::from_reader(BufReader::new(rx), None));
Ok(self.set_header(tide::http::headers::CONTENT_TYPE, "multipart/mixed"))
}
}

View File

@ -20,12 +20,6 @@ impl<S> StreamBody<S> {
}
}
// TODO: I think that the response stream of the http server does not need `Sync`, because it is impossible
// to have multiple threads reading at the same time. I created a PR to tide, if he agrees to merge, then
// I can remove this unsafe.
// https://github.com/http-rs/http-types/pull/144
unsafe impl<S> Sync for StreamBody<S> {}
impl<S, E, D> AsyncRead for StreamBody<S>
where
D: Buf,