diff --git a/integrations/actix-web/src/lib.rs b/integrations/actix-web/src/lib.rs index 021e3f59..2f0c35e3 100644 --- a/integrations/actix-web/src/lib.rs +++ b/integrations/actix-web/src/lib.rs @@ -3,175 +3,8 @@ #![allow(clippy::upper_case_acronyms)] #![warn(missing_docs)] +mod request; mod subscription; -use std::future::Future; -use std::io::{self, ErrorKind}; -use std::pin::Pin; - -use actix_http::error::PayloadError; -use actix_web::dev::{Payload, PayloadStream}; -use actix_web::http::{Method, StatusCode}; -use actix_web::{http, Error, FromRequest, HttpRequest, HttpResponse, Responder, Result}; -use futures_util::future::{self, FutureExt}; -use futures_util::{StreamExt, TryStreamExt}; - -use async_graphql::http::MultipartOptions; -use async_graphql::ParseRequestError; +pub use request::{GraphQLBatchRequest, GraphQLRequest, GraphQLResponse}; pub use subscription::GraphQLSubscription; - -/// Extractor for GraphQL request. -/// -/// `async_graphql::http::MultipartOptions` allows to configure extraction process. -pub struct GraphQLRequest(pub async_graphql::Request); - -impl GraphQLRequest { - /// Unwraps the value to `async_graphql::Request`. - #[must_use] - pub fn into_inner(self) -> async_graphql::Request { - self.0 - } -} - -type BatchToRequestMapper = - fn(<::Future as Future>::Output) -> Result; - -impl FromRequest for GraphQLRequest { - type Error = Error; - type Future = future::Map<::Future, BatchToRequestMapper>; - - fn from_request(req: &HttpRequest, payload: &mut Payload) -> Self::Future { - GraphQLBatchRequest::from_request(req, payload).map(|res| { - Ok(Self( - res?.0 - .into_single() - .map_err(actix_web::error::ErrorBadRequest)?, - )) - }) - } -} - -/// Extractor for GraphQL batch request. -/// -/// `async_graphql::http::MultipartOptions` allows to configure extraction process. -pub struct GraphQLBatchRequest(pub async_graphql::BatchRequest); - -impl GraphQLBatchRequest { - /// Unwraps the value to `async_graphql::BatchRequest`. - #[must_use] - pub fn into_inner(self) -> async_graphql::BatchRequest { - self.0 - } -} - -impl FromRequest for GraphQLBatchRequest { - type Error = Error; - type Future = Pin>>>; - - fn from_request(req: &HttpRequest, payload: &mut Payload) -> Self::Future { - let config = req - .app_data::() - .cloned() - .unwrap_or_default(); - - if req.method() == Method::GET { - let res = serde_urlencoded::from_str(req.query_string()); - Box::pin(async move { Ok(Self(async_graphql::BatchRequest::Single(res?))) }) - } else if req.method() == Method::POST { - let content_type = req - .headers() - .get(http::header::CONTENT_TYPE) - .and_then(|value| value.to_str().ok()) - .map(|value| value.to_string()); - - let (tx, rx) = async_channel::bounded(16); - - // Payload is !Send so we create indirection with a channel - let mut payload = payload.take(); - actix::spawn(async move { - while let Some(item) = payload.next().await { - if tx.send(item).await.is_err() { - return; - } - } - }); - - Box::pin(async move { - Ok(GraphQLBatchRequest( - async_graphql::http::receive_batch_body( - content_type, - rx.map_err(|e| match e { - PayloadError::Incomplete(Some(e)) | PayloadError::Io(e) => e, - PayloadError::Incomplete(None) => { - io::Error::from(ErrorKind::UnexpectedEof) - } - PayloadError::EncodingCorrupted => io::Error::new( - ErrorKind::InvalidData, - "cannot decode content-encoding", - ), - PayloadError::Overflow => io::Error::new( - ErrorKind::InvalidData, - "a payload reached size limit", - ), - PayloadError::UnknownLength => { - io::Error::new(ErrorKind::Other, "a payload length is unknown") - } - PayloadError::Http2Payload(e) if e.is_io() => e.into_io().unwrap(), - PayloadError::Http2Payload(e) => io::Error::new(ErrorKind::Other, e), - _ => io::Error::new(ErrorKind::Other, e), - }) - .into_async_read(), - config, - ) - .await - .map_err(|err| match err { - ParseRequestError::PayloadTooLarge => { - actix_web::error::ErrorPayloadTooLarge(err) - } - _ => actix_web::error::ErrorBadRequest(err), - })?, - )) - }) - } else { - Box::pin(async move { - Err(actix_web::error::ErrorMethodNotAllowed( - "GraphQL only supports GET and POST requests", - )) - }) - } - } -} - -/// Responder for a GraphQL response. -/// -/// This contains a batch response, but since regular responses are a type of batch response it -/// works for both. -pub struct GraphQLResponse(pub async_graphql::BatchResponse); - -impl From for GraphQLResponse { - fn from(resp: async_graphql::Response) -> Self { - Self(resp.into()) - } -} - -impl From for GraphQLResponse { - fn from(resp: async_graphql::BatchResponse) -> Self { - Self(resp) - } -} - -impl Responder for GraphQLResponse { - fn respond_to(self, _req: &HttpRequest) -> HttpResponse { - let mut res = HttpResponse::build(StatusCode::OK); - res.content_type("application/json"); - if self.0.is_ok() { - if let Some(cache_control) = self.0.cache_control().value() { - res.append_header(("cache-control", cache_control)); - } - } - for (name, value) in self.0.http_headers() { - res.append_header((name, value)); - } - res.body(serde_json::to_string(&self.0).unwrap()) - } -} diff --git a/integrations/actix-web/src/request.rs b/integrations/actix-web/src/request.rs new file mode 100644 index 00000000..de5978e5 --- /dev/null +++ b/integrations/actix-web/src/request.rs @@ -0,0 +1,169 @@ +use std::future::Future; +use std::io::{self, ErrorKind}; +use std::pin::Pin; + +use actix_http::error::PayloadError; +use actix_web::dev::{Payload, PayloadStream}; +use actix_web::http::{Method, StatusCode}; +use actix_web::{http, Error, FromRequest, HttpRequest, HttpResponse, Responder, Result}; +use futures_util::future::{self, FutureExt}; +use futures_util::{StreamExt, TryStreamExt}; + +use async_graphql::http::MultipartOptions; +use async_graphql::ParseRequestError; + +/// Extractor for GraphQL request. +/// +/// `async_graphql::http::MultipartOptions` allows to configure extraction process. +pub struct GraphQLRequest(pub async_graphql::Request); + +impl GraphQLRequest { + /// Unwraps the value to `async_graphql::Request`. + #[must_use] + pub fn into_inner(self) -> async_graphql::Request { + self.0 + } +} + +type BatchToRequestMapper = + fn(<::Future as Future>::Output) -> Result; + +impl FromRequest for GraphQLRequest { + type Error = Error; + type Future = future::Map<::Future, BatchToRequestMapper>; + + fn from_request(req: &HttpRequest, payload: &mut Payload) -> Self::Future { + GraphQLBatchRequest::from_request(req, payload).map(|res| { + Ok(Self( + res?.0 + .into_single() + .map_err(actix_web::error::ErrorBadRequest)?, + )) + }) + } +} + +/// Extractor for GraphQL batch request. +/// +/// `async_graphql::http::MultipartOptions` allows to configure extraction process. +pub struct GraphQLBatchRequest(pub async_graphql::BatchRequest); + +impl GraphQLBatchRequest { + /// Unwraps the value to `async_graphql::BatchRequest`. + #[must_use] + pub fn into_inner(self) -> async_graphql::BatchRequest { + self.0 + } +} + +impl FromRequest for GraphQLBatchRequest { + type Error = Error; + type Future = Pin>>>; + + fn from_request(req: &HttpRequest, payload: &mut Payload) -> Self::Future { + let config = req + .app_data::() + .cloned() + .unwrap_or_default(); + + if req.method() == Method::GET { + let res = serde_urlencoded::from_str(req.query_string()); + Box::pin(async move { Ok(Self(async_graphql::BatchRequest::Single(res?))) }) + } else if req.method() == Method::POST { + let content_type = req + .headers() + .get(http::header::CONTENT_TYPE) + .and_then(|value| value.to_str().ok()) + .map(|value| value.to_string()); + + let (tx, rx) = async_channel::bounded(16); + + // Payload is !Send so we create indirection with a channel + let mut payload = payload.take(); + actix::spawn(async move { + while let Some(item) = payload.next().await { + if tx.send(item).await.is_err() { + return; + } + } + }); + + Box::pin(async move { + Ok(GraphQLBatchRequest( + async_graphql::http::receive_batch_body( + content_type, + rx.map_err(|e| match e { + PayloadError::Incomplete(Some(e)) | PayloadError::Io(e) => e, + PayloadError::Incomplete(None) => { + io::Error::from(ErrorKind::UnexpectedEof) + } + PayloadError::EncodingCorrupted => io::Error::new( + ErrorKind::InvalidData, + "cannot decode content-encoding", + ), + PayloadError::Overflow => io::Error::new( + ErrorKind::InvalidData, + "a payload reached size limit", + ), + PayloadError::UnknownLength => { + io::Error::new(ErrorKind::Other, "a payload length is unknown") + } + PayloadError::Http2Payload(e) if e.is_io() => e.into_io().unwrap(), + PayloadError::Http2Payload(e) => io::Error::new(ErrorKind::Other, e), + _ => io::Error::new(ErrorKind::Other, e), + }) + .into_async_read(), + config, + ) + .await + .map_err(|err| match err { + ParseRequestError::PayloadTooLarge => { + actix_web::error::ErrorPayloadTooLarge(err) + } + _ => actix_web::error::ErrorBadRequest(err), + })?, + )) + }) + } else { + Box::pin(async move { + Err(actix_web::error::ErrorMethodNotAllowed( + "GraphQL only supports GET and POST requests", + )) + }) + } + } +} + +/// Responder for a GraphQL response. +/// +/// This contains a batch response, but since regular responses are a type of batch response it +/// works for both. +pub struct GraphQLResponse(pub async_graphql::BatchResponse); + +impl From for GraphQLResponse { + fn from(resp: async_graphql::Response) -> Self { + Self(resp.into()) + } +} + +impl From for GraphQLResponse { + fn from(resp: async_graphql::BatchResponse) -> Self { + Self(resp) + } +} + +impl Responder for GraphQLResponse { + fn respond_to(self, _req: &HttpRequest) -> HttpResponse { + let mut res = HttpResponse::build(StatusCode::OK); + res.content_type("application/json"); + if self.0.is_ok() { + if let Some(cache_control) = self.0.cache_control().value() { + res.append_header(("cache-control", cache_control)); + } + } + for (name, value) in self.0.http_headers() { + res.append_header((name, value)); + } + res.body(serde_json::to_string(&self.0).unwrap()) + } +} diff --git a/integrations/rocket/src/lib.rs b/integrations/rocket/src/lib.rs index d27b0af5..862fbcc3 100644 --- a/integrations/rocket/src/lib.rs +++ b/integrations/rocket/src/lib.rs @@ -35,25 +35,25 @@ use tokio_util::compat::TokioAsyncReadCompatExt; /// } /// ``` #[derive(Debug)] -pub struct BatchRequest(pub async_graphql::BatchRequest); +pub struct GraphQLBatchRequest(pub async_graphql::BatchRequest); -impl BatchRequest { +impl GraphQLBatchRequest { /// Shortcut method to execute the request on the schema. pub async fn execute( self, schema: &Schema, - ) -> Response + ) -> GraphQLResponse where Query: ObjectType + 'static, Mutation: ObjectType + 'static, Subscription: SubscriptionType + 'static, { - Response(schema.execute_batch(self.0).await) + GraphQLResponse(schema.execute_batch(self.0).await) } } #[rocket::async_trait] -impl<'r> FromData<'r> for BatchRequest { +impl<'r> FromData<'r> for GraphQLBatchRequest { type Error = ParseRequestError; async fn from_data(req: &'r rocket::Request<'_>, data: Data<'r>) -> data::Outcome<'r, Self> { @@ -95,20 +95,20 @@ impl<'r> FromData<'r> for BatchRequest { /// } /// ``` #[derive(Debug)] -pub struct Request(pub async_graphql::Request); +pub struct GraphQLRequest(pub async_graphql::Request); -impl Request { +impl GraphQLRequest { /// Shortcut method to execute the request on the schema. pub async fn execute( self, schema: &Schema, - ) -> Response + ) -> GraphQLResponse where Query: ObjectType + 'static, Mutation: ObjectType + 'static, Subscription: SubscriptionType + 'static, { - Response(schema.execute(self.0).await.into()) + GraphQLResponse(schema.execute(self.0).await.into()) } /// Insert some data for this request. @@ -118,8 +118,8 @@ impl Request { } } -impl From for Request { - fn from(query: Query) -> Self { +impl From for GraphQLRequest { + fn from(query: GraphQLQuery) -> Self { let mut request = async_graphql::Request::new(query.query); if let Some(operation_name) = query.operation_name { @@ -132,7 +132,7 @@ impl From for Request { request = request.variables(variables); } - Request(request) + GraphQLRequest(request) } } @@ -147,35 +147,35 @@ impl From for Request { /// } /// ``` #[derive(FromForm, Debug)] -pub struct Query { +pub struct GraphQLQuery { query: String, #[field(name = "operationName")] operation_name: Option, variables: Option, } -impl Query { +impl GraphQLQuery { /// Shortcut method to execute the request on the schema. pub async fn execute( self, schema: &Schema, - ) -> Response + ) -> GraphQLResponse where Query: ObjectType + 'static, Mutation: ObjectType + 'static, Subscription: SubscriptionType + 'static, { - let request: Request = self.into(); + let request: GraphQLRequest = self.into(); request.execute(schema).await } } #[rocket::async_trait] -impl<'r> FromData<'r> for Request { +impl<'r> FromData<'r> for GraphQLRequest { type Error = ParseRequestError; async fn from_data(req: &'r rocket::Request<'_>, data: Data<'r>) -> data::Outcome<'r, Self> { - BatchRequest::from_data(req, data) + GraphQLBatchRequest::from_data(req, data) .await .and_then(|request| match request.0.into_single() { Ok(single) => data::Outcome::Success(Self(single)), @@ -190,20 +190,20 @@ impl<'r> FromData<'r> for Request { /// It contains a `BatchResponse` but since a response is a type of batch response it works for /// both. #[derive(Debug)] -pub struct Response(pub async_graphql::BatchResponse); +pub struct GraphQLResponse(pub async_graphql::BatchResponse); -impl From for Response { +impl From for GraphQLResponse { fn from(batch: async_graphql::BatchResponse) -> Self { Self(batch) } } -impl From for Response { +impl From for GraphQLResponse { fn from(res: async_graphql::Response) -> Self { Self(res.into()) } } -impl<'r> Responder<'r, 'static> for Response { +impl<'r> Responder<'r, 'static> for GraphQLResponse { fn respond_to(self, _: &'r rocket::Request<'_>) -> response::Result<'static> { let body = serde_json::to_string(&self.0).unwrap();