diff --git a/integrations/actix-web/src/batch_request.rs b/integrations/actix-web/src/batch_request.rs deleted file mode 100644 index 9c47a292..00000000 --- a/integrations/actix-web/src/batch_request.rs +++ /dev/null @@ -1,95 +0,0 @@ -use actix_web::dev::{Payload, PayloadStream}; -use actix_web::http::StatusCode; -use actix_web::{http, web, Error, FromRequest, HttpRequest, HttpResponse, Responder}; -use async_graphql::http::MultipartOptions; -use async_graphql::ParseRequestError; -use futures::channel::mpsc; -use futures::future::Ready; -use futures::io::ErrorKind; -use futures::{Future, SinkExt, StreamExt, TryFutureExt, TryStreamExt}; -use std::io; -use std::pin::Pin; - -/// Extractor for GraphQL batch request. -/// -/// `async_graphql::http::MultipartOptions` allows to configure extraction process. -pub struct BatchRequest(pub async_graphql::BatchRequest); - -impl BatchRequest { - /// Unwraps the value to `async_graphql::Request`. - #[must_use] - pub fn into_inner(self) -> async_graphql::BatchRequest { - self.0 - } -} - -impl FromRequest for BatchRequest { - type Error = Error; - type Future = Pin>>>; - type Config = MultipartOptions; - - fn from_request(req: &HttpRequest, payload: &mut Payload) -> Self::Future { - let config = req.app_data::().cloned().unwrap_or_default(); - - let content_type = req - .headers() - .get(http::header::CONTENT_TYPE) - .and_then(|value| value.to_str().ok()) - .map(|value| value.to_string()); - - let (mut tx, rx) = mpsc::channel(16); - - // Because Payload is !Send, so forward it to mpsc::Sender - let mut payload = web::Payload(payload.take()); - actix_rt::spawn(async move { - while let Some(item) = payload.next().await { - if tx.send(item).await.is_err() { - return; - } - } - }); - - Box::pin(async move { - Ok(BatchRequest( - async_graphql::http::receive_batch_body( - content_type, - rx.map_err(|err| io::Error::new(ErrorKind::Other, err)) - .into_async_read(), - config, - ) - .map_err(|err| match err { - ParseRequestError::PayloadTooLarge => { - actix_web::error::ErrorPayloadTooLarge(err) - } - _ => actix_web::error::ErrorBadRequest(err), - }) - .await?, - )) - }) - } -} - -/// Responder for GraphQL batch response -pub struct BatchResponse(pub async_graphql::BatchResponse); - -impl From for BatchResponse { - fn from(resp: async_graphql::BatchResponse) -> Self { - BatchResponse(resp) - } -} - -impl Responder for BatchResponse { - type Error = Error; - type Future = Ready>; - - fn respond_to(self, _req: &HttpRequest) -> Self::Future { - let mut res = HttpResponse::build(StatusCode::OK); - res.content_type("application/json"); - if self.0.is_ok() { - if let Some(cache_control) = self.0.cache_control().value() { - res.header("cache-control", cache_control); - } - } - futures::future::ok(res.body(serde_json::to_string(&self.0).unwrap())) - } -} diff --git a/integrations/actix-web/src/lib.rs b/integrations/actix-web/src/lib.rs index 4a918274..a347f52f 100644 --- a/integrations/actix-web/src/lib.rs +++ b/integrations/actix-web/src/lib.rs @@ -1,11 +1,145 @@ //! Async-graphql integration with Actix-web - #![forbid(unsafe_code)] -mod batch_request; -mod request; mod subscription; -pub use batch_request::{BatchRequest, BatchResponse}; -pub use request::{Request, Response}; pub use subscription::WSSubscription; + +use actix_web::dev::{Payload, PayloadStream}; +use actix_web::http::StatusCode; +use actix_web::{http, web, Error, FromRequest, HttpRequest, HttpResponse, Responder, Result}; +use async_graphql::http::MultipartOptions; +use async_graphql::ParseRequestError; +use futures::channel::mpsc; +use futures::future::{self, FutureExt, Ready}; +use futures::io::ErrorKind; +use futures::{Future, SinkExt, StreamExt, TryFutureExt, TryStreamExt}; +use std::io; +use std::pin::Pin; + +/// Extractor for GraphQL request. +/// +/// `async_graphql::http::MultipartOptions` allows to configure extraction process. +pub struct Request(pub async_graphql::Request); + +impl Request { + /// Unwraps the value to `async_graphql::Request`. + #[must_use] + pub fn into_inner(self) -> async_graphql::Request { + self.0 + } +} + +type RequestMapper = + fn(<::Future as Future>::Output) -> Result; + +impl FromRequest for Request { + type Error = Error; + type Future = future::Map<::Future, RequestMapper>; + type Config = MultipartOptions; + + fn from_request(req: &HttpRequest, payload: &mut Payload) -> Self::Future { + BatchRequest::from_request(req, payload).map(|res| { + res.and_then(|batch| { + batch + .0 + .into_single() + .map_err(actix_web::error::ErrorBadRequest) + }) + .map(Self) + }) + } +} + +/// Extractor for GraphQL batch request. +/// +/// `async_graphql::http::MultipartOptions` allows to configure extraction process. +pub struct BatchRequest(pub async_graphql::BatchRequest); + +impl BatchRequest { + /// Unwraps the value to `async_graphql::BatchRequest`. + #[must_use] + pub fn into_inner(self) -> async_graphql::BatchRequest { + self.0 + } +} + +impl FromRequest for BatchRequest { + type Error = Error; + type Future = Pin>>>; + type Config = MultipartOptions; + + fn from_request(req: &HttpRequest, payload: &mut Payload) -> Self::Future { + let config = req.app_data::().cloned().unwrap_or_default(); + + let content_type = req + .headers() + .get(http::header::CONTENT_TYPE) + .and_then(|value| value.to_str().ok()) + .map(|value| value.to_string()); + + let (mut tx, rx) = mpsc::channel(16); + + // Because Payload is !Send, so forward it to mpsc::Sender + let mut payload = web::Payload(payload.take()); + actix_rt::spawn(async move { + while let Some(item) = payload.next().await { + if tx.send(item).await.is_err() { + return; + } + } + }); + + Box::pin(async move { + Ok(BatchRequest( + async_graphql::http::receive_batch_body( + content_type, + rx.map_err(|err| io::Error::new(ErrorKind::Other, err)) + .into_async_read(), + config, + ) + .map_err(|err| match err { + ParseRequestError::PayloadTooLarge => { + actix_web::error::ErrorPayloadTooLarge(err) + } + _ => actix_web::error::ErrorBadRequest(err), + }) + .await?, + )) + }) + } +} + +/// Responder for a GraphQL response. +/// +/// This contains a batch response, but since regular responses are a type of batch response it +/// works for both. +pub struct Response(pub async_graphql::BatchResponse); + +impl From for Response { + fn from(resp: async_graphql::Response) -> Self { + Self(resp.into()) + } +} + +impl From for Response { + fn from(resp: async_graphql::BatchResponse) -> Self { + Self(resp) + } +} + +impl Responder for Response { + type Error = Error; + type Future = Ready>; + + fn respond_to(self, _req: &HttpRequest) -> Self::Future { + let mut res = HttpResponse::build(StatusCode::OK); + res.content_type("application/json"); + if self.0.is_ok() { + if let Some(cache_control) = self.0.cache_control().value() { + res.header("cache-control", cache_control); + } + } + futures::future::ok(res.body(serde_json::to_string(&self.0).unwrap())) + } +} diff --git a/integrations/actix-web/src/request.rs b/integrations/actix-web/src/request.rs deleted file mode 100644 index c0e29472..00000000 --- a/integrations/actix-web/src/request.rs +++ /dev/null @@ -1,110 +0,0 @@ -use actix_web::dev::{Payload, PayloadStream}; -use actix_web::http::StatusCode; -use actix_web::{http, web, Error, FromRequest, HttpRequest, HttpResponse, Responder}; -use async_graphql::http::MultipartOptions; -use async_graphql::ParseRequestError; -use futures::channel::mpsc; -use futures::future::Ready; -use futures::io::ErrorKind; -use futures::{Future, SinkExt, StreamExt, TryStreamExt}; -use http::Method; -use std::io; -use std::pin::Pin; - -/// Extractor for GraphQL request. -/// -/// `async_graphql::http::MultipartOptions` allows to configure extraction process. -pub struct Request(pub async_graphql::Request); - -impl Request { - /// Unwraps the value to `async_graphql::Request`. - #[must_use] - pub fn into_inner(self) -> async_graphql::Request { - self.0 - } -} - -impl From for async_graphql::Request { - fn from(req: Request) -> Self { - req.0 - } -} - -impl FromRequest for Request { - type Error = Error; - type Future = Pin>>>; - type Config = MultipartOptions; - - fn from_request(req: &HttpRequest, payload: &mut Payload) -> Self::Future { - let config = req.app_data::().cloned().unwrap_or_default(); - - if req.method() == Method::GET { - let res = web::Query::::from_query(req.query_string()); - Box::pin(async move { - let gql_request = res?; - Ok(Request(gql_request.into_inner())) - }) - } else { - let content_type = req - .headers() - .get(http::header::CONTENT_TYPE) - .and_then(|value| value.to_str().ok()) - .map(|value| value.to_string()); - - let (mut tx, rx) = mpsc::channel(16); - - // Because Payload is !Send, so forward it to mpsc::Sender - let mut payload = web::Payload(payload.take()); - actix_rt::spawn(async move { - while let Some(item) = payload.next().await { - if tx.send(item).await.is_err() { - return; - } - } - }); - - Box::pin(async move { - Ok(Request( - async_graphql::http::receive_body( - content_type, - rx.map_err(|err| io::Error::new(ErrorKind::Other, err)) - .into_async_read(), - config, - ) - .await - .map_err(|err| match err { - ParseRequestError::PayloadTooLarge => { - actix_web::error::ErrorPayloadTooLarge(err) - } - _ => actix_web::error::ErrorBadRequest(err), - })?, - )) - }) - } - } -} - -/// Responder for GraphQL response. -pub struct Response(pub async_graphql::Response); - -impl From for Response { - fn from(resp: async_graphql::Response) -> Self { - Response(resp) - } -} - -impl Responder for Response { - type Error = Error; - type Future = Ready>; - - fn respond_to(self, _req: &HttpRequest) -> Self::Future { - let mut res = HttpResponse::build(StatusCode::OK); - res.content_type("application/json"); - if self.0.is_ok() { - if let Some(cache_control) = self.0.cache_control.value() { - res.header("cache-control", cache_control); - } - } - futures::future::ok(res.body(serde_json::to_string(&self.0).unwrap())) - } -} diff --git a/src/request.rs b/src/request.rs index 319b49a1..4556ff4b 100644 --- a/src/request.rs +++ b/src/request.rs @@ -3,6 +3,7 @@ use crate::parser::types::UploadValue; use crate::{Data, ParseRequestError, Value, Variables}; use serde::{Deserialize, Deserializer}; use std::any::Any; +use std::fmt::{self, Debug, Formatter}; use std::fs::File; /// GraphQL request. @@ -106,10 +107,20 @@ impl> From for Request { } } +impl Debug for Request { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + f.debug_struct("Request") + .field("query", &self.query) + .field("operation_name", &self.operation_name) + .field("variables", &self.variables) + .finish() + } +} + /// Batch support for GraphQL requests, which is either a single query, or an array of queries /// /// **Reference:** -#[derive(Deserialize)] +#[derive(Debug, Deserialize)] #[serde(untagged)] pub enum BatchRequest { /// Single query @@ -121,7 +132,13 @@ pub enum BatchRequest { } impl BatchRequest { - pub(crate) fn into_single(self) -> Result { + /// Attempt to convert the batch request into a single request. + /// + /// # Errors + /// + /// Fails if the batch request is a list of requests with a message saying that batch requests + /// aren't supported. + pub fn into_single(self) -> Result { match self { Self::Single(req) => Ok(req), Self::Batch(_) => Err(ParseRequestError::UnsupportedBatch), diff --git a/src/response.rs b/src/response.rs index a252d3a4..d909fe70 100644 --- a/src/response.rs +++ b/src/response.rs @@ -121,6 +121,18 @@ impl BatchResponse { } } +impl From for BatchResponse { + fn from(response: Response) -> Self { + Self::Single(response) + } +} + +impl From> for BatchResponse { + fn from(responses: Vec) -> Self { + Self::Batch(responses) + } +} + #[cfg(test)] mod tests { use super::*;