Initial attempt.

This commit is contained in:
Sunli 2020-09-10 12:49:08 +08:00
parent 53a7314666
commit ce0683e1f9
14 changed files with 491 additions and 539 deletions

View File

@ -1,5 +1,5 @@
pub use async_graphql::http::GQLResponse;
use async_graphql::{ObjectType, QueryResponse, Schema, SubscriptionType};
use async_graphql::{GQLQueryResponse, ObjectType, Schema, SubscriptionType};
use async_graphql_parser::{parse_query, types::ExecutableDocument};
use async_std::task;
@ -10,7 +10,7 @@ static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;
pub fn run<Query, Mutation, Subscription>(
s: &Schema<Query, Mutation, Subscription>,
q: &str,
) -> QueryResponse
) -> GQLQueryResponse
where
Query: ObjectType + Send + Sync + 'static,
Mutation: ObjectType + Send + Sync + 'static,

View File

@ -10,7 +10,7 @@ use actix_web::http::StatusCode;
use actix_web::{http, web, Error, FromRequest, HttpRequest, HttpResponse, Responder};
use async_graphql::http::StreamBody;
use async_graphql::{
IntoQueryBuilder, IntoQueryBuilderOpts, ParseRequestError, QueryBuilder, QueryResponse,
GQLQueryResponse, IntoQueryBuilder, ParseRequestError, QueryBuilder, ReceiveMultipartOptions,
};
use futures::channel::mpsc;
use futures::future::Ready;
@ -35,7 +35,7 @@ impl GQLRequest {
impl FromRequest for GQLRequest {
type Error = Error;
type Future = Pin<Box<dyn Future<Output = Result<GQLRequest, Error>>>>;
type Config = IntoQueryBuilderOpts;
type Config = ReceiveMultipartOptions;
fn from_request(req: &HttpRequest, payload: &mut Payload<PayloadStream>) -> Self::Future {
let config = req.app_data::<Self::Config>().cloned().unwrap_or_default();
@ -87,10 +87,10 @@ impl FromRequest for GQLRequest {
}
/// Responder for GraphQL response
pub struct GQLResponse(async_graphql::Result<QueryResponse>);
pub struct GQLResponse(async_graphql::Result<GQLQueryResponse>);
impl From<async_graphql::Result<QueryResponse>> for GQLResponse {
fn from(resp: async_graphql::Result<QueryResponse>) -> Self {
impl From<async_graphql::Result<GQLQueryResponse>> for GQLResponse {
fn from(resp: async_graphql::Result<GQLQueryResponse>) -> Self {
GQLResponse(resp)
}
}
@ -111,9 +111,9 @@ impl Responder for GQLResponse {
fn add_cache_control(
builder: &mut HttpResponseBuilder,
resp: &async_graphql::Result<QueryResponse>,
resp: &async_graphql::Result<GQLQueryResponse>,
) {
if let Ok(QueryResponse { cache_control, .. }) = resp {
if let Ok(GQLQueryResponse { cache_control, .. }) = resp {
if let Some(cache_control) = cache_control.value() {
builder.header("cache-control", cache_control);
}

View File

@ -4,7 +4,7 @@
#![forbid(unsafe_code)]
use async_graphql::{
IntoQueryBuilder, IntoQueryBuilderOpts, ObjectType, QueryBuilder, QueryResponse, Schema,
GQLQueryResponse, IntoQueryBuilder, ObjectType, QueryBuilder, ReceiveMultipartOptions, Schema,
SubscriptionType, Variables,
};
use log::{error, info};
@ -99,7 +99,7 @@ impl GraphQL {
/// ```
pub fn fairing_with_opts<Q, M, S>(
schema: Schema<Q, M, S>,
opts: IntoQueryBuilderOpts,
opts: ReceiveMultipartOptions,
) -> impl Fairing
where
Q: ObjectType + Send + Sync + 'static,
@ -109,7 +109,7 @@ impl GraphQL {
GraphQL::attach(schema, opts)
}
fn attach<Q, M, S>(schema: Schema<Q, M, S>, opts: IntoQueryBuilderOpts) -> impl Fairing
fn attach<Q, M, S>(schema: Schema<Q, M, S>, opts: ReceiveMultipartOptions) -> impl Fairing
where
Q: ObjectType + Send + Sync + 'static,
M: ObjectType + Send + Sync + 'static,
@ -231,7 +231,7 @@ impl FromData for GQLRequest {
type Error = String;
async fn from_data(req: &Request<'_>, data: Data) -> data::Outcome<Self, Self::Error> {
let opts = match req.guard::<State<'_, Arc<IntoQueryBuilderOpts>>>().await {
let opts = match req.guard::<State<'_, Arc<ReceiveMultipartOptions>>>().await {
Outcome::Success(opts) => opts,
Outcome::Failure(_) => {
return data::Outcome::Failure((
@ -258,7 +258,7 @@ impl FromData for GQLRequest {
/// Wrapper around `async-graphql::query::QueryResponse` for implementing the trait
/// `rocket::response::responder::Responder`, so that `GQLResponse` can directly be returned
/// from a Rocket Route function.
pub struct GQLResponse(pub QueryResponse);
pub struct GQLResponse(pub GQLQueryResponse);
impl<'r> Responder<'r, 'static> for GQLResponse {
fn respond_to(self, _: &'r Request<'_>) -> response::Result<'static> {
@ -277,13 +277,13 @@ impl<'r> Responder<'r, 'static> for GQLResponse {
/// Extension trait, to allow the use of `cache_control` with for example `ResponseBuilder`.
pub trait CacheControl {
/// Add the `async-graphql::query::QueryResponse` cache control value as header to the Rocket response.
fn cache_control(&mut self, resp: &async_graphql::Result<QueryResponse>) -> &mut Self;
fn cache_control(&mut self, resp: &async_graphql::Result<GQLQueryResponse>) -> &mut Self;
}
impl<'r> CacheControl for ResponseBuilder<'r> {
fn cache_control(
&mut self,
resp: &async_graphql::Result<QueryResponse>,
resp: &async_graphql::Result<GQLQueryResponse>,
) -> &mut ResponseBuilder<'r> {
match resp {
Ok(resp) if resp.cache_control.value().is_some() => self.header(Header::new(

View File

@ -7,7 +7,7 @@
use async_graphql::http::{GQLRequest, GQLResponse};
use async_graphql::{
IntoQueryBuilder, IntoQueryBuilderOpts, ObjectType, QueryBuilder, QueryResponse, Schema,
GQLQueryResponse, IntoQueryBuilder, ObjectType, QueryBuilder, ReceiveMultipartOptions, Schema,
SubscriptionType,
};
use async_trait::async_trait;
@ -70,7 +70,7 @@ pub async fn graphql_opts<Query, Mutation, Subscription, TideState, F>(
req: Request<TideState>,
schema: Schema<Query, Mutation, Subscription>,
query_builder_configuration: F,
opts: IntoQueryBuilderOpts,
opts: ReceiveMultipartOptions,
) -> tide::Result<Response>
where
Query: ObjectType + Send + Sync + 'static,
@ -97,12 +97,12 @@ pub trait RequestExt<State: Clone + Send + Sync + 'static>: Sized {
}
/// Similar to graphql, but you can set the options `IntoQueryBuilderOpts`.
async fn body_graphql_opts(self, opts: IntoQueryBuilderOpts) -> tide::Result<QueryBuilder>;
async fn body_graphql_opts(self, opts: ReceiveMultipartOptions) -> tide::Result<QueryBuilder>;
}
#[async_trait]
impl<State: Clone + Send + Sync + 'static> RequestExt<State> for Request<State> {
async fn body_graphql_opts(self, opts: IntoQueryBuilderOpts) -> tide::Result<QueryBuilder> {
async fn body_graphql_opts(self, opts: ReceiveMultipartOptions) -> tide::Result<QueryBuilder> {
if self.method() == Method::Get {
let gql_request: GQLRequest = self.query::<GQLRequest>()?;
let builder = gql_request
@ -123,11 +123,11 @@ impl<State: Clone + Send + Sync + 'static> RequestExt<State> for Request<State>
///
pub trait ResponseExt: Sized {
/// Set body as the result of a GraphQL query.
fn body_graphql(self, res: async_graphql::Result<QueryResponse>) -> tide::Result<Self>;
fn body_graphql(self, res: async_graphql::Result<GQLQueryResponse>) -> tide::Result<Self>;
}
impl ResponseExt for Response {
fn body_graphql(self, res: async_graphql::Result<QueryResponse>) -> tide::Result<Self> {
fn body_graphql(self, res: async_graphql::Result<GQLQueryResponse>) -> tide::Result<Self> {
let mut resp = add_cache_control(self, &res);
resp.set_body(Body::from_json(&GQLResponse(res))?);
Ok(resp)
@ -136,9 +136,9 @@ impl ResponseExt for Response {
fn add_cache_control(
mut http_resp: Response,
resp: &async_graphql::Result<QueryResponse>,
resp: &async_graphql::Result<GQLQueryResponse>,
) -> Response {
if let Ok(QueryResponse { cache_control, .. }) = resp {
if let Ok(GQLQueryResponse { cache_control, .. }) = resp {
if let Some(cache_control) = cache_control.value() {
if let Ok(header) = tide::http::headers::HeaderName::from_str("cache-control") {
http_resp.insert_header(header, cache_control);

View File

@ -7,8 +7,8 @@
use async_graphql::http::{GQLRequest, StreamBody};
use async_graphql::{
Data, FieldResult, IntoQueryBuilder, IntoQueryBuilderOpts, ObjectType, QueryBuilder,
QueryResponse, Schema, SubscriptionType, WebSocketTransport,
Data, FieldResult, GQLQueryResponse, IntoQueryBuilder, ObjectType, QueryBuilder,
ReceiveMultipartOptions, Schema, SubscriptionType, WebSocketTransport,
};
use futures::select;
use futures::{SinkExt, StreamExt};
@ -80,7 +80,7 @@ where
/// Similar to graphql, but you can set the options `IntoQueryBuilderOpts`.
pub fn graphql_opts<Query, Mutation, Subscription>(
schema: Schema<Query, Mutation, Subscription>,
opts: IntoQueryBuilderOpts,
opts: ReceiveMultipartOptions,
) -> BoxedFilter<((Schema<Query, Mutation, Subscription>, QueryBuilder),)>
where
Query: ObjectType + Send + Sync + 'static,
@ -100,12 +100,11 @@ where
query: String,
content_type,
body,
opts: Arc<IntoQueryBuilderOpts>,
opts: Arc<ReceiveMultipartOptions>,
schema| async move {
if method == Method::GET {
let gql_request: GQLRequest =
serde_urlencoded::from_str(&query)
.map_err(|err| warp::reject::custom(BadRequest(err.into())))?;
let gql_request: GQLRequest = serde_urlencoded::from_str(&query)
.map_err(|err| warp::reject::custom(BadRequest(err.into())))?;
let builder = gql_request
.into_query_builder_opts(&opts)
.await
@ -232,16 +231,16 @@ where
}
/// GraphQL reply
pub struct GQLResponse(async_graphql::Result<QueryResponse>);
pub struct GQLResponse(async_graphql::Result<GQLQueryResponse>);
impl From<async_graphql::Result<QueryResponse>> for GQLResponse {
fn from(resp: async_graphql::Result<QueryResponse>) -> Self {
impl From<async_graphql::Result<GQLQueryResponse>> for GQLResponse {
fn from(resp: async_graphql::Result<GQLQueryResponse>) -> Self {
GQLResponse(resp)
}
}
fn add_cache_control(http_resp: &mut Response, resp: &async_graphql::Result<QueryResponse>) {
if let Ok(QueryResponse { cache_control, .. }) = resp {
fn add_cache_control(http_resp: &mut Response, resp: &async_graphql::Result<GQLQueryResponse>) {
if let Ok(GQLQueryResponse { cache_control, .. }) = resp {
if let Some(cache_control) = cache_control.value() {
if let Ok(value) = cache_control.parse() {
http_resp.headers_mut().insert("cache-control", value);

View File

@ -1,136 +0,0 @@
use crate::http::GQLRequest;
use crate::query::{IntoQueryBuilder, IntoQueryBuilderOpts};
use crate::{ParseRequestError, QueryBuilder};
use bytes::Bytes;
use futures::{stream, AsyncRead, AsyncReadExt, Stream};
use multer::{Constraints, Multipart, SizeLimit};
use std::collections::HashMap;
use std::io::{self, Seek, SeekFrom, Write};
use std::pin::Pin;
use std::task::Poll;
impl From<multer::Error> for ParseRequestError {
fn from(err: multer::Error) -> Self {
match err {
multer::Error::FieldSizeExceeded { .. } | multer::Error::StreamSizeExceeded { .. } => {
ParseRequestError::PayloadTooLarge
}
_ => ParseRequestError::InvalidMultipart(err),
}
}
}
#[async_trait::async_trait]
impl<CT, Body> IntoQueryBuilder for (Option<CT>, Body)
where
CT: AsRef<str> + Send,
Body: AsyncRead + Send + Unpin + 'static,
{
async fn into_query_builder_opts(
mut self,
opts: &IntoQueryBuilderOpts,
) -> std::result::Result<QueryBuilder, ParseRequestError> {
if let Some(boundary) = self.0.and_then(|ct| multer::parse_boundary(ct).ok()) {
// multipart
let mut multipart = Multipart::new_with_constraints(
reader_stream(self.1),
boundary,
Constraints::new().size_limit({
let mut limit = SizeLimit::new();
if let (Some(max_file_size), Some(max_num_files)) =
(opts.max_file_size, opts.max_file_size)
{
limit = limit.whole_stream((max_file_size * max_num_files) as u64);
}
if let Some(max_file_size) = opts.max_file_size {
limit = limit.per_field(max_file_size as u64);
}
limit
}),
);
let mut builder = None;
let mut map = None;
let mut files = Vec::new();
while let Some(mut field) = multipart.next_field().await? {
match field.name() {
Some("operations") => {
let request_str = field.text().await?;
let request: GQLRequest = serde_json::from_str(&request_str)
.map_err(ParseRequestError::InvalidRequest)?;
builder = Some(request.into_query_builder().await?);
}
Some("map") => {
let map_str = field.text().await?;
map = Some(
serde_json::from_str::<HashMap<String, Vec<String>>>(&map_str)
.map_err(ParseRequestError::InvalidFilesMap)?,
);
}
_ => {
if let Some(name) = field.name().map(ToString::to_string) {
if let Some(filename) = field.file_name().map(ToString::to_string) {
let content_type =
field.content_type().map(|mime| mime.to_string());
let mut file =
tempfile::tempfile().map_err(ParseRequestError::Io)?;
while let Some(chunk) = field.chunk().await.unwrap() {
file.write(&chunk).map_err(ParseRequestError::Io)?;
}
file.seek(SeekFrom::Start(0))?;
files.push((name, filename, content_type, file));
}
}
}
}
}
let mut builder = builder.ok_or(ParseRequestError::MissingOperatorsPart)?;
let map = map.as_mut().ok_or(ParseRequestError::MissingMapPart)?;
for (name, filename, content_type, file) in files {
if let Some(var_paths) = map.remove(&name) {
for var_path in var_paths {
builder.set_upload(
&var_path,
filename.clone(),
content_type.clone(),
file.try_clone().unwrap(),
);
}
}
}
if !map.is_empty() {
return Err(ParseRequestError::MissingFiles);
}
Ok(builder)
} else {
let mut data = Vec::new();
self.1
.read_to_end(&mut data)
.await
.map_err(ParseRequestError::Io)?;
let gql_request: GQLRequest =
serde_json::from_slice(&data).map_err(ParseRequestError::InvalidRequest)?;
gql_request.into_query_builder().await
}
}
}
fn reader_stream(
mut reader: impl AsyncRead + Unpin + Send + 'static,
) -> impl Stream<Item = io::Result<Bytes>> + Unpin + Send + 'static {
let mut buf = [0u8; 2048];
stream::poll_fn(move |cx| {
Poll::Ready(
match futures::ready!(Pin::new(&mut reader).poll_read(cx, &mut buf)?) {
0 => None,
size => Some(Ok(Bytes::copy_from_slice(&buf[..size]))),
},
)
})
}

View File

@ -1,22 +1,16 @@
//! A helper module that supports HTTP
mod graphiql_source;
mod into_query_builder;
mod multipart_stream;
mod playground_source;
mod stream_body;
use itertools::Itertools;
pub use graphiql_source::graphiql_source;
pub use multipart_stream::multipart_stream;
pub use playground_source::{playground_source, GraphQLPlaygroundConfig};
pub use stream_body::StreamBody;
use crate::query::{IntoQueryBuilder, IntoQueryBuilderOpts};
use crate::{
Error, ParseRequestError, Pos, QueryBuilder, QueryError, QueryResponse, Result, Variables,
};
use crate::{Error, GQLQueryResponse, ParseRequestError, Pos, QueryError, Result, Variables};
use serde::ser::{SerializeMap, SerializeSeq};
use serde::{Deserialize, Serialize, Serializer};
@ -34,126 +28,6 @@ pub struct GQLRequest {
pub variables: Option<serde_json::Value>,
}
#[async_trait::async_trait]
impl IntoQueryBuilder for GQLRequest {
async fn into_query_builder_opts(
self,
_opts: &IntoQueryBuilderOpts,
) -> std::result::Result<QueryBuilder, ParseRequestError> {
let mut builder = QueryBuilder::new(self.query);
if let Some(operation_name) = self.operation_name {
builder = builder.operation_name(operation_name);
}
if let Some(variables) = self.variables {
builder = builder.variables(Variables::parse_from_json(variables));
}
Ok(builder)
}
}
/// Serializable GraphQL Response object
pub struct GQLResponse(pub Result<QueryResponse>);
impl Serialize for GQLResponse {
fn serialize<S: Serializer>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error> {
match &self.0 {
Ok(res) => {
let mut map = serializer.serialize_map(None)?;
map.serialize_key("data")?;
map.serialize_value(&res.data)?;
if res.extensions.is_some() {
map.serialize_key("extensions")?;
map.serialize_value(&res.extensions)?;
}
map.end()
}
Err(err) => {
let mut map = serializer.serialize_map(None)?;
map.serialize_key("errors")?;
map.serialize_value(&GQLError(err))?;
map.end()
}
}
}
}
/// Serializable error type
pub struct GQLError<'a>(pub &'a Error);
impl<'a> Serialize for GQLError<'a> {
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: Serializer,
{
match self.0 {
Error::Parse(err) => {
let mut seq = serializer.serialize_seq(Some(1))?;
seq.serialize_element(&serde_json::json! ({
"message": err.message,
"locations": [{"line": err.pos.line, "column": err.pos.column}]
}))?;
seq.end()
}
Error::Query { pos, path, err } => {
let mut seq = serializer.serialize_seq(Some(1))?;
if let QueryError::FieldError {
err,
extended_error,
} = err
{
let mut map = serde_json::Map::new();
map.insert("message".to_string(), err.to_string().into());
map.insert(
"locations".to_string(),
serde_json::json!([{"line": pos.line, "column": pos.column}]),
);
if let Some(path) = path {
map.insert("path".to_string(), path.clone());
}
if let Some(obj @ serde_json::Value::Object(_)) = extended_error {
map.insert("extensions".to_string(), obj.clone());
}
seq.serialize_element(&serde_json::Value::Object(map))?;
} else {
seq.serialize_element(&serde_json::json!({
"message": err.to_string(),
"locations": [{"line": pos.line, "column": pos.column}]
}))?;
}
seq.end()
}
Error::Rule { errors } => {
let mut seq = serializer.serialize_seq(Some(1))?;
for error in errors {
seq.serialize_element(&serde_json::json!({
"message": error.message,
"locations": error.locations.iter().map(|pos| serde_json::json!({"line": pos.line, "column": pos.column})).collect_vec(),
}))?;
}
seq.end()
}
}
}
}
struct GQLErrorPos<'a>(&'a Pos);
impl<'a> Serialize for GQLErrorPos<'a> {
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: Serializer,
{
let mut map = serializer.serialize_map(Some(2))?;
map.serialize_entry("line", &self.0.line)?;
map.serialize_entry("column", &self.0.column)?;
map.end()
}
}
#[cfg(test)]
mod tests {
use super::*;
@ -205,76 +79,4 @@ mod tests {
assert!(request.operation_name.is_none());
assert_eq!(request.query, "{ a b c }");
}
#[test]
fn test_response_data() {
let resp = GQLResponse(Ok(QueryResponse {
data: json!({"ok": true}),
extensions: None,
cache_control: Default::default(),
}));
assert_eq!(
serde_json::to_value(resp).unwrap(),
json! ({
"data": {
"ok": true,
}
})
);
}
#[test]
fn test_field_error_with_extension() {
let err = Error::Query {
pos: Pos {
line: 10,
column: 20,
},
path: None,
err: QueryError::FieldError {
err: "MyErrorMessage".to_owned(),
extended_error: Some(json!({
"code": "MY_TEST_CODE"
})),
},
};
let resp = GQLResponse(Err(err));
assert_eq!(
serde_json::to_value(resp).unwrap(),
json!({
"errors": [{
"message":"MyErrorMessage",
"extensions": {
"code": "MY_TEST_CODE"
},
"locations": [{"line": 10, "column": 20}]
}]
})
);
}
#[test]
fn test_response_error_with_pos() {
let resp = GQLResponse(Err(Error::Query {
pos: Pos {
line: 10,
column: 20,
},
path: None,
err: QueryError::NotSupported,
}));
assert_eq!(
serde_json::to_value(resp).unwrap(),
json!({
"errors": [{
"message":"Not supported.",
"locations": [
{"line": 10, "column": 20}
]
}]
})
);
}
}

View File

@ -1,20 +0,0 @@
use crate::http::GQLResponse;
use crate::{QueryResponse, Result};
use bytes::{buf::BufExt, Buf, Bytes};
use futures::{Stream, StreamExt};
/// Create a multipart response data stream.
pub fn multipart_stream(s: impl Stream<Item = Result<QueryResponse>>) -> impl Stream<Item = Bytes> {
s.map(|res| serde_json::to_vec(&GQLResponse(res)).unwrap())
.map(|data| {
Bytes::from(format!(
"\r\n---\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n",
data.len()
))
.chain(Bytes::from(data))
.to_bytes()
})
.chain(futures::stream::once(async move {
Bytes::from_static(b"\r\n-----\r\n")
}))
}

View File

@ -108,6 +108,7 @@ mod query;
mod resolver;
mod scalars;
mod schema;
mod serialize_resp;
mod subscription;
mod types;
mod validation;
@ -140,7 +141,7 @@ pub use error::{
};
pub use look_ahead::Lookahead;
pub use parser::{types::ConstValue as Value, Pos, Positioned};
pub use query::{IntoQueryBuilder, IntoQueryBuilderOpts, QueryBuilder, QueryResponse};
pub use query::{GQLQuery, GQLQueryResponse, ReceiveMultipartOptions};
pub use registry::CacheControl;
pub use scalars::{Any, Json, OutputJson, ID};
pub use schema::{Schema, SchemaBuilder, SchemaEnv};

View File

@ -1,21 +1,28 @@
use crate::context::{Data, ResolveId};
use crate::error::ParseRequestError;
use crate::extensions::{BoxExtension, ErrorLogger, Extension};
use crate::mutation_resolver::do_mutation_resolve;
use crate::parser::types::{OperationType, UploadValue};
use crate::registry::CacheControl;
use crate::{
do_resolve, ContextBase, Error, ObjectType, Pos, QueryEnv, QueryError, Result, Schema,
SubscriptionType, Value, Variables,
do_resolve, http, CacheControl, ContextBase, Error, ObjectType, ParseRequestError, Pos,
QueryEnv, QueryError, Result, Schema, SubscriptionType, Value, Variables,
};
use bytes::Bytes;
use futures::stream;
use futures::task::Poll;
use futures::{AsyncRead, AsyncReadExt, Stream};
use multer::{Constraints, Multipart, SizeLimit};
use serde::ser::{SerializeMap, SerializeSeq};
use serde::{Serialize, Serializer};
use std::any::Any;
use std::collections::HashMap;
use std::fs::File;
use std::io;
use std::io::{Seek, SeekFrom, Write};
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
/// IntoQueryBuilder options
/// Options for `GQLQuery::receive_multipart`
#[derive(Default, Clone)]
pub struct IntoQueryBuilderOpts {
pub struct ReceiveMultipartOptions {
/// Maximum file size.
pub max_file_size: Option<usize>,
@ -23,56 +30,136 @@ pub struct IntoQueryBuilderOpts {
pub max_num_files: Option<usize>,
}
#[allow(missing_docs)]
#[async_trait::async_trait]
pub trait IntoQueryBuilder: Sized {
async fn into_query_builder(self) -> std::result::Result<QueryBuilder, ParseRequestError> {
self.into_query_builder_opts(&Default::default()).await
}
async fn into_query_builder_opts(
self,
opts: &IntoQueryBuilderOpts,
) -> std::result::Result<QueryBuilder, ParseRequestError>;
}
/// Query response
#[derive(Debug)]
pub struct QueryResponse {
/// Data of query result
pub data: serde_json::Value,
/// Extensions result
pub extensions: Option<serde_json::Value>,
/// Cache control value
pub cache_control: CacheControl,
}
/// Query builder
pub struct QueryBuilder {
pub(crate) query_source: String,
pub struct GQLQuery {
pub(crate) query: String,
pub(crate) operation_name: Option<String>,
pub(crate) variables: Variables,
pub(crate) ctx_data: Option<Data>,
extensions: Vec<Box<dyn Fn() -> BoxExtension + Send + Sync>>,
pub(crate) ctx_data: Data,
pub(crate) extensions: Vec<Box<dyn Fn() -> BoxExtension + Send + Sync>>,
}
impl QueryBuilder {
/// Create query builder with query source.
pub fn new<T: Into<String>>(query_source: T) -> QueryBuilder {
QueryBuilder {
query_source: query_source.into(),
impl GQLQuery {
/// Create a query with query source.
pub fn new(query: impl Into<String>) -> Self {
Self {
query: query.into(),
operation_name: None,
variables: Default::default(),
ctx_data: None,
extensions: Default::default(),
variables: Variables::default(),
ctx_data: Data::default(),
extensions: Vec::default(),
}
}
pub fn new_with_http_request(request: http::GQLRequest) -> Self {
Self {
query: request.query,
operation_name: request.operation_name,
variables: request
.variables
.map(|value| Variables::parse_from_json(value))
.unwrap_or_default(),
ctx_data: Data::default(),
extensions: Vec::default(),
}
}
pub async fn receive_request(
content_type: Option<impl AsRef<str>>,
mut body: impl AsyncRead,
opts: ReceiveMultipartOptions,
) -> std::result::Result<Self, ParseRequestError> {
if let Some(boundary) = content_type.and_then(|ct| multer::parse_boundary(ct).ok()) {
// multipart
let mut multipart = Multipart::new_with_constraints(
reader_stream(body),
boundary,
Constraints::new().size_limit({
let mut limit = SizeLimit::new();
if let (Some(max_file_size), Some(max_num_files)) =
(opts.max_file_size, opts.max_file_size)
{
limit = limit.whole_stream((max_file_size * max_num_files) as u64);
}
if let Some(max_file_size) = opts.max_file_size {
limit = limit.per_field(max_file_size as u64);
}
limit
}),
);
let mut query = None;
let mut map = None;
let mut files = Vec::new();
while let Some(mut field) = multipart.next_field().await? {
match field.name() {
Some("operations") => {
let request_str = field.text().await?;
query = Some(Self::new_with_http_request(
serde_json::from_str(&request_str)
.map_err(ParseRequestError::InvalidRequest)?,
));
}
Some("map") => {
let map_str = field.text().await?;
map = Some(
serde_json::from_str::<HashMap<String, Vec<String>>>(&map_str)
.map_err(ParseRequestError::InvalidFilesMap)?,
);
}
_ => {
if let Some(name) = field.name().map(ToString::to_string) {
if let Some(filename) = field.file_name().map(ToString::to_string) {
let content_type =
field.content_type().map(|mime| mime.to_string());
let mut file =
tempfile::tempfile().map_err(ParseRequestError::Io)?;
while let Some(chunk) = field.chunk().await.unwrap() {
file.write(&chunk).map_err(ParseRequestError::Io)?;
}
file.seek(SeekFrom::Start(0))?;
files.push((name, filename, content_type, file));
}
}
}
}
}
let mut query = query.ok_or(ParseRequestError::MissingOperatorsPart)?;
let map = map.as_mut().ok_or(ParseRequestError::MissingMapPart)?;
for (name, filename, content_type, file) in files {
if let Some(var_paths) = map.remove(&name) {
for var_path in var_paths {
query.set_upload(
&var_path,
filename.clone(),
content_type.clone(),
file.try_clone().unwrap(),
);
}
}
}
if !map.is_empty() {
return Err(ParseRequestError::MissingFiles);
}
Ok(query)
} else {
let mut data = Vec::new();
body.read_to_end(&mut data)
.await
.map_err(ParseRequestError::Io)?;
Ok(Self::new_with_http_request(
serde_json::from_slice(&data).map_err(ParseRequestError::InvalidRequest)?,
))
}
}
/// Specify the operation name.
pub fn operation_name<T: Into<String>>(self, name: T) -> Self {
QueryBuilder {
Self {
operation_name: Some(name.into()),
..self
}
@ -80,7 +167,7 @@ impl QueryBuilder {
/// Specify the variables.
pub fn variables(self, variables: Variables) -> Self {
QueryBuilder { variables, ..self }
Self { variables, ..self }
}
/// Add an extension
@ -97,13 +184,7 @@ impl QueryBuilder {
///
/// **This data is only valid for this query**
pub fn data<D: Any + Send + Sync>(mut self, data: D) -> Self {
if let Some(ctx_data) = &mut self.ctx_data {
ctx_data.insert(data);
} else {
let mut ctx_data = Data::default();
ctx_data.insert(data);
self.ctx_data = Some(ctx_data);
}
self.ctx_data.insert(data);
self
}
@ -125,84 +206,70 @@ impl QueryBuilder {
content,
});
}
}
/// Execute the query, always return a complete result.
pub async fn execute<Query, Mutation, Subscription>(
self,
schema: &Schema<Query, Mutation, Subscription>,
) -> Result<QueryResponse>
where
Query: ObjectType + Send + Sync + 'static,
Mutation: ObjectType + Send + Sync + 'static,
Subscription: SubscriptionType + Send + Sync + 'static,
{
let (document, cache_control, extensions) =
schema.prepare_query(&self.query_source, &self.variables, &self.extensions)?;
// execute
let inc_resolve_id = AtomicUsize::default();
let document = match document.into_data(self.operation_name.as_deref()) {
Some(document) => document,
None => {
return if let Some(operation_name) = self.operation_name {
Err(Error::Query {
pos: Pos::default(),
path: None,
err: QueryError::UnknownOperationNamed {
name: operation_name,
},
})
} else {
Err(Error::Query {
pos: Pos::default(),
path: None,
err: QueryError::MissingOperation,
})
}
.log_error(&extensions)
}
};
let env = QueryEnv::new(
extensions,
self.variables,
document,
Arc::new(self.ctx_data.unwrap_or_default()),
);
let ctx = ContextBase {
path_node: None,
resolve_id: ResolveId::root(),
inc_resolve_id: &inc_resolve_id,
item: &env.document.operation.node.selection_set,
schema_env: &schema.env,
query_env: &env,
};
env.extensions.lock().execution_start();
let data = match &env.document.operation.node.ty {
OperationType::Query => do_resolve(&ctx, &schema.query).await?,
OperationType::Mutation => do_mutation_resolve(&ctx, &schema.mutation).await?,
OperationType::Subscription => {
return Err(Error::Query {
pos: Pos::default(),
path: None,
err: QueryError::NotSupported,
})
}
};
env.extensions.lock().execution_end();
let resp = QueryResponse {
data,
extensions: env.extensions.lock().result(),
cache_control,
};
Ok(resp)
}
/// Get query source
#[inline]
pub fn query_source(&self) -> &str {
&self.query_source
impl<T: Into<String>> From<T> for GQLQuery {
fn from(query: T) -> Self {
Self::new(query)
}
}
impl From<http::GQLRequest> for GQLQuery {
fn from(request: http::GQLRequest) -> Self {
Self::new_with_http_request(request)
}
}
/// Query response
#[derive(Debug)]
pub struct GQLQueryResponse {
/// Data of query result
pub data: serde_json::Value,
/// Extensions result
pub extensions: Option<serde_json::Value>,
/// Cache control value
pub cache_control: CacheControl,
/// Error
pub error: Option<Error>,
}
impl GQLQueryResponse {
#[inline]
pub fn is_err(&self) -> bool {
self.error.is_some()
}
#[inline]
pub fn unwrap_err(self) -> Error {
self.error.unwrap()
}
}
impl From<Error> for GQLQueryResponse {
fn from(err: Error) -> Self {
Self {
data: serde_json::Value::Null,
extensions: None,
cache_control: CacheControl::default(),
error: Some(err),
}
}
}
fn reader_stream(
mut reader: impl AsyncRead + Unpin + Send + 'static,
) -> impl Stream<Item = io::Result<Bytes>> + Unpin + Send + 'static {
let mut buf = [0u8; 2048];
stream::poll_fn(move |cx| {
Poll::Ready(
match futures::ready!(Pin::new(&mut reader).poll_read(cx, &mut buf)?) {
0 => None,
size => Some(Ok(Bytes::copy_from_slice(&buf[..size]))),
},
)
})
}

View File

@ -1,16 +1,16 @@
use crate::context::Data;
use crate::context::{Data, ResolveId};
use crate::extensions::{BoxExtension, ErrorLogger, Extension, Extensions};
use crate::model::__DirectiveLocation;
use crate::mutation_resolver::do_mutation_resolve;
use crate::parser::parse_query;
use crate::parser::types::{ExecutableDocument, OperationType};
use crate::query::QueryBuilder;
use crate::registry::{MetaDirective, MetaInputValue, Registry};
use crate::subscription::{create_connection, create_subscription_stream, ConnectionTransport};
use crate::types::QueryRoot;
use crate::validation::{check_rules, CheckResult, ValidationMode};
use crate::{
CacheControl, Error, ObjectType, Pos, QueryEnv, QueryError, QueryResponse, Result,
SubscriptionType, Type, Variables, ID,
do_resolve, CacheControl, ContextBase, Error, GQLQueryResponse, ObjectType, Pos, QueryEnv,
QueryError, Result, SubscriptionType, Type, Variables, ID,
};
use futures::channel::mpsc;
use futures::Stream;
@ -307,12 +307,7 @@ where
Self::build(query, mutation, subscription).finish()
}
/// Execute query without create the `QueryBuilder`.
pub async fn execute(&self, query_source: &str) -> Result<QueryResponse> {
QueryBuilder::new(query_source).execute(self).await
}
pub(crate) fn prepare_query(
fn prepare_query(
&self,
source: &str,
variables: &Variables,
@ -366,6 +361,74 @@ where
Ok((document, cache_control, extensions))
}
/// Execute query without create the `QueryBuilder`.
pub async fn execute(&self, query: GQLQuery) -> GQLQueryResponse {
let (document, cache_control, extensions) =
self.prepare_query(&query.query, &query.variables, &query.extensions)?;
// execute
let inc_resolve_id = AtomicUsize::default();
let document = match document.into_data(self.operation_name.as_deref()) {
Some(document) => document,
None => {
return if let Some(operation_name) = self.operation_name {
Err(Error::Query {
pos: Pos::default(),
path: None,
err: QueryError::UnknownOperationNamed {
name: operation_name,
},
})
} else {
Err(Error::Query {
pos: Pos::default(),
path: None,
err: QueryError::MissingOperation,
})
}
.log_error(&extensions)
.into()
}
};
let env = QueryEnv::new(
extensions,
self.variables,
document,
Arc::new(self.ctx_data.unwrap_or_default()),
);
let ctx = ContextBase {
path_node: None,
resolve_id: ResolveId::root(),
inc_resolve_id: &inc_resolve_id,
item: &env.document.operation.node.selection_set,
schema_env: &schema.env,
query_env: &env,
};
env.extensions.lock().execution_start();
let data = match &env.document.operation.node.ty {
OperationType::Query => do_resolve(&ctx, &schema.query).await?,
OperationType::Mutation => do_mutation_resolve(&ctx, &schema.mutation).await?,
OperationType::Subscription => {
return Err(Error::Query {
pos: Pos::default(),
path: None,
err: QueryError::NotSupported,
})
.into()
}
};
env.extensions.lock().execution_end();
GQLQueryResponse {
data,
extensions: env.extensions.lock().result(),
cache_control,
error: None,
}
}
/// Create subscription stream, typically called inside the `SubscriptionTransport::handle_request` method
pub async fn create_subscription_stream(
&self,

0
src/serde.rs Normal file
View File

176
src/serialize_resp.rs Normal file
View File

@ -0,0 +1,176 @@
use crate::{Error, GQLQueryResponse, Pos, QueryError};
use serde::ser::{SerializeMap, SerializeSeq};
use serde::{Serialize, Serializer};
impl Serialize for GQLQueryResponse {
fn serialize<S: Serializer>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error> {
match &self.error {
None => {
let mut map = serializer.serialize_map(None)?;
map.serialize_key("data")?;
map.serialize_value(&self.data)?;
if res.extensions.is_some() {
map.serialize_key("extensions")?;
map.serialize_value(&self.extensions)?;
}
map.end()
}
Some(err) => {
let mut map = serializer.serialize_map(None)?;
map.serialize_key("errors")?;
map.serialize_value(err)?;
map.end()
}
}
}
}
impl<'a> Serialize for Pos {
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: Serializer,
{
let mut map = serializer.serialize_map(Some(2))?;
map.serialize_entry("line", &self.0.line)?;
map.serialize_entry("column", &self.0.column)?;
map.end()
}
}
impl<'a> Serialize for Error {
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: Serializer,
{
match self {
Error::Parse(err) => {
let mut seq = serializer.serialize_seq(Some(1))?;
seq.serialize_element(&serde_json::json! ({
"message": err.message,
"locations": [{"line": err.pos.line, "column": err.pos.column}]
}))?;
seq.end()
}
Error::Query { pos, path, err } => {
let mut seq = serializer.serialize_seq(Some(1))?;
if let QueryError::FieldError {
err,
extended_error,
} = err
{
let mut map = serde_json::Map::new();
map.insert("message".to_string(), err.to_string().into());
map.insert(
"locations".to_string(),
serde_json::json!([{"line": pos.line, "column": pos.column}]),
);
if let Some(path) = path {
map.insert("path".to_string(), path.clone());
}
if let Some(obj @ serde_json::Value::Object(_)) = extended_error {
map.insert("extensions".to_string(), obj.clone());
}
seq.serialize_element(&serde_json::Value::Object(map))?;
} else {
seq.serialize_element(&serde_json::json!({
"message": err.to_string(),
"locations": [{"line": pos.line, "column": pos.column}]
}))?;
}
seq.end()
}
Error::Rule { errors } => {
let mut seq = serializer.serialize_seq(Some(1))?;
for error in errors {
seq.serialize_element(&serde_json::json!({
"message": error.message,
"locations": error.locations.iter().map(|pos| serde_json::json!({"line": pos.line, "column": pos.column})).collect_vec(),
}))?;
}
seq.end()
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::Pos;
use serde_json::json;
#[test]
fn test_response_data() {
let resp = GQLQueryResponse {
data: json!({"ok": true}),
extensions: None,
cache_control: Default::default(),
error: None,
};
assert_eq!(
serde_json::to_value(resp).unwrap(),
json! ({
"data": {
"ok": true,
}
})
);
}
#[test]
fn test_field_error_with_extension() {
let resp = GQLQueryResponse::from(Error::Query {
pos: Pos {
line: 10,
column: 20,
},
path: None,
err: QueryError::FieldError {
err: "MyErrorMessage".to_owned(),
extended_error: Some(json!({
"code": "MY_TEST_CODE"
})),
},
});
assert_eq!(
serde_json::to_value(resp).unwrap(),
json!({
"errors": [{
"message":"MyErrorMessage",
"extensions": {
"code": "MY_TEST_CODE"
},
"locations": [{"line": 10, "column": 20}]
}]
})
);
}
#[test]
fn test_response_error_with_pos() {
let resp = GQLQueryResponse::from(Error::Query {
pos: Pos {
line: 10,
column: 20,
},
path: None,
err: QueryError::NotSupported,
});
assert_eq!(
serde_json::to_value(resp).unwrap(),
json!({
"errors": [{
"message":"Not supported.",
"locations": [
{"line": 10, "column": 20}
]
}]
})
);
}
}

View File

@ -1,8 +1,8 @@
use crate::context::Data;
use crate::http::{GQLError, GQLRequest, GQLResponse};
use crate::http::GQLRequest;
use crate::{
ConnectionTransport, Error, FieldError, FieldResult, ObjectType, QueryBuilder, QueryError,
QueryResponse, Result, Schema, SubscriptionStreams, SubscriptionType, Variables,
ConnectionTransport, Error, FieldError, FieldResult, GQLQueryResponse, ObjectType, QueryError,
Result, Schema, SubscriptionStreams, SubscriptionType, Variables,
};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
@ -201,7 +201,7 @@ impl ConnectionTransport for WebSocketTransport {
ty: "data".to_string(),
id: Some(id.clone()),
payload: Some(
serde_json::to_value(GQLResponse(Ok(QueryResponse {
serde_json::to_value(GQLResponse(Ok(GQLQueryResponse {
data: value,
extensions: None,
cache_control: Default::default(),