Support GET requests in actix-web integration

This commit is contained in:
Koxiaet 2020-10-15 10:33:38 +01:00
parent cac3d4c074
commit fff84a3170
5 changed files with 92 additions and 69 deletions

View File

@ -14,10 +14,16 @@ categories = ["network-programming", "asynchronous"]
[dependencies]
async-graphql = { path = "../..", version = "=2.0.3" }
actix = "0.10.0"
actix-http = "2.0.0"
actix-web = { version = "3.0.2", default-features = false }
actix-web-actors = "3.0.0"
actix-http = "2.0.0"
actix = "0.10.0"
actix-rt = "1.1.0"
futures = "0.3.5"
serde_json = "1.0.48"
async-channel = "1.5.1"
futures-util = { version = "0.3.6", default-features = false }
serde_json = "1.0.59"
serde_urlencoded = "0.7.0"
[dev-dependencies]
actix-rt = "1.1.1"
async-mutex = "1.4.0"

View File

@ -5,17 +5,19 @@ mod subscription;
pub use subscription::WSSubscription;
use std::future::Future;
use std::io::{self, ErrorKind};
use std::pin::Pin;
use actix_web::client::PayloadError;
use actix_web::dev::{Payload, PayloadStream};
use actix_web::http::StatusCode;
use actix_web::{http, web, Error, FromRequest, HttpRequest, HttpResponse, Responder, Result};
use actix_web::http::{Method, StatusCode};
use actix_web::{http, Error, FromRequest, HttpRequest, HttpResponse, Responder, Result};
use futures_util::future::{self, FutureExt, Ready};
use futures_util::{StreamExt, TryStreamExt};
use async_graphql::http::MultipartOptions;
use async_graphql::ParseRequestError;
use futures::channel::mpsc;
use futures::future::{self, FutureExt, Ready};
use futures::io::ErrorKind;
use futures::{Future, SinkExt, StreamExt, TryFutureExt, TryStreamExt};
use std::io;
use std::pin::Pin;
/// Extractor for GraphQL request.
///
@ -30,23 +32,21 @@ impl Request {
}
}
type RequestMapper =
type BatchToRequestMapper =
fn(<<BatchRequest as FromRequest>::Future as Future>::Output) -> Result<Request>;
impl FromRequest for Request {
type Error = Error;
type Future = future::Map<<BatchRequest as FromRequest>::Future, RequestMapper>;
type Future = future::Map<<BatchRequest as FromRequest>::Future, BatchToRequestMapper>;
type Config = MultipartOptions;
fn from_request(req: &HttpRequest, payload: &mut Payload<PayloadStream>) -> Self::Future {
BatchRequest::from_request(req, payload).map(|res| {
res.and_then(|batch| {
batch
.0
Ok(Self(
res?.0
.into_single()
.map_err(actix_web::error::ErrorBadRequest)
})
.map(Self)
.map_err(actix_web::error::ErrorBadRequest)?,
))
})
}
}
@ -72,41 +72,64 @@ impl FromRequest for BatchRequest {
fn from_request(req: &HttpRequest, payload: &mut Payload<PayloadStream>) -> Self::Future {
let config = req.app_data::<Self::Config>().cloned().unwrap_or_default();
let content_type = req
.headers()
.get(http::header::CONTENT_TYPE)
.and_then(|value| value.to_str().ok())
.map(|value| value.to_string());
if req.method() == Method::GET {
let res = serde_urlencoded::from_str(req.query_string());
Box::pin(async move { Ok(Self(res?)) })
} else {
let content_type = req
.headers()
.get(http::header::CONTENT_TYPE)
.and_then(|value| value.to_str().ok())
.map(|value| value.to_string());
let (mut tx, rx) = mpsc::channel(16);
let (tx, rx) = async_channel::bounded(16);
// Because Payload is !Send, so forward it to mpsc::Sender
let mut payload = web::Payload(payload.take());
actix_rt::spawn(async move {
while let Some(item) = payload.next().await {
if tx.send(item).await.is_err() {
return;
}
}
});
Box::pin(async move {
Ok(BatchRequest(
async_graphql::http::receive_batch_body(
content_type,
rx.map_err(|err| io::Error::new(ErrorKind::Other, err))
.into_async_read(),
config,
)
.map_err(|err| match err {
ParseRequestError::PayloadTooLarge => {
actix_web::error::ErrorPayloadTooLarge(err)
// Payload is !Send so we create indirection with a channel
let mut payload = payload.take();
actix::spawn(async move {
while let Some(item) = payload.next().await {
if tx.send(item).await.is_err() {
return;
}
_ => actix_web::error::ErrorBadRequest(err),
})
.await?,
))
})
}
});
Box::pin(async move {
Ok(BatchRequest(
async_graphql::http::receive_batch_body(
content_type,
rx.map_err(|e| match e {
PayloadError::Incomplete(Some(e)) | PayloadError::Io(e) => e,
PayloadError::Incomplete(None) => {
io::Error::from(ErrorKind::UnexpectedEof)
}
PayloadError::EncodingCorrupted => io::Error::new(
ErrorKind::InvalidData,
"cannot decode content-encoding",
),
PayloadError::Overflow => io::Error::new(
ErrorKind::InvalidData,
"a payload reached size limit",
),
PayloadError::UnknownLength => {
io::Error::new(ErrorKind::Other, "a payload length is unknown")
}
PayloadError::Http2Payload(e) if e.is_io() => e.into_io().unwrap(),
PayloadError::Http2Payload(e) => io::Error::new(ErrorKind::Other, e),
})
.into_async_read(),
config,
)
.await
.map_err(|err| match err {
ParseRequestError::PayloadTooLarge => {
actix_web::error::ErrorPayloadTooLarge(err)
}
_ => actix_web::error::ErrorBadRequest(err),
})?,
))
})
}
}
}
@ -140,6 +163,6 @@ impl Responder for Response {
res.header("cache-control", cache_control);
}
}
futures::future::ok(res.body(serde_json::to_string(&self.0).unwrap()))
futures_util::future::ok(res.body(serde_json::to_string(&self.0).unwrap()))
}
}

