Add GQLHttpRequest and IntoQueryBuilder traits

This commit is contained in:
sunli 2020-04-11 17:36:05 +08:00
parent cf9b6a5a41
commit 0b6bfd4c33
24 changed files with 832 additions and 297 deletions

View File

@ -36,6 +36,10 @@ chrono = "0.4.10"
slab = "0.4.2"
once_cell = "1.3.1"
itertools = "0.9.0"
tempdir = "0.3.7"
httparse = "1.3.4"
mime = "0.3.16"
http = "0.2.1"
regex = { version = "1.3.5", optional = true }
bson = { version = "0.14.1", optional = true }
uuid = { version = "0.8.1", optional = true }
@ -44,10 +48,9 @@ chrono-tz = { version = "0.5.1", optional = true }
[dev-dependencies]
async-std = { version = "1.5.0", features = ["attributes"] }
actix-web = "2.0.0"
actix-rt = "1.0.0"
tide = "0.6.0"
mime = "0.3.16"
actix-web = "2.0.0"
[workspace]
members = [

View File

@ -29,12 +29,12 @@ It supports all of the GraphQL specifications and is easy to integrate into exis
* [Docs](https://docs.rs/async-graphql)
* [GitHub repository](https://github.com/sunli829/async-graphql)
* [Cargo package](https://crates.io/crates/async-graphql)
* Minimum supported Rust version: 1.40 or later
* Minimum supported Rust version: 1.42 or later
## Example
```shell script
cargo run --example actix-web
cargo run --example actix_web
```
Open `http://localhost:8000` in browser

View File

@ -1,43 +1,83 @@
use actix_web::{web, App, HttpServer};
use async_graphql::{EmptySubscription, Schema, Upload};
use async_graphql::{Context, EmptySubscription, Schema, Upload, ID};
use futures::lock::Mutex;
use slab::Slab;
#[async_graphql::SimpleObject]
#[derive(Clone)]
struct FileInfo {
#[field]
id: ID,
#[field]
filename: String,
#[field]
mimetype: Option<String>,
#[field]
path: String,
}
type Storage = Mutex<Slab<FileInfo>>;
struct QueryRoot;
#[async_graphql::Object]
impl QueryRoot {}
impl QueryRoot {
#[field]
async fn uploads(&self, ctx: &Context<'_>) -> Vec<FileInfo> {
let storage = ctx.data::<Storage>().lock().await;
storage.iter().map(|(_, file)| file).cloned().collect()
}
}
struct MutationRoot;
#[async_graphql::Object]
impl MutationRoot {
#[field]
async fn single_upload(&self, file: Upload) -> bool {
println!(
"upload: filename={} size={}",
file.filename,
file.content.len()
);
true
async fn single_upload(&self, ctx: &Context<'_>, file: Upload) -> FileInfo {
let mut storage = ctx.data::<Storage>().lock().await;
println!("files count: {}", storage.len());
let entry = storage.vacant_entry();
let info = FileInfo {
id: entry.key().into(),
filename: file.filename,
mimetype: file.content_type,
path: file.path.display().to_string(),
};
entry.insert(info.clone());
info
}
#[field]
async fn multiple_upload(&self, files: Vec<Upload>) -> bool {
for upload in files {
println!(
"upload: filename={} size={}",
upload.filename,
upload.content.len()
);
async fn multiple_upload(&self, ctx: &Context<'_>, files: Vec<Upload>) -> Vec<FileInfo> {
let mut infos = Vec::new();
let mut storage = ctx.data::<Storage>().lock().await;
for file in files {
let entry = storage.vacant_entry();
let info = FileInfo {
id: entry.key().into(),
filename: file.filename,
mimetype: file.content_type,
path: file.path.display().to_string(),
};
entry.insert(info.clone());
infos.push(info)
}
true
infos
}
}
#[actix_rt::main]
async fn main() -> std::io::Result<()> {
let schema = Schema::build(QueryRoot, MutationRoot, EmptySubscription)
.data(Storage::default())
.finish();
HttpServer::new(move || {
let schema = Schema::new(QueryRoot, MutationRoot, EmptySubscription);
let handler = async_graphql_actix_web::HandlerBuilder::new(schema)
let handler = async_graphql_actix_web::HandlerBuilder::new(schema.clone())
.enable_subscription()
.build();
App::new().service(web::resource("/").to(handler))

View File

@ -1,22 +1,14 @@
//! Integrate `async-graphql` to `actix-web`
#![warn(missing_docs)]
mod request;
mod session;
use crate::request::RequestWrapper;
use crate::session::WsSession;
use actix_multipart::Multipart;
use actix_web::http::{header, HeaderMap, Method};
use actix_web::web::{BytesMut, Payload};
use actix_web::{web, FromRequest, HttpRequest, HttpResponse, Responder};
use actix_web::http::{header, Method};
use actix_web::{web, HttpRequest, HttpResponse, Responder};
use actix_web_actors::ws;
use async_graphql::http::{GQLRequest, GQLResponse};
use async_graphql::http::{playground_source, GQLHttpRequest, GQLResponse, IntoQueryBuilder};
use async_graphql::{ObjectType, QueryBuilder, Schema, SubscriptionType};
use bytes::Bytes;
use futures::StreamExt;
use mime::Mime;
use std::collections::HashMap;
use std::future::Future;
use futures::Future;
use std::pin::Pin;
use std::sync::Arc;
@ -27,11 +19,8 @@ type BoxOnRequestFn<Query, Mutation, Subscription> = Arc<
) -> QueryBuilder<Query, Mutation, Subscription>,
>;
/// Actix-web handler builder
pub struct HandlerBuilder<Query, Mutation, Subscription> {
schema: Schema<Query, Mutation, Subscription>,
max_file_size: usize,
max_file_count: usize,
enable_subscription: bool,
enable_ui: Option<(String, Option<String>)>,
on_request: Option<BoxOnRequestFn<Query, Mutation, Subscription>>,
@ -47,30 +36,12 @@ where
pub fn new(schema: Schema<Query, Mutation, Subscription>) -> Self {
Self {
schema,
max_file_size: 1024 * 1024 * 2,
max_file_count: 9,
enable_subscription: false,
enable_ui: None,
on_request: None,
}
}
/// Set the maximum file size for upload, default 2M bytes.
pub fn max_file_size(self, size: usize) -> Self {
Self {
max_file_size: size,
..self
}
}
/// Set the maximum files count for upload, default 9.
pub fn max_files(self, count: usize) -> Self {
Self {
max_file_count: count,
..self
}
}
/// Enable GraphQL playground
///
/// 'endpoint' is the endpoint of the GraphQL Request.
@ -115,18 +86,16 @@ where
self,
) -> impl Fn(
HttpRequest,
Payload,
web::Payload,
) -> Pin<Box<dyn Future<Output = actix_web::Result<HttpResponse>>>>
+ Clone
+ 'static {
let schema = self.schema.clone();
let max_file_size = self.max_file_size;
let max_file_count = self.max_file_count;
let enable_ui = self.enable_ui;
let enable_subscription = self.enable_subscription;
let on_request = self.on_request;
move |req: HttpRequest, payload: Payload| {
move |req: HttpRequest, payload: web::Payload| {
let schema = schema.clone();
let enable_ui = enable_ui.clone();
let on_request = on_request.clone();
@ -151,7 +120,7 @@ where
if let Some((endpoint, subscription_endpoint)) = &enable_ui {
return Ok(HttpResponse::Ok()
.content_type("text/html; charset=utf-8")
.body(async_graphql::http::playground_source(
.body(playground_source(
endpoint,
subscription_endpoint.as_deref(),
)));
@ -159,15 +128,35 @@ where
}
if req.method() == Method::POST {
handle_request(
&schema,
max_file_size,
max_file_count,
req,
payload,
on_request.as_ref(),
)
.await
let content_type = req
.headers()
.get(header::CONTENT_TYPE)
.and_then(|value| value.to_str().ok())
.map(|s| s.to_string());
let r = RequestWrapper(content_type, payload);
r.content_type();
let mut builder = match IntoQueryBuilder::into_query_builder(r, &schema).await {
Ok(builder) => builder,
Err(err) => {
return Ok(web::Json(GQLResponse(Err(err))).respond_to(&req).await?)
}
};
let cache_control = builder.cache_control().value();
if let Some(on_request) = &on_request {
builder = on_request(&req, builder);
}
let mut resp = web::Json(GQLResponse(builder.execute().await))
.respond_to(&req)
.await?;
if let Some(cache_control) = cache_control {
resp.headers_mut().insert(
header::CACHE_CONTROL,
header::HeaderValue::from_str(&cache_control).unwrap(),
);
}
Ok(resp)
} else {
Ok(HttpResponse::MethodNotAllowed().finish())
}
@ -175,182 +164,3 @@ where
}
}
}
async fn handle_request<Query, Mutation, Subscription>(
schema: &Schema<Query, Mutation, Subscription>,
max_file_size: usize,
max_file_count: usize,
req: HttpRequest,
mut payload: Payload,
on_request: Option<&BoxOnRequestFn<Query, Mutation, Subscription>>,
) -> actix_web::Result<HttpResponse>
where
Query: ObjectType + Send + Sync + 'static,
Mutation: ObjectType + Send + Sync + 'static,
Subscription: SubscriptionType + Send + Sync + 'static,
{
if let Ok(ct) = get_content_type(req.headers()) {
if ct.essence_str() == mime::MULTIPART_FORM_DATA {
let mut multipart = Multipart::from_request(&req, &mut payload.0).await?;
// read operators
let gql_request = {
let data = read_multipart(&mut multipart, "operations").await?;
serde_json::from_slice::<GQLRequest>(&data)
.map_err(actix_web::error::ErrorBadRequest)?
};
// read map
let mut map = {
let data = read_multipart(&mut multipart, "map").await?;
serde_json::from_slice::<HashMap<String, Vec<String>>>(&data)
.map_err(actix_web::error::ErrorBadRequest)?
};
let mut builder = match gql_request.into_query_builder(schema) {
Ok(builder) => builder,
Err(err) => return Ok(web::Json(GQLResponse(Err(err))).respond_to(&req).await?),
};
if let Some(on_request) = on_request {
builder = on_request(&req, builder);
}
if !builder.is_upload() {
return Err(actix_web::error::ErrorBadRequest(
"It's not an upload operation",
));
}
// read files
let mut file_count = 0;
while let Some(field) = multipart.next().await {
let mut field = field?;
if let Some(content_disposition) = field.content_disposition() {
if let (Some(name), Some(filename)) = (
content_disposition.get_name(),
content_disposition.get_filename(),
) {
if let Some(var_paths) = map.remove(name) {
let content_type = field.content_type().to_string();
let mut data = BytesMut::new();
while let Some(part) = field.next().await {
let part = part.map_err(actix_web::error::ErrorBadRequest)?;
data.extend(&part);
if data.len() > max_file_size {
return Err(actix_web::error::ErrorPayloadTooLarge(
"payload too large",
));
}
}
let data = data.freeze();
for var_path in var_paths {
builder.set_upload(
&var_path,
filename,
Some(&content_type),
data.clone(),
);
}
file_count += 1;
if file_count > max_file_count {
return Err(actix_web::error::ErrorPayloadTooLarge(
"payload too large",
));
}
} else {
return Err(actix_web::error::ErrorBadRequest("bad request"));
}
} else {
return Err(actix_web::error::ErrorBadRequest("bad request"));
}
} else {
return Err(actix_web::error::ErrorBadRequest("bad request"));
}
}
if !map.is_empty() {
return Err(actix_web::error::ErrorBadRequest("missing files"));
}
Ok(web::Json(GQLResponse(builder.execute().await))
.respond_to(&req)
.await?)
} else if ct.essence_str() == mime::APPLICATION_JSON {
let gql_request = web::Json::<GQLRequest>::from_request(&req, &mut payload.0)
.await?
.into_inner();
let mut builder = match gql_request.into_query_builder(schema) {
Ok(builder) => builder,
Err(err) => return Ok(web::Json(GQLResponse(Err(err))).respond_to(&req).await?),
};
if let Some(on_request) = on_request {
builder = on_request(&req, builder);
}
let mut cache_control = builder.cache_control().value();
let gql_resp = builder.execute().await;
if gql_resp.is_err() {
cache_control = None;
}
let mut resp = web::Json(GQLResponse(gql_resp)).respond_to(&req).await?;
if let Some(cache_control) = cache_control {
resp.headers_mut().insert(
header::CACHE_CONTROL,
header::HeaderValue::from_str(&cache_control).unwrap(),
);
}
Ok(resp)
} else {
Ok(HttpResponse::UnsupportedMediaType().finish())
}
} else {
Ok(HttpResponse::UnsupportedMediaType().finish())
}
}
fn get_content_type(headers: &HeaderMap) -> actix_web::Result<Mime> {
if let Some(content_type) = headers.get(header::CONTENT_TYPE) {
if let Ok(content_type) = content_type.to_str() {
if let Ok(ct) = content_type.parse::<Mime>() {
return Ok(ct);
}
}
}
Err(actix_web::error::ErrorUnsupportedMediaType(
"unsupported media type",
))
}
async fn read_multipart(multipart: &mut Multipart, name: &str) -> actix_web::Result<Bytes> {
let data = match multipart.next().await {
Some(Ok(mut field)) => {
if let Some(content_disposition) = field.content_disposition() {
if let Some(current_name) = content_disposition.get_name() {
if current_name != name {
return Err(actix_web::error::ErrorBadRequest(format!(
"expect \"{}\"",
name
)));
}
let mut data = BytesMut::new();
while let Some(part) = field.next().await {
let part = part.map_err(actix_web::error::ErrorBadRequest)?;
data.extend(&part);
}
data
} else {
return Err(actix_web::error::ErrorBadRequest("missing \"operations\""));
}
} else {
return Err(actix_web::error::ErrorBadRequest("bad request"));
}
}
Some(Err(err)) => return Err(err.into()),
None => return Err(actix_web::error::ErrorBadRequest("bad request")),
};
Ok(data.freeze())
}

View File

@ -0,0 +1,65 @@
use actix_web::web::Payload;
use async_graphql::http::GQLHttpRequest;
use bytes::{Buf, Bytes};
use futures::io::{Error, ErrorKind, Result};
use futures::task::{Context, Poll};
use futures::{AsyncRead, StreamExt};
use std::pin::Pin;
/// Pairs the request's `Content-Type` header value (field 0) with its body
/// payload (field 1) so an actix-web request can act as a `GQLHttpRequest`.
pub struct RequestWrapper(pub Option<String>, pub Payload);
// SAFETY: NOTE(review) — actix-web's `Payload` stream is not itself
// `Send`/`Sync`; these assertions are only sound if the wrapper never
// actually crosses threads. TODO confirm against the runtime in use.
unsafe impl Send for RequestWrapper {}
unsafe impl Sync for RequestWrapper {}
impl GQLHttpRequest for RequestWrapper {
type Body = PayloadReader;
// The `Content-Type` header value captured when the wrapper was built.
fn content_type(&self) -> Option<&str> {
self.0.as_deref()
}
// Consumes the wrapper and exposes the payload as an `AsyncRead` body.
fn into_body(self) -> Self::Body {
PayloadReader {
payload: self.1,
remain_bytes: None,
}
}
}
/// Adapter exposing an actix-web `Payload` stream as a `futures::AsyncRead`.
pub struct PayloadReader {
    payload: Payload,
    // Bytes left over from the last stream chunk that did not fit into the
    // caller's buffer; drained before the stream is polled again.
    remain_bytes: Option<Bytes>,
}

// SAFETY: NOTE(review) — actix-web's `Payload` is not `Send`/`Sync`; these
// assertions are only sound if the reader never actually crosses threads.
// TODO confirm against the runtime in use.
unsafe impl Send for PayloadReader {}
unsafe impl Sync for PayloadReader {}

impl AsyncRead for PayloadReader {
    /// Copies buffered bytes into `buf`, pulling the next chunk from the
    /// payload stream when the buffer is empty. Returns `Ok(0)` only once the
    /// stream is exhausted (the `AsyncRead` EOF signal).
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<Result<usize>> {
        loop {
            if let Some(bytes) = &mut self.remain_bytes {
                // Hand over as much of the buffered chunk as fits in `buf`.
                let data = bytes.split_to(buf.len().min(bytes.len()));
                buf[..data.len()].copy_from_slice(&data);
                if !bytes.has_remaining() {
                    self.remain_bytes = None;
                }
                return Poll::Ready(Ok(data.len()));
            } else {
                match self.payload.poll_next_unpin(cx) {
                    Poll::Ready(Some(Ok(bytes))) => {
                        // Fix: skip empty chunks. Buffering one would make the
                        // branch above return `Ok(0)`, which readers interpret
                        // as EOF even though the stream still has data.
                        if !bytes.is_empty() {
                            self.remain_bytes = Some(bytes);
                        }
                    }
                    Poll::Ready(Some(Err(_))) => {
                        return Poll::Ready(Err(Error::from(ErrorKind::InvalidData)))
                    }
                    // Stream finished: report EOF.
                    Poll::Ready(None) => return Poll::Ready(Ok(0)),
                    Poll::Pending => return Poll::Pending,
                }
            }
        }
    }
}

View File

@ -1,7 +1,9 @@
mod starwars;
use actix_web::{guard, web, App, HttpResponse, HttpServer};
use async_graphql::http::{graphiql_source, playground_source, GQLRequest, GQLResponse};
use async_graphql::http::{
graphiql_source, playground_source, GQLRequest, GQLResponse, IntoQueryBuilder,
};
use async_graphql::{EmptyMutation, EmptySubscription, Schema};
use futures::TryFutureExt;
@ -9,7 +11,8 @@ type StarWarsSchema = Schema<starwars::QueryRoot, EmptyMutation, EmptySubscripti
async fn index(s: web::Data<StarWarsSchema>, req: web::Json<GQLRequest>) -> web::Json<GQLResponse> {
web::Json(GQLResponse(
futures::future::ready(req.into_inner().into_query_builder(&s))
req.into_inner()
.into_query_builder(&s)
.and_then(|builder| builder.execute())
.await,
))

View File

@ -3,7 +3,9 @@ extern crate thiserror;
use actix_rt;
use actix_web::{guard, web, App, HttpResponse, HttpServer};
use async_graphql::http::{graphiql_source, playground_source, GQLRequest, GQLResponse};
use async_graphql::http::{
graphiql_source, playground_source, GQLRequest, GQLResponse, IntoQueryBuilder,
};
use async_graphql::ErrorExtensions;
use async_graphql::*;
use futures::TryFutureExt;
@ -110,7 +112,8 @@ async fn index(
req: web::Json<GQLRequest>,
) -> web::Json<GQLResponse> {
web::Json(GQLResponse(
futures::future::ready(req.into_inner().into_query_builder(&s))
req.into_inner()
.into_query_builder(&s)
.and_then(|builder| builder.execute())
.await,
))

View File

@ -1,5 +1,5 @@
use actix_web::{guard, web, App, HttpResponse, HttpServer};
use async_graphql::http::{playground_source, GQLRequest, GQLResponse};
use async_graphql::http::{playground_source, GQLRequest, GQLResponse, IntoQueryBuilder};
use async_graphql::{EmptyMutation, EmptySubscription, Object, Schema, SimpleObject, ID};
use futures::TryFutureExt;
@ -39,7 +39,8 @@ impl Query {
async fn index(s: web::Data<MySchema>, req: web::Json<GQLRequest>) -> web::Json<GQLResponse> {
web::Json(GQLResponse(
futures::future::ready(req.into_inner().into_query_builder(&s))
req.into_inner()
.into_query_builder(&s)
.and_then(|builder| builder.execute())
.await,
))

View File

@ -1,5 +1,5 @@
use actix_web::{guard, web, App, HttpResponse, HttpServer};
use async_graphql::http::{playground_source, GQLRequest, GQLResponse};
use async_graphql::http::{playground_source, GQLRequest, GQLResponse, IntoQueryBuilder};
use async_graphql::{Context, EmptyMutation, EmptySubscription, Object, Schema, SimpleObject};
use futures::TryFutureExt;
@ -39,7 +39,8 @@ impl Query {
async fn index(s: web::Data<MySchema>, req: web::Json<GQLRequest>) -> web::Json<GQLResponse> {
web::Json(GQLResponse(
futures::future::ready(req.into_inner().into_query_builder(&s))
req.into_inner()
.into_query_builder(&s)
.and_then(|builder| builder.execute())
.await,
))

View File

@ -1,5 +1,5 @@
use actix_web::{guard, web, App, HttpResponse, HttpServer};
use async_graphql::http::{playground_source, GQLRequest, GQLResponse};
use async_graphql::http::{playground_source, GQLRequest, GQLResponse, IntoQueryBuilder};
use async_graphql::{Context, EmptyMutation, EmptySubscription, Object, Schema, SimpleObject, ID};
use futures::TryFutureExt;
@ -76,7 +76,8 @@ impl Query {
async fn index(s: web::Data<MySchema>, req: web::Json<GQLRequest>) -> web::Json<GQLResponse> {
web::Json(GQLResponse(
futures::future::ready(req.into_inner().into_query_builder(&s))
req.into_inner()
.into_query_builder(&s)
.and_then(|builder| builder.execute())
.await,
))

View File

@ -1,6 +1,8 @@
mod starwars;
use async_graphql::http::{graphiql_source, playground_source, GQLRequest, GQLResponse};
use async_graphql::http::{
graphiql_source, playground_source, GQLRequest, GQLResponse, IntoQueryBuilder,
};
use async_graphql::{EmptyMutation, EmptySubscription, Schema};
use futures::TryFutureExt;
use mime;
@ -11,7 +13,8 @@ type StarWarsSchema = Schema<starwars::QueryRoot, EmptyMutation, EmptySubscripti
async fn index(mut request: Request<StarWarsSchema>) -> Response {
let gql_request: GQLRequest = request.body_json().await.unwrap();
let schema = request.state();
let gql_response = futures::future::ready(gql_request.into_query_builder(schema))
let gql_response = gql_request
.into_query_builder(schema)
.and_then(|builder| builder.execute())
.await;
Response::new(200)

View File

@ -1,13 +1,13 @@
use crate::extensions::BoxExtension;
use crate::registry::Registry;
use crate::{InputValueType, Pos, QueryError, Result, Type};
use bytes::Bytes;
use graphql_parser::query::{
Directive, Field, FragmentDefinition, SelectionSet, Value, VariableDefinition,
};
use std::any::{Any, TypeId};
use std::collections::{BTreeMap, HashMap};
use std::ops::{Deref, DerefMut};
use std::path::Path;
use std::sync::atomic::AtomicUsize;
/// Variables of query
@ -58,7 +58,7 @@ impl Variables {
var_path: &str,
filename: &str,
content_type: Option<&str>,
content: Bytes,
path: &Path,
) {
let mut it = var_path.split('.').peekable();
@ -76,7 +76,7 @@ impl Variables {
if let Value::List(ls) = current {
if let Some(value) = ls.get_mut(idx as usize) {
if !has_next {
*value = Value::String(file_string(filename, content_type, &content));
*value = Value::String(file_string(filename, content_type, path));
return;
} else {
current = value;
@ -88,7 +88,7 @@ impl Variables {
} else if let Value::Object(obj) = current {
if let Some(value) = obj.get_mut(s) {
if !has_next {
*value = Value::String(file_string(filename, content_type, &content));
*value = Value::String(file_string(filename, content_type, path));
return;
} else {
current = value;
@ -101,11 +101,11 @@ impl Variables {
}
}
fn file_string(filename: &str, content_type: Option<&str>, content: &[u8]) -> String {
fn file_string(filename: &str, content_type: Option<&str>, path: &Path) -> String {
if let Some(content_type) = content_type {
format!("file:{}:{}|", filename, content_type) + &base64::encode(content)
format!("file:{}:{}|", filename, content_type) + &path.display().to_string()
} else {
format!("file:{}|", filename) + &base64::encode(content)
format!("file:{}|", filename) + &path.display().to_string()
}
}

View File

@ -303,6 +303,37 @@ impl From<ParseError> for Error {
}
}
#[allow(missing_docs)]
#[derive(Debug, Error)]
pub enum RequestError {
#[error("{0}")]
Io(std::io::Error),
#[error("Invalid request: {0}")]
InvalidRequest(serde_json::Error),
#[error("Invalid files map: {0}")]
InvalidFilesMap(serde_json::Error),
#[error("Invalid multipart data: {0}")]
InvalidMultipart(std::io::Error),
#[error("Missing \"operators\" part")]
MissingOperatorsPart,
#[error("Missing \"map\" part")]
MissingMapPart,
#[error("Failed to read part data: {0}")]
PartData(#[from] std::io::Error),
#[error("It's not an upload operation")]
NotUpload,
#[error("Missing files")]
MissingFiles,
}
#[allow(missing_docs)]
#[derive(Debug, Error)]
pub enum Error {
@ -322,4 +353,7 @@ pub enum Error {
#[error("Rule error")]
Rule { errors: Vec<RuleError> },
#[error("Request error")]
Request(#[from] RequestError),
}

View File

@ -62,9 +62,8 @@ impl Default for Inner {
}
}
/// Apollo tracing
/// Apollo tracing extension for performance tracing
///
/// Apollo Tracing is a GraphQL extension for performance tracing.
/// Apollo Tracing works by including data in the extensions field of the GraphQL response, which is reserved by the GraphQL spec for extra information that a server wants to return. That way, you have access to performance traces alongside the data returned by your query.
/// Its already supported by `Apollo Engine`, and were excited to see what other kinds of integrations people can build on top of this format.
#[derive(Default)]

110
src/http/http_request.rs Normal file
View File

@ -0,0 +1,110 @@
use crate::error::RequestError;
use crate::http::multipart::{Multipart, PartData};
use crate::http::{GQLRequest, IntoQueryBuilder};
use crate::{Error, ObjectType, QueryBuilder, Result, Schema, SubscriptionType};
use futures::{AsyncRead, AsyncReadExt};
use mime::Mime;
use std::collections::HashMap;
/// Http request trait for GraphQL
///
/// Implemented by web-framework adapters so that any HTTP request carrying a
/// GraphQL payload can be turned into a query builder (see the blanket
/// `IntoQueryBuilder` implementation below).
#[allow(missing_docs)]
pub trait GQLHttpRequest {
/// Asynchronous request body stream.
type Body: AsyncRead + Send + Unpin;
/// Value of the request's `Content-Type` header, if any.
fn content_type(&self) -> Option<&str>;
/// Consumes the request, yielding its body.
fn into_body(self) -> Self::Body;
}
// Blanket implementation: any `GQLHttpRequest` (a content type plus an
// AsyncRead body) can be converted into a `QueryBuilder`. Handles both plain
// JSON bodies and `multipart/form-data` upload requests.
#[async_trait::async_trait]
impl<R: GQLHttpRequest + Send + Sync> IntoQueryBuilder for R {
async fn into_query_builder<Query, Mutation, Subscription>(
mut self,
schema: &Schema<Query, Mutation, Subscription>,
) -> Result<QueryBuilder<Query, Mutation, Subscription>>
where
Query: ObjectType + Send + Sync + 'static,
Mutation: ObjectType + Send + Sync + 'static,
Subscription: SubscriptionType + Send + Sync + 'static,
{
// An upload request is detected by a `multipart/form-data` content type
// carrying a `boundary` parameter; anything else falls through to the
// plain-JSON branch below.
if let Some(boundary) = self
.content_type()
.and_then(|value| value.parse::<Mime>().ok())
.and_then(|ct| {
if ct.essence_str() == mime::MULTIPART_FORM_DATA {
ct.get_param("boundary")
.map(|boundary| boundary.to_string())
} else {
None
}
})
{
// multipart
let mut multipart = Multipart::parse(self.into_body(), boundary.as_str())
.await
.map_err(RequestError::InvalidMultipart)?;
// The "operations" part holds the GraphQL request JSON. NOTE(review):
// the error variant is named `MissingOperatorsPart` — presumably a
// typo for "operations"; renaming it would touch callers.
let gql_request: GQLRequest = {
let part = multipart
.remove("operations")
.ok_or_else(|| Error::Request(RequestError::MissingOperatorsPart))?;
let reader = part
.create_reader()
.map_err(|err| Error::Request(RequestError::PartData(err)))?;
serde_json::from_reader(reader).map_err(RequestError::InvalidRequest)?
};
// The "map" part maps multipart field names to lists of variable paths.
let mut map: HashMap<String, Vec<String>> = {
let part = multipart
.remove("map")
.ok_or_else(|| Error::Request(RequestError::MissingMapPart))?;
let reader = part
.create_reader()
.map_err(|err| Error::Request(RequestError::PartData(err)))?;
serde_json::from_reader(reader).map_err(RequestError::InvalidFilesMap)?
};
let mut builder = gql_request.into_query_builder(schema).await?;
if !builder.is_upload() {
return Err(RequestError::NotUpload.into());
}
// read files
// Attach each remaining file part to every variable path it maps to;
// map entries are removed as they are satisfied.
for part in &multipart.parts {
if let Some(name) = &part.name {
if let Some(var_paths) = map.remove(name) {
for var_path in var_paths {
if let (Some(filename), PartData::File(path)) =
(&part.filename, &part.data)
{
builder.set_upload(
&var_path,
&filename,
part.content_type.as_deref(),
path,
);
}
}
}
}
}
// Leftover map entries mean the client promised files it never sent.
if !map.is_empty() {
return Err(RequestError::MissingFiles.into());
}
// Keep the temp dir alive as long as the builder needs the files.
if let Some(temp_dir) = multipart.temp_dir {
builder.set_files_holder(temp_dir);
}
Ok(builder)
} else {
// Non-multipart: read the entire body and parse it as request JSON.
let mut data = Vec::new();
self.into_body()
.read_to_end(&mut data)
.await
.map_err(RequestError::Io)?;
let gql_request: GQLRequest =
serde_json::from_slice(&data).map_err(RequestError::InvalidRequest)?;
gql_request.into_query_builder(schema).await
}
}
}

View File

@ -1,11 +1,15 @@
//! A helper module that supports HTTP
mod graphiql_source;
mod http_request;
mod multipart;
mod playground_source;
mod token_reader;
use itertools::Itertools;
pub use graphiql_source::graphiql_source;
pub use http_request::GQLHttpRequest;
pub use playground_source::playground_source;
use crate::{
@ -16,7 +20,20 @@ use graphql_parser::Pos;
use serde::ser::{SerializeMap, SerializeSeq};
use serde::{Serialize, Serializer};
/// GraphQL Request object
/// Converts a value (a deserialized `GQLRequest` or, via `GQLHttpRequest`,
/// a raw HTTP request) into a `QueryBuilder` bound to the given schema.
#[allow(missing_docs)]
#[async_trait::async_trait]
pub trait IntoQueryBuilder {
async fn into_query_builder<Query, Mutation, Subscription>(
self,
schema: &Schema<Query, Mutation, Subscription>,
) -> Result<QueryBuilder<Query, Mutation, Subscription>>
where
Query: ObjectType + Send + Sync + 'static,
Mutation: ObjectType + Send + Sync + 'static,
Subscription: SubscriptionType + Send + Sync + 'static;
}
/// Deserializable GraphQL Request object
#[derive(Deserialize, Clone, PartialEq, Debug)]
pub struct GQLRequest {
/// Query source
@ -30,9 +47,9 @@ pub struct GQLRequest {
pub variables: Option<serde_json::Value>,
}
impl GQLRequest {
/// Into query builder, you can set other parameters or execute queries immediately.
pub fn into_query_builder<Query, Mutation, Subscription>(
#[async_trait::async_trait]
impl IntoQueryBuilder for GQLRequest {
async fn into_query_builder<Query, Mutation, Subscription>(
self,
schema: &Schema<Query, Mutation, Subscription>,
) -> Result<QueryBuilder<Query, Mutation, Subscription>>
@ -54,7 +71,7 @@ impl GQLRequest {
}
}
/// Serializable query result type
/// Serializable GraphQL Response object
pub struct GQLResponse(pub Result<QueryResponse>);
impl Serialize for GQLResponse {
@ -143,6 +160,13 @@ impl<'a> Serialize for GQLError<'a> {
}
seq.end()
}
Error::Request(err) => {
let mut seq = serializer.serialize_seq(Some(1))?;
seq.serialize_element(&serde_json::json!({
"message": err.to_string(),
}))?;
seq.end()
}
}
}
}

240
src/http/multipart.rs Normal file
View File

@ -0,0 +1,240 @@
use super::token_reader::*;
use futures::io::{BufReader, ErrorKind};
use futures::{AsyncBufRead, AsyncRead};
use http::{header::HeaderName, HeaderMap, HeaderValue};
use itertools::Itertools;
use std::fs::File;
use std::io::{Cursor, Error, Read, Result, Write};
use std::path::PathBuf;
use std::str::FromStr;
use tempdir::TempDir;
// Maximum number of headers accepted per multipart part (size of the
// httparse header buffer used in `parse_headers`).
const MAX_HEADERS: usize = 16;
// Storage for a part's payload: kept in memory, or written to a temp file
// when the part carries a filename (see `Multipart::parse_body`).
pub enum PartData {
Bytes(Vec<u8>),
File(PathBuf),
}
// One decoded part of a multipart body, with metadata taken from its
// Content-Disposition / Content-Type headers and `size` counting payload bytes.
pub struct Part {
pub name: Option<String>,
pub filename: Option<String>,
pub content_type: Option<String>,
pub size: usize,
pub data: PartData,
}
impl Part {
    /// Returns a synchronous reader over this part's payload: an in-memory
    /// cursor for buffered parts, or the backing temp file for file parts.
    ///
    /// # Errors
    ///
    /// Fails if a file-backed part can no longer be opened.
    // The explicit `<'a>` lifetime was redundant (clippy `needless_lifetimes`);
    // the elided form is identical.
    pub fn create_reader(&self) -> Result<Box<dyn Read + '_>> {
        let reader: Box<dyn Read> = match &self.data {
            PartData::Bytes(bytes) => Box::new(Cursor::new(bytes)),
            PartData::File(path) => Box::new(File::open(path)?),
        };
        Ok(reader)
    }
}
// `name` and `filename` parameters extracted from a Content-Disposition
// header value; either may be absent.
struct ContentDisposition {
name: Option<String>,
filename: Option<String>,
}
impl ContentDisposition {
    /// Extracts the `name` and `filename` parameters from a
    /// `Content-Disposition` header value; absent parameters become `None`.
    fn parse(value: &str) -> Result<ContentDisposition> {
        // Both parameters are matched with the same quoted-value pattern, so
        // the capture logic is shared by a local helper.
        fn quoted_param(header: &str, pattern: &str, group: &str) -> Option<String> {
            regex::Regex::new(pattern)
                .unwrap()
                .captures(header)
                .and_then(|caps| caps.name(group).map(|m| m.as_str().to_string()))
        }

        let name = quoted_param(value, "name=\"(?P<name>.*?)\"", "name");
        let filename = quoted_param(value, "filename=\"(?P<filename>.*?)\"", "filename");
        Ok(ContentDisposition { name, filename })
    }
}
// A fully parsed multipart body. `temp_dir` owns the directory backing any
// file parts — presumably deleted when the TempDir drops (tempdir crate);
// keep the Multipart (or the TempDir) alive while the files are in use.
pub struct Multipart {
pub temp_dir: Option<TempDir>,
pub parts: Vec<Part>,
}
impl Multipart {
    /// Parses a complete `multipart/form-data` body from `reader` using the
    /// given boundary (without the leading `--`).
    ///
    /// A temp directory is created lazily for file-backed parts and returned
    /// inside the `Multipart` so the files live as long as the value.
    ///
    /// # Errors
    ///
    /// Returns an I/O error when the stream is malformed or truncated.
    pub async fn parse<R: AsyncRead + Unpin>(reader: R, boundary: &str) -> Result<Multipart> {
        let mut reader = BufReader::new(reader);
        let mut temp_dir = None;
        let mut parts = Vec::new();
        let boundary = format!("--{}", boundary);

        // First part: consume the opening boundary line, then its headers
        // and body. (`except_token` — presumably "expect token" — reads and
        // verifies an exact byte sequence.)
        reader.except_token(boundary.as_bytes()).await?;
        reader.except_token(b"\r\n").await?;
        let headers = Self::parse_headers(&mut reader).await?;
        parts.push(Self::parse_body(&mut reader, &headers, &mut temp_dir, &boundary).await?);

        // Subsequent parts: after each body, either "\r\n" follows (another
        // part) or "--\r\n" (the closing delimiter) ends the stream.
        loop {
            if reader.except_token(b"\r\n").await.is_err() {
                reader.except_token(b"--\r\n").await?;
                break;
            }
            let headers = Self::parse_headers(&mut reader).await?;
            parts.push(Self::parse_body(&mut reader, &headers, &mut temp_dir, &boundary).await?);
        }

        Ok(Multipart { temp_dir, parts })
    }

    /// Reads raw header bytes up to the blank line and parses them with
    /// httparse into a `HeaderMap` (at most `MAX_HEADERS` headers).
    async fn parse_headers<R: AsyncBufRead + Unpin>(mut reader: R) -> Result<HeaderMap> {
        let mut buf = [0; 256];
        let mut header_data = Vec::new();
        let mut state = ReadUntilState::default();
        // Accumulate until the "\r\n\r\n" header terminator is found.
        loop {
            let (size, found) = reader
                .read_until_token(b"\r\n\r\n", &mut buf, &mut state)
                .await?;
            header_data.extend_from_slice(&buf[..size]);
            if found {
                break;
            }
        }
        let mut headers = [httparse::EMPTY_HEADER; MAX_HEADERS];
        // Re-append the terminator: read_until_token consumed it, but
        // httparse needs it to report `Complete`.
        header_data.extend_from_slice(b"\r\n\r\n");
        let headers = match httparse::parse_headers(&header_data, &mut headers)
            .map_err(|_| Error::from(ErrorKind::InvalidData))?
        {
            httparse::Status::Complete((_, headers)) => headers,
            _ => return Err(Error::from(ErrorKind::InvalidData)),
        };
        let mut headers_map = HeaderMap::new();
        for httparse::Header { name, value } in headers {
            headers_map.insert(
                HeaderName::from_str(name).map_err(|_| Error::from(ErrorKind::InvalidData))?,
                HeaderValue::from_bytes(value).map_err(|_| Error::from(ErrorKind::InvalidData))?,
            );
        }
        Ok(headers_map)
    }

    /// Reads one part's body up to the next boundary. Parts with a filename
    /// are streamed to a temp file; others are buffered in memory.
    async fn parse_body<R: AsyncBufRead + Unpin>(
        mut reader: R,
        headers: &HeaderMap,
        temp_dir: &mut Option<TempDir>,
        boundary: &str,
    ) -> Result<Part> {
        let content_disposition = headers
            .get(http::header::CONTENT_DISPOSITION)
            .and_then(|value| value.to_str().ok())
            .and_then(|value| ContentDisposition::parse(value).ok())
            .unwrap_or_else(|| ContentDisposition {
                name: None,
                filename: None,
            });
        let content_type = headers
            .get(http::header::CONTENT_TYPE)
            .and_then(|value| value.to_str().ok())
            .map(ToString::to_string);
        let mut buf = [0; 4096];
        let mut state = ReadUntilState::default();
        let mut total_size = 0;
        let part_data = if let Some(filename) = &content_disposition.filename {
            if temp_dir.is_none() {
                *temp_dir = Some(TempDir::new("async-graphql")?);
            }
            let temp_dir = temp_dir.as_mut().unwrap();
            // NOTE(review): `filename` is client-supplied and joined into the
            // temp dir path unchecked — a name containing path separators
            // could escape the directory. Consider sanitizing; flagged only.
            let path = temp_dir.path().join(filename);
            let mut file = File::create(&path)?;
            loop {
                let (size, found) = reader
                    .read_until_token(boundary.as_bytes(), &mut buf, &mut state)
                    .await?;
                total_size += size;
                file.write_all(&buf[..size])?;
                if found {
                    break;
                }
            }
            PartData::File(path)
        } else {
            let mut body = Vec::new();
            loop {
                let (size, found) = reader
                    .read_until_token(boundary.as_bytes(), &mut buf, &mut state)
                    .await?;
                total_size += size;
                body.extend_from_slice(&buf[..size]);
                if found {
                    break;
                }
            }
            PartData::Bytes(body)
        };
        Ok(Part {
            name: content_disposition.name,
            filename: content_disposition.filename,
            content_type,
            size: total_size,
            data: part_data,
        })
    }

    /// Removes and returns the first part whose `name` parameter equals
    /// `name`, or `None` if no such part exists.
    pub fn remove(&mut self, name: &str) -> Option<Part> {
        // std `Iterator::position` replaces the previous itertools
        // `find_position` + if/else; behavior is identical.
        let pos = self
            .parts
            .iter()
            .position(|part| part.name.as_deref() == Some(name))?;
        Some(self.parts.remove(pos))
    }
}
#[cfg(test)]
mod tests {
use super::*;
// Parses a two-part body: a named file part ("file" / "fn.txt") followed by
// an anonymous in-memory part, then checks the parsed metadata of each.
#[async_std::test]
async fn test_parse() {
let data: &[u8] = b"--abbc761f78ff4d7cb7573b5a23f96ef0\r\n\
Content-Disposition: form-data; name=\"file\"; filename=\"fn.txt\"\r\n\
Content-Type: text/plain; charset=utf-8\r\nContent-Length: 4\r\n\r\n\
test\r\n\
--abbc761f78ff4d7cb7573b5a23f96ef0\r\n\
Content-Type: text/plain; charset=utf-8\r\nContent-Length: 4\r\n\r\n\
data\r\n\
--abbc761f78ff4d7cb7573b5a23f96ef0--\r\n";
let multipart = Multipart::parse(data, "abbc761f78ff4d7cb7573b5a23f96ef0")
.await
.unwrap();
assert_eq!(multipart.parts.len(), 2);
let part_1 = &multipart.parts[0];
assert_eq!(part_1.name.as_deref(), Some("file"));
assert_eq!(part_1.filename.as_deref(), Some("fn.txt"));
assert_eq!(
part_1.content_type.as_deref(),
Some("text/plain; charset=utf-8")
);
// The second part has no Content-Disposition, so name/filename are None.
let part_2 = &multipart.parts[1];
assert!(part_2.name.is_none());
assert!(part_2.filename.is_none());
assert_eq!(
part_2.content_type.as_deref(),
Some("text/plain; charset=utf-8")
);
}
}

191
src/http/token_reader.rs Normal file
View File

@ -0,0 +1,191 @@
use futures::io::ErrorKind;
use futures::task::{Context, Poll};
use futures::{AsyncBufRead, Future};
use std::io::{Error, Result};
use std::pin::Pin;
/// Extension methods for any [`AsyncBufRead`] that scan the stream for a
/// byte token.
pub trait AsyncTokenReader: AsyncBufRead {
    /// Creates a future that copies bytes into `buf` until `token` is found
    /// or `buf` is full. `state` carries partial-match progress between
    /// calls, so a token may straddle several `read_until_token` calls.
    fn read_until_token<'a>(
        &'a mut self,
        token: &'a [u8],
        buf: &'a mut [u8],
        state: &'a mut ReadUntilState,
    ) -> ReadUntilToken<'a, Self> {
        ReadUntilToken {
            state,
            buf,
            token,
            reader: self,
        }
    }

    /// Creates a future that consumes exactly `token` from the stream,
    /// failing with `InvalidData` on the first deviating byte.
    fn except_token<'a>(&'a mut self, token: &'a [u8]) -> ExceptToken<'a, Self> {
        ExceptToken {
            match_size: 0,
            token,
            reader: self,
        }
    }
}

// Blanket impl: every `AsyncBufRead` gets the token extensions for free.
impl<R: AsyncBufRead> AsyncTokenReader for R {}
/// Resumable scanning state for [`ReadUntilToken`], owned by the caller so a
/// token match can continue across separate `read_until_token` calls.
#[derive(Default)]
pub struct ReadUntilState {
    // Number of leading token bytes matched so far.
    match_size: usize,
    // When a partial match turns out to be false, `(pos, size)` of the token
    // prefix that still has to be replayed into the output buffer.
    consume_token: Option<(usize, usize)>,
}

/// Future returned by `AsyncTokenReader::read_until_token`.
pub struct ReadUntilToken<'a, R: ?Sized> {
    reader: &'a mut R,
    token: &'a [u8],
    buf: &'a mut [u8],
    state: &'a mut ReadUntilState,
}
impl<'a, R: AsyncBufRead + ?Sized + Unpin> Future for ReadUntilToken<'a, R> {
    // (bytes written to `buf`, whether the full token was found and consumed).
    type Output = Result<(usize, bool)>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = &mut *self;
        // Bytes written into `this.buf` during this poll.
        let mut rsz = 0;
        loop {
            // Space remaining in the output buffer.
            let nsz = this.buf.len() - rsz;
            if let Some((pos, size)) = &mut this.state.consume_token {
                // A previous partial token match failed: replay the matched
                // token prefix (token[pos..size]) into the output buffer
                // before reading any further input.
                let sz = (*size - *pos).min(nsz);
                this.buf[rsz..rsz + sz].copy_from_slice(&this.token[*pos..*pos + sz]);
                *pos += sz;
                rsz += sz;
                if *pos == *size {
                    // Replay finished.
                    this.state.consume_token = None;
                }
                if rsz == this.buf.len() {
                    // Output buffer full; report progress without a match.
                    return Poll::Ready(Ok((rsz, false)));
                }
            } else {
                match Pin::new(&mut this.reader).poll_fill_buf(cx) {
                    Poll::Pending => return Poll::Pending,
                    Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
                    Poll::Ready(Ok(data)) if data.is_empty() => {
                        // Stream ended before the token was seen.
                        return Poll::Ready(Err(Error::from(ErrorKind::UnexpectedEof)))
                    }
                    Poll::Ready(Ok(data)) => {
                        let mut consume_size = data.len();
                        for (idx, b) in data.iter().enumerate() {
                            if *b == this.token[this.state.match_size] {
                                // Byte continues a (possibly chunk-spanning)
                                // token match; it is not copied to `buf`.
                                this.state.match_size += 1;
                                if this.state.match_size == this.token.len() {
                                    // Full token matched: consume through it.
                                    Pin::new(&mut this.reader).consume(idx + 1);
                                    this.state.match_size = 0;
                                    return Poll::Ready(Ok((rsz, true)));
                                }
                            } else if this.state.match_size > 0 {
                                // False partial match: schedule the matched
                                // prefix for replay into `buf` and stop
                                // scanning this chunk at the mismatching
                                // byte, which will be reprocessed next round.
                                // NOTE(review): scanning restarts *after* the
                                // matched prefix, so tokens whose prefix can
                                // overlap itself (e.g. "AAB" in "AAAB") may
                                // be missed — presumably acceptable for
                                // multipart boundaries; confirm.
                                this.state.consume_token = Some((0, this.state.match_size));
                                this.state.match_size = 0;
                                consume_size = idx;
                                break;
                            } else {
                                // Ordinary data byte: copy straight to `buf`.
                                this.buf[rsz] = *b;
                                rsz += 1;
                                if rsz == this.buf.len() {
                                    Pin::new(&mut this.reader).consume(idx + 1);
                                    return Poll::Ready(Ok((rsz, false)));
                                }
                            }
                        }
                        Pin::new(&mut this.reader).consume(consume_size);
                    }
                }
            }
        }
    }
}
/// Future returned by `AsyncTokenReader::except_token` that consumes an
/// exact token from the stream.
// NOTE(review): the name looks like a typo for `ExpectToken`; kept because
// callers (including the tests below) use `except_token`.
pub struct ExceptToken<'a, R: ?Sized> {
    reader: &'a mut R,
    token: &'a [u8],
    // Number of leading token bytes matched so far.
    match_size: usize,
}
impl<'a, R: AsyncBufRead + ?Sized + Unpin> Future for ExceptToken<'a, R> {
type Output = Result<()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = &mut *self;
loop {
match Pin::new(&mut this.reader).poll_fill_buf(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
Poll::Ready(Ok(data)) if data.is_empty() => {
return Poll::Ready(Err(Error::from(ErrorKind::UnexpectedEof)))
}
Poll::Ready(Ok(data)) => {
for b in data {
if *b == this.token[this.match_size] {
this.match_size += 1;
if this.match_size == this.token.len() {
Pin::new(&mut this.reader).consume(this.match_size);
return Poll::Ready(Ok(()));
}
} else {
return Poll::Ready(Err(Error::from(ErrorKind::InvalidData)));
}
}
}
}
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use futures::io::BufReader;

    #[async_std::test]
    async fn test_read_until_token() {
        // Token "ABC": the stream contains a false prefix ("AB5") and two
        // real occurrences, then a trailing byte with no further token.
        let data: &[u8] = b"12AB567890ABC12345ABC6";
        let mut reader = BufReader::new(data);
        let mut buf = [0; 3];
        let mut state = ReadUntilState::default();

        // "AB" is a false partial match and gets replayed into the buffer.
        let step = reader.read_until_token(b"ABC", &mut buf, &mut state).await;
        assert!(matches!(step, Ok((3, false))));
        assert_eq!(&buf, b"12A");

        let step = reader.read_until_token(b"ABC", &mut buf, &mut state).await;
        assert!(matches!(step, Ok((3, false))));
        assert_eq!(&buf, b"B56");

        let step = reader.read_until_token(b"ABC", &mut buf, &mut state).await;
        assert!(matches!(step, Ok((3, false))));
        assert_eq!(&buf, b"789");

        // First token found after one more payload byte.
        let step = reader.read_until_token(b"ABC", &mut buf, &mut state).await;
        assert!(matches!(step, Ok((1, true))));
        assert_eq!(&buf[..1], b"0");

        let step = reader.read_until_token(b"ABC", &mut buf, &mut state).await;
        assert!(matches!(step, Ok((3, false))));
        assert_eq!(&buf, b"123");

        // Second token found.
        let step = reader.read_until_token(b"ABC", &mut buf, &mut state).await;
        assert!(matches!(step, Ok((2, true))));
        assert_eq!(&buf[..2], b"45");

        // Only "6" remains, so the next read hits EOF before any token.
        let step = reader.read_until_token(b"ABC", &mut buf, &mut state).await;
        assert!(matches!(step, Err(_)));
    }

    #[async_std::test]
    async fn test_read_expect_token() {
        // Two tokens are present; the third attempt runs off the end.
        let data: &[u8] = b"ABCABC";
        let mut reader = BufReader::new(data);
        assert!(reader.except_token(b"ABC").await.is_ok());
        assert!(reader.except_token(b"ABC").await.is_ok());
        assert!(reader.except_token(b"ABC").await.is_err());
    }
}

View File

@ -26,7 +26,7 @@
//!
//! * [GitHub repository](https://github.com/sunli829/async-graphql)
//! * [Cargo package](https://crates.io/crates/async-graphql)
//! * Minimum supported Rust version: 1.40 or later
//! * Minimum supported Rust version: 1.42 or later
//!
//! ## Features
//!
@ -46,7 +46,7 @@
//!
//! ## Integrations
//!
//! * Actix-web [async-graphql-actix-web](https://crates.io/crates/async-graphql-actix-web)
//! * Actix-web [async-graphql-actix-web](https://crates.io/crates/async-graphql-actix-web)
//!
//! ## License
//!

View File

@ -4,14 +4,15 @@ use crate::mutation_resolver::do_mutation_resolve;
use crate::registry::CacheControl;
use crate::{do_resolve, ContextBase, Error, Result, Schema};
use crate::{ObjectType, QueryError, Variables};
use bytes::Bytes;
use graphql_parser::query::{
Definition, Document, OperationDefinition, SelectionSet, VariableDefinition,
};
use graphql_parser::Pos;
use std::any::Any;
use std::collections::HashMap;
use std::path::Path;
use std::sync::atomic::AtomicUsize;
use tempdir::TempDir;
/// Query response
pub struct QueryResponse {
@ -31,6 +32,7 @@ pub struct QueryBuilder<Query, Mutation, Subscription> {
pub(crate) variables: Variables,
pub(crate) ctx_data: Option<Data>,
pub(crate) cache_control: CacheControl,
pub(crate) files_holder: Option<TempDir>,
}
impl<Query, Mutation, Subscription> QueryBuilder<Query, Mutation, Subscription> {
@ -118,16 +120,21 @@ impl<Query, Mutation, Subscription> QueryBuilder<Query, Mutation, Subscription>
false
}
/// Set upload files
/// Set file holder
pub fn set_files_holder(&mut self, files_holder: TempDir) {
self.files_holder = Some(files_holder);
}
/// Set uploaded file path
pub fn set_upload(
&mut self,
var_path: &str,
filename: &str,
content_type: Option<&str>,
content: Bytes,
path: &Path,
) {
self.variables
.set_upload(var_path, filename, content_type, content);
.set_upload(var_path, filename, content_type, path);
}
/// Execute the query.

View File

@ -427,7 +427,7 @@ impl Registry {
if let Some(provides) = field.provides {
write!(sdl, " @provides(fields: \"{}\")", provides).ok();
}
write!(sdl, "\n").ok();
writeln!(sdl).ok();
}
}
@ -460,9 +460,9 @@ impl Registry {
write!(sdl, "@key(fields: \"{}\") ", key).ok();
}
}
write!(sdl, "{{\n").ok();
writeln!(sdl, "{{").ok();
Self::create_federation_fields(sdl, fields.values());
write!(sdl, "}}\n").ok();
writeln!(sdl, "}}").ok();
}
Type::Interface {
name,
@ -480,9 +480,9 @@ impl Registry {
write!(sdl, "@key(fields: \"{}\") ", key).ok();
}
}
write!(sdl, "{{\n").ok();
writeln!(sdl, "{{").ok();
Self::create_federation_fields(sdl, fields.values());
write!(sdl, "}}\n").ok();
writeln!(sdl, "}}").ok();
}
_ => {}
}

View File

@ -41,12 +41,12 @@ fn gql_value_to_json_value(value: &Value) -> serde_json::Value {
Value::Boolean(v) => (*v).into(),
Value::Enum(e) => e.clone().into(),
Value::List(values) => values
.into_iter()
.iter()
.map(|value| gql_value_to_json_value(value))
.collect_vec()
.into(),
Value::Object(obj) => serde_json::Value::Object(
obj.into_iter()
obj.iter()
.map(|(k, v)| (k.clone(), gql_value_to_json_value(v)))
.collect(),
),

View File

@ -264,6 +264,7 @@ where
variables: Default::default(),
ctx_data: None,
cache_control,
files_holder: None,
})
}

View File

@ -1,5 +1,6 @@
use crate::{registry, InputValueType, Type, Value};
use std::borrow::Cow;
use std::path::PathBuf;
/// Uploaded file
///
@ -52,8 +53,8 @@ pub struct Upload {
/// Content type, such as `application/json`, `image/jpg` ...
pub content_type: Option<String>,
/// File content
pub content: Vec<u8>,
/// Temporary file path
pub path: PathBuf,
}
impl<'a> Type for Upload {
@ -80,22 +81,20 @@ impl<'a> InputValueType for Upload {
let s = &s[5..];
if let Some(idx) = s.find('|') {
let name_and_type = &s[..idx];
let content_b64 = &s[idx + 1..];
let path = &s[idx + 1..];
if let Some(type_idx) = name_and_type.find(':') {
let name = &name_and_type[..type_idx];
let mime_type = &name_and_type[type_idx + 1..];
let content = base64::decode(content_b64).ok().unwrap_or_default();
return Some(Self {
filename: name.to_string(),
content_type: Some(mime_type.to_string()),
content,
path: PathBuf::from(path),
});
} else {
let content = base64::decode(content_b64).ok().unwrap_or_default();
return Some(Self {
filename: name_and_type.to_string(),
content_type: None,
content,
path: PathBuf::from(path),
});
}
}