Remove async_graphql::http::StreamBody
l:
This commit is contained in:
parent
3b5d387f55
commit
bbf784fe42
|
@ -8,12 +8,14 @@ mod subscription;
|
||||||
use actix_web::dev::{HttpResponseBuilder, Payload, PayloadStream};
|
use actix_web::dev::{HttpResponseBuilder, Payload, PayloadStream};
|
||||||
use actix_web::http::StatusCode;
|
use actix_web::http::StatusCode;
|
||||||
use actix_web::{http, web, Error, FromRequest, HttpRequest, HttpResponse, Responder};
|
use actix_web::{http, web, Error, FromRequest, HttpRequest, HttpResponse, Responder};
|
||||||
use async_graphql::http::{receive_body, MultipartOptions, StreamBody};
|
use async_graphql::http::{receive_body, MultipartOptions};
|
||||||
use async_graphql::{ParseRequestError, Request, Response};
|
use async_graphql::{ParseRequestError, Request, Response};
|
||||||
use futures::channel::mpsc;
|
use futures::channel::mpsc;
|
||||||
use futures::future::Ready;
|
use futures::future::Ready;
|
||||||
use futures::{Future, SinkExt, StreamExt, TryFutureExt};
|
use futures::io::ErrorKind;
|
||||||
|
use futures::{Future, SinkExt, StreamExt, TryFutureExt, TryStreamExt};
|
||||||
use http::Method;
|
use http::Method;
|
||||||
|
use std::io;
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
pub use subscription::WSSubscription;
|
pub use subscription::WSSubscription;
|
||||||
|
|
||||||
|
@ -65,14 +67,19 @@ impl FromRequest for GQLRequest {
|
||||||
|
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
Ok(GQLRequest(
|
Ok(GQLRequest(
|
||||||
receive_body(content_type, StreamBody::new(rx), config)
|
receive_body(
|
||||||
.map_err(|err| match err {
|
content_type,
|
||||||
ParseRequestError::PayloadTooLarge => {
|
rx.map_err(|err| io::Error::new(ErrorKind::Other, err))
|
||||||
actix_web::error::ErrorPayloadTooLarge(err)
|
.into_async_read(),
|
||||||
}
|
config,
|
||||||
_ => actix_web::error::ErrorBadRequest(err),
|
)
|
||||||
})
|
.map_err(|err| match err {
|
||||||
.await?,
|
ParseRequestError::PayloadTooLarge => {
|
||||||
|
actix_web::error::ErrorPayloadTooLarge(err)
|
||||||
|
}
|
||||||
|
_ => actix_web::error::ErrorBadRequest(err),
|
||||||
|
})
|
||||||
|
.await?,
|
||||||
))
|
))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,17 +5,19 @@
|
||||||
#![allow(clippy::needless_doctest_main)]
|
#![allow(clippy::needless_doctest_main)]
|
||||||
#![forbid(unsafe_code)]
|
#![forbid(unsafe_code)]
|
||||||
|
|
||||||
use async_graphql::http::{GQLRequest, MultipartOptions, StreamBody};
|
use async_graphql::http::{GQLRequest, MultipartOptions};
|
||||||
use async_graphql::{resolver_utils::ObjectType, Data, FieldResult, Schema, SubscriptionType};
|
use async_graphql::{resolver_utils::ObjectType, Data, FieldResult, Schema, SubscriptionType};
|
||||||
use futures::select;
|
use futures::io::ErrorKind;
|
||||||
|
use futures::{select, TryStreamExt};
|
||||||
use futures::{SinkExt, StreamExt};
|
use futures::{SinkExt, StreamExt};
|
||||||
use hyper::Method;
|
use hyper::Method;
|
||||||
|
use std::io;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use warp::filters::ws::Message;
|
use warp::filters::ws::Message;
|
||||||
use warp::filters::BoxedFilter;
|
use warp::filters::BoxedFilter;
|
||||||
use warp::reject::Reject;
|
use warp::reject::Reject;
|
||||||
use warp::reply::Response;
|
use warp::reply::Response;
|
||||||
use warp::{Filter, Rejection, Reply};
|
use warp::{Buf, Filter, Rejection, Reply};
|
||||||
|
|
||||||
/// Bad request error
|
/// Bad request error
|
||||||
///
|
///
|
||||||
|
@ -116,9 +118,13 @@ where
|
||||||
} else {
|
} else {
|
||||||
let request = async_graphql::http::receive_body(
|
let request = async_graphql::http::receive_body(
|
||||||
content_type,
|
content_type,
|
||||||
StreamBody::new(body),
|
futures::TryStreamExt::map_err(body, |err| io::Error::new(ErrorKind::Other, err))
|
||||||
MultipartOptions::clone(&opts)
|
.map_ok(|mut buf| Buf::to_bytes(&mut buf))
|
||||||
).await.map_err(|err| warp::reject::custom(BadRequest(err.into())))?;
|
.into_async_read(),
|
||||||
|
MultipartOptions::clone(&opts),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.map_err(|err| warp::reject::custom(BadRequest(err.into())))?;
|
||||||
Ok::<_, Rejection>((schema, request))
|
Ok::<_, Rejection>((schema, request))
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|
|
@ -3,12 +3,10 @@
|
||||||
mod graphiql_source;
|
mod graphiql_source;
|
||||||
mod multipart;
|
mod multipart;
|
||||||
mod playground_source;
|
mod playground_source;
|
||||||
mod stream_body;
|
|
||||||
|
|
||||||
pub use graphiql_source::graphiql_source;
|
pub use graphiql_source::graphiql_source;
|
||||||
pub use multipart::{receive_multipart, MultipartOptions};
|
pub use multipart::{receive_multipart, MultipartOptions};
|
||||||
pub use playground_source::{playground_source, GraphQLPlaygroundConfig};
|
pub use playground_source::{playground_source, GraphQLPlaygroundConfig};
|
||||||
pub use stream_body::StreamBody;
|
|
||||||
|
|
||||||
use crate::{Data, ParseRequestError, Request, Variables};
|
use crate::{Data, ParseRequestError, Request, Variables};
|
||||||
use futures::io::AsyncRead;
|
use futures::io::AsyncRead;
|
||||||
|
|
|
@ -1,55 +0,0 @@
|
||||||
use bytes::{Buf, Bytes};
|
|
||||||
use futures::task::{Context, Poll};
|
|
||||||
use futures::{AsyncRead, Stream, StreamExt};
|
|
||||||
use std::io::{Error, ErrorKind, Result};
|
|
||||||
use std::pin::Pin;
|
|
||||||
|
|
||||||
/// An Adapter for bytes stream to `AsyncRead`
|
|
||||||
pub struct StreamBody<S> {
|
|
||||||
s: S,
|
|
||||||
remaining_bytes: Option<Bytes>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<S> StreamBody<S> {
|
|
||||||
#[allow(missing_docs)]
|
|
||||||
pub fn new(s: S) -> Self {
|
|
||||||
Self {
|
|
||||||
s,
|
|
||||||
remaining_bytes: None,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<S, E, D> AsyncRead for StreamBody<S>
|
|
||||||
where
|
|
||||||
D: Buf,
|
|
||||||
S: Stream<Item = std::result::Result<D, E>> + Unpin,
|
|
||||||
{
|
|
||||||
fn poll_read(
|
|
||||||
mut self: Pin<&mut Self>,
|
|
||||||
cx: &mut Context<'_>,
|
|
||||||
buf: &mut [u8],
|
|
||||||
) -> Poll<Result<usize>> {
|
|
||||||
loop {
|
|
||||||
if let Some(bytes) = &mut self.remaining_bytes {
|
|
||||||
let data = bytes.split_to(buf.len().min(bytes.len()));
|
|
||||||
buf[..data.len()].copy_from_slice(&data);
|
|
||||||
if !bytes.has_remaining() {
|
|
||||||
self.remaining_bytes = None;
|
|
||||||
}
|
|
||||||
return Poll::Ready(Ok(data.len()));
|
|
||||||
} else {
|
|
||||||
match self.s.poll_next_unpin(cx) {
|
|
||||||
Poll::Ready(Some(Ok(mut bytes))) => {
|
|
||||||
self.remaining_bytes = Some(bytes.to_bytes());
|
|
||||||
}
|
|
||||||
Poll::Ready(Some(Err(_))) => {
|
|
||||||
return Poll::Ready(Err(Error::from(ErrorKind::InvalidData)))
|
|
||||||
}
|
|
||||||
Poll::Ready(None) => return Poll::Ready(Ok(0)),
|
|
||||||
Poll::Pending => return Poll::Pending,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in New Issue
Block a user