View File

@ -6,8 +6,6 @@ use actix_http::ws;
use actix_web_actors::ws::{Message, ProtocolError, WebsocketContext};
use async_graphql::http::WebSocket;
use async_graphql::{Data, ObjectType, Result, Schema, SubscriptionType};
use futures::channel::mpsc;
use futures::SinkExt;
use std::time::{Duration, Instant};
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
@ -17,7 +15,7 @@ const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);
pub struct WSSubscription<Query, Mutation, Subscription> {
schema: Option<Schema<Query, Mutation, Subscription>>,
last_heartbeat: Instant,
messages: Option<mpsc::UnboundedSender<Vec<u8>>>,
messages: Option<async_channel::Sender<Vec<u8>>>,
initializer: Option<Box<dyn FnOnce(serde_json::Value) -> Result<Data> + Send + Sync>>,
continuation: Vec<u8>,
}
@ -71,7 +69,7 @@ where
fn started(&mut self, ctx: &mut Self::Context) {
self.send_heartbeats(ctx);
let (tx, rx) = mpsc::unbounded();
let (tx, rx) = async_channel::unbounded();
WebSocket::with_data(self.schema.take().unwrap(), rx, self.initializer.take())
.into_actor(self)
@ -135,7 +133,7 @@ where
};
if let Some(message) = message {
let mut sender = self.messages.as_ref().unwrap().clone();
let sender = self.messages.as_ref().unwrap().clone();
async move { sender.send(message).await }
.into_actor(self)

View File

@ -4,7 +4,7 @@ use async_graphql::{
Context, EmptyMutation, EmptySubscription, Object, ObjectType, Schema, SubscriptionType,
};
use async_graphql_actix_web::{Request, Response};
use futures::lock::Mutex;
use async_mutex::Mutex;
pub async fn gql_playgound() -> HttpResponse {
HttpResponse::Ok()
@ -42,7 +42,7 @@ pub(crate) struct CountQueryRoot;
#[Object]
impl CountQueryRoot {
async fn count<'a>(&self, ctx: &'a Context<'_>) -> i32 {
ctx.data_unchecked::<Count>().lock().await.clone()
*ctx.data_unchecked::<Count>().lock().await
}
}
@ -53,13 +53,13 @@ impl CountMutation {
async fn add_count<'a>(&self, ctx: &'a Context<'_>, count: i32) -> i32 {
let mut guard_count = ctx.data_unchecked::<Count>().lock().await;
*guard_count += count;
guard_count.clone()
*guard_count
}
async fn subtract_count<'a>(&self, ctx: &'a Context<'_>, count: i32) -> i32 {
let mut guard_count = ctx.data_unchecked::<Count>().lock().await;
*guard_count -= count;
guard_count.clone()
*guard_count
}
}

View File

@ -46,16 +46,12 @@ pub async fn receive_batch_body(
}
/// Receive a GraphQL request from a body as JSON.
pub async fn receive_json(
body: impl AsyncRead + Send + 'static,
) -> Result<Request, ParseRequestError> {
pub async fn receive_json(body: impl AsyncRead) -> Result<Request, ParseRequestError> {
receive_batch_json(body).await?.into_single()
}
/// Receive a GraphQL batch request from a body as JSON.
pub async fn receive_batch_json(
body: impl AsyncRead + Send + 'static,
) -> Result<BatchRequest, ParseRequestError> {
pub async fn receive_batch_json(body: impl AsyncRead) -> Result<BatchRequest, ParseRequestError> {
let mut data = Vec::new();
futures::pin_mut!(body);
body.read_to_end(&mut data)