Support !Unpin in receive_body with pin project

This commit is contained in:
Koxiaet 2020-09-13 11:49:07 +01:00
parent 863d57a4b0
commit 91ec3486ce
3 changed files with 37 additions and 17 deletions

View File

@ -42,6 +42,7 @@ spin = "0.5.2"
tempfile = "3.1.0"
thiserror = "1.0.11"
tracing = "0.1.13"
pin-project-lite = "0.1.7"
uuid = { version = "0.8.1", features = ["v4", "serde"] }
bson = { version = "1.0.0", optional = true }

View File

@ -16,13 +16,14 @@ use futures::AsyncReadExt;
/// Receive a GraphQL request from a content type and body.
pub async fn receive_body(
content_type: Option<impl AsRef<str>>,
mut body: impl AsyncRead + Unpin + Send + 'static,
body: impl AsyncRead + Send + 'static,
opts: MultipartOptions,
) -> Result<Request, ParseRequestError> {
if let Some(Ok(boundary)) = content_type.map(multer::parse_boundary) {
receive_multipart(body, boundary, opts).await
} else {
let mut data = Vec::new();
futures::pin_mut!(body);
body.read_to_end(&mut data)
.await
.map_err(ParseRequestError::Io)?;

View File

@ -1,12 +1,13 @@
use crate::{ParseRequestError, Request};
use bytes::Bytes;
use futures::io::AsyncRead;
use futures::stream::{self, Stream};
use futures::stream::Stream;
use multer::{Constraints, Multipart, SizeLimit};
use std::collections::HashMap;
use std::io::{self, Seek, SeekFrom, Write};
use std::pin::Pin;
use std::task::Poll;
use std::task::{Context, Poll};
use pin_project_lite::pin_project;
/// Options for `receive_multipart`.
#[derive(Default, Clone)]
@ -40,7 +41,7 @@ pub async fn receive_multipart(
opts: MultipartOptions,
) -> Result<Request, ParseRequestError> {
let mut multipart = Multipart::new_with_constraints(
reader_stream(body),
ReaderStream::new(body),
boundary,
Constraints::new().size_limit({
let mut limit = SizeLimit::new();
@ -115,17 +116,34 @@ pub async fn receive_multipart(
Ok(request)
}
fn reader_stream(
reader: impl AsyncRead + Send + 'static,
) -> impl Stream<Item = io::Result<Bytes>> + Send + 'static {
let mut buf = [0u8; 2048];
let mut reader = Box::pin(reader);
stream::poll_fn(move |cx| {
Poll::Ready(
match futures::ready!(Pin::new(&mut reader).poll_read(cx, &mut buf)?) {
0 => None,
size => Some(Ok(Bytes::copy_from_slice(&buf[..size]))),
},
)
})
pin_project! {
struct ReaderStream<T> {
buf: [u8; 2048],
#[pin]
reader: T,
}
}
impl<T> ReaderStream<T> {
fn new(reader: T) -> Self {
Self {
buf: [0; 2048],
reader,
}
}
}
impl<T: AsyncRead> Stream for ReaderStream<T> {
type Item = io::Result<Bytes>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let this = self.project();
Poll::Ready(
match futures::ready!(this.reader.poll_read(cx, &mut self.buf)?) {
0 => None,
size => Some(Ok(Bytes::copy_from_slice(&self.buf[..size]))),
}
)
}
}