Continue to refactor.

This commit is contained in:
Sunli 2020-09-10 16:39:43 +08:00
parent ce0683e1f9
commit f3c0d86f12
21 changed files with 496 additions and 507 deletions

View File

@ -1,5 +1,4 @@
pub use async_graphql::http::GQLResponse;
use async_graphql::{GQLQueryResponse, ObjectType, Schema, SubscriptionType};
use async_graphql::{ObjectType, Response, Schema, SubscriptionType};
use async_graphql_parser::{parse_query, types::ExecutableDocument};
use async_std::task;
@ -10,7 +9,7 @@ static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;
pub fn run<Query, Mutation, Subscription>(
s: &Schema<Query, Mutation, Subscription>,
q: &str,
) -> GQLQueryResponse
) -> Response
where
Query: ObjectType + Send + Sync + 'static,
Mutation: ObjectType + Send + Sync + 'static,
@ -30,7 +29,3 @@ pub fn parse(q: &str) -> ExecutableDocument {
// pub fn resolve() {
// do_resolve(...).unwrap();
// }
pub fn serialize(r: &GQLResponse) -> String {
serde_json::to_string(&r).unwrap()
}

View File

@ -84,7 +84,7 @@ pub fn generate(object_args: &args::Object, input: &DeriveInput) -> Result<Token
#[allow(clippy::all, clippy::pedantic)]
#[#crate_name::async_trait::async_trait]
impl #crate_name::SubscriptionType for #ident {
async fn create_field_stream(&self, idx: usize, ctx: &#crate_name::Context<'_>, schema_env: #crate_name::SchemaEnv, query_env: #crate_name::QueryEnv) -> #crate_name::Result<::std::pin::Pin<Box<dyn #crate_name::futures::Stream<Item = #crate_name::Result<#crate_name::serde_json::Value>> + Send>>> {
async fn create_field_stream(&self, idx: usize, ctx: &#crate_name::Context<'_>, schema_env: #crate_name::SchemaEnv, query_env: #crate_name::QueryEnv) -> #crate_name::Result<::std::pin::Pin<Box<dyn #crate_name::futures::Stream<Item = #crate_name::Response> + Send>>> {
#create_merged_obj.create_field_stream(idx, ctx, schema_env, query_env).await
}
}

View File

@ -284,10 +284,19 @@ pub fn generate(object_args: &args::Object, item_impl: &mut ItemImpl) -> Result<
if !*state {
return #crate_name::futures::future::ready(None);
}
if item.is_err() {
// Convert the raw resolver result into a `Response`:
// a successful item becomes a normal response payload, an error is
// converted via `From<Error> for Response`.
let resp = match item {
    Ok(value) => #crate_name::Response {
        data: value,
        extensions: None,
        cache_control: Default::default(),
        error: None,
    }, // NOTE: comma is required between match arms — it was missing here
    Err(err) => err.into(),
};
if resp.is_err() {
*state = false;
}
return #crate_name::futures::future::ready(Some(item));
return #crate_name::futures::future::ready(Some(resp));
});
return Ok(Box::pin(stream));
}
@ -341,7 +350,7 @@ pub fn generate(object_args: &args::Object, item_impl: &mut ItemImpl) -> Result<
ctx: &#crate_name::Context<'_>,
schema_env: #crate_name::SchemaEnv,
query_env: #crate_name::QueryEnv,
) -> #crate_name::Result<::std::pin::Pin<Box<dyn #crate_name::futures::Stream<Item = #crate_name::Result<#crate_name::serde_json::Value>> + Send>>> {
) -> #crate_name::Result<::std::pin::Pin<Box<dyn #crate_name::futures::Stream<Item = #crate_name::Response> + Send>>> {
#(#create_stream)*
Err(#crate_name::QueryError::FieldNotFound {
field_name: ctx.node.name.to_string(),

View File

@ -10,7 +10,7 @@ use actix_web::http::StatusCode;
use actix_web::{http, web, Error, FromRequest, HttpRequest, HttpResponse, Responder};
use async_graphql::http::StreamBody;
use async_graphql::{
GQLQueryResponse, IntoQueryBuilder, ParseRequestError, QueryBuilder, ReceiveMultipartOptions,
IntoQueryBuilder, ParseRequestError, QueryBuilder, ReceiveMultipartOptions, Response,
};
use futures::channel::mpsc;
use futures::future::Ready;
@ -87,10 +87,10 @@ impl FromRequest for GQLRequest {
}
/// Responder for GraphQL response
pub struct GQLResponse(async_graphql::Result<GQLQueryResponse>);
pub struct GQLResponse(async_graphql::Result<Response>);
impl From<async_graphql::Result<GQLQueryResponse>> for GQLResponse {
fn from(resp: async_graphql::Result<GQLQueryResponse>) -> Self {
impl From<async_graphql::Result<Response>> for GQLResponse {
fn from(resp: async_graphql::Result<Response>) -> Self {
GQLResponse(resp)
}
}
@ -109,11 +109,8 @@ impl Responder for GQLResponse {
}
}
fn add_cache_control(
builder: &mut HttpResponseBuilder,
resp: &async_graphql::Result<GQLQueryResponse>,
) {
if let Ok(GQLQueryResponse { cache_control, .. }) = resp {
fn add_cache_control(builder: &mut HttpResponseBuilder, resp: &async_graphql::Result<Response>) {
if let Ok(Response { cache_control, .. }) = resp {
if let Some(cache_control) = cache_control.value() {
builder.header("cache-control", cache_control);
}

View File

@ -4,7 +4,7 @@
#![forbid(unsafe_code)]
use async_graphql::{
GQLQueryResponse, IntoQueryBuilder, ObjectType, QueryBuilder, ReceiveMultipartOptions, Schema,
IntoQueryBuilder, ObjectType, QueryBuilder, ReceiveMultipartOptions, Response, Schema,
SubscriptionType, Variables,
};
use log::{error, info};
@ -258,7 +258,7 @@ impl FromData for GQLRequest {
/// Wrapper around `async_graphql::Response` for implementing the trait
/// `rocket::response::responder::Responder`, so that `GQLResponse` can directly be returned
/// from a Rocket Route function.
pub struct GQLResponse(pub GQLQueryResponse);
pub struct GQLResponse(pub Response);
impl<'r> Responder<'r, 'static> for GQLResponse {
fn respond_to(self, _: &'r Request<'_>) -> response::Result<'static> {
@ -277,13 +277,13 @@ impl<'r> Responder<'r, 'static> for GQLResponse {
/// Extension trait, to allow the use of `cache_control` with for example `ResponseBuilder`.
pub trait CacheControl {
/// Add the `async_graphql::Response` cache control value as a header to the Rocket response.
fn cache_control(&mut self, resp: &async_graphql::Result<GQLQueryResponse>) -> &mut Self;
fn cache_control(&mut self, resp: &async_graphql::Result<Response>) -> &mut Self;
}
impl<'r> CacheControl for ResponseBuilder<'r> {
fn cache_control(
&mut self,
resp: &async_graphql::Result<GQLQueryResponse>,
resp: &async_graphql::Result<Response>,
) -> &mut ResponseBuilder<'r> {
match resp {
Ok(resp) if resp.cache_control.value().is_some() => self.header(Header::new(

View File

@ -7,7 +7,7 @@
use async_graphql::http::{GQLRequest, GQLResponse};
use async_graphql::{
GQLQueryResponse, IntoQueryBuilder, ObjectType, QueryBuilder, ReceiveMultipartOptions, Schema,
IntoQueryBuilder, ObjectType, QueryBuilder, ReceiveMultipartOptions, Response, Schema,
SubscriptionType,
};
use async_trait::async_trait;
@ -123,22 +123,19 @@ impl<State: Clone + Send + Sync + 'static> RequestExt<State> for Request<State>
///
pub trait ResponseExt: Sized {
/// Set body as the result of a GraphQL query.
fn body_graphql(self, res: async_graphql::Result<GQLQueryResponse>) -> tide::Result<Self>;
fn body_graphql(self, res: async_graphql::Result<Response>) -> tide::Result<Self>;
}
impl ResponseExt for Response {
fn body_graphql(self, res: async_graphql::Result<GQLQueryResponse>) -> tide::Result<Self> {
fn body_graphql(self, res: async_graphql::Result<Response>) -> tide::Result<Self> {
let mut resp = add_cache_control(self, &res);
resp.set_body(Body::from_json(&GQLResponse(res))?);
Ok(resp)
}
}
fn add_cache_control(
mut http_resp: Response,
resp: &async_graphql::Result<GQLQueryResponse>,
) -> Response {
if let Ok(GQLQueryResponse { cache_control, .. }) = resp {
fn add_cache_control(mut http_resp: Response, resp: &async_graphql::Result<Response>) -> Response {
if let Ok(Response { cache_control, .. }) = resp {
if let Some(cache_control) = cache_control.value() {
if let Ok(header) = tide::http::headers::HeaderName::from_str("cache-control") {
http_resp.insert_header(header, cache_control);

View File

@ -7,8 +7,8 @@
use async_graphql::http::{GQLRequest, StreamBody};
use async_graphql::{
Data, FieldResult, GQLQueryResponse, IntoQueryBuilder, ObjectType, QueryBuilder,
ReceiveMultipartOptions, Schema, SubscriptionType, WebSocketTransport,
Data, FieldResult, IntoQueryBuilder, ObjectType, QueryBuilder, ReceiveMultipartOptions,
Response, Schema, SubscriptionType, WebSocketTransport,
};
use futures::select;
use futures::{SinkExt, StreamExt};
@ -231,16 +231,16 @@ where
}
/// GraphQL reply
pub struct GQLResponse(async_graphql::Result<GQLQueryResponse>);
pub struct GQLResponse(async_graphql::Result<Response>);
impl From<async_graphql::Result<GQLQueryResponse>> for GQLResponse {
fn from(resp: async_graphql::Result<GQLQueryResponse>) -> Self {
impl From<async_graphql::Result<Response>> for GQLResponse {
fn from(resp: async_graphql::Result<Response>) -> Self {
GQLResponse(resp)
}
}
fn add_cache_control(http_resp: &mut Response, resp: &async_graphql::Result<GQLQueryResponse>) {
if let Ok(GQLQueryResponse { cache_control, .. }) = resp {
fn add_cache_control(http_resp: &mut Response, resp: &async_graphql::Result<Response>) {
if let Ok(Response { cache_control, .. }) = resp {
if let Some(cache_control) = cache_control.value() {
if let Ok(value) = cache_control.parse() {
http_resp.headers_mut().insert("cache-control", value);

View File

@ -1,7 +1,8 @@
use crate::Error;
use pest::iterators::Pair;
use pest::RuleType;
use serde::Serialize;
use serde::ser::SerializeMap;
use serde::{Serialize, Serializer};
use std::borrow::{Borrow, BorrowMut};
use std::cmp::Ordering;
use std::fmt;
@ -9,7 +10,7 @@ use std::hash::{Hash, Hasher};
use std::str::Chars;
/// Original position of an element in source code.
#[derive(PartialOrd, Ord, PartialEq, Eq, Clone, Copy, Default, Hash, Serialize)]
#[derive(PartialOrd, Ord, PartialEq, Eq, Clone, Copy, Default, Hash)]
pub struct Pos {
/// One-based line number.
pub line: usize,
@ -30,6 +31,18 @@ impl fmt::Display for Pos {
}
}
impl Serialize for Pos {
    /// Serializes the position as a two-entry map: `{"line": …, "column": …}`.
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let mut state = serializer.serialize_map(Some(2))?;
        state.serialize_entry("line", &self.line)?;
        state.serialize_entry("column", &self.column)?;
        state.end()
    }
}
/// An AST node that stores its original position.
#[derive(Debug, Clone, Copy, Default)]
pub struct Positioned<T: ?Sized> {

View File

@ -353,6 +353,17 @@ pub enum ParseRequestError {
PayloadTooLarge,
}
impl From<multer::Error> for ParseRequestError {
fn from(err: multer::Error) -> Self {
match err {
multer::Error::FieldSizeExceeded { .. } | multer::Error::StreamSizeExceeded { .. } => {
ParseRequestError::PayloadTooLarge
}
_ => ParseRequestError::InvalidMultipart(err),
}
}
}
/// Verification error.
#[derive(Debug, PartialEq)]
pub struct RuleError {

View File

@ -4,15 +4,11 @@ mod graphiql_source;
mod playground_source;
mod stream_body;
use itertools::Itertools;
pub use graphiql_source::graphiql_source;
pub use playground_source::{playground_source, GraphQLPlaygroundConfig};
pub use stream_body::StreamBody;
use crate::{Error, GQLQueryResponse, ParseRequestError, Pos, QueryError, Result, Variables};
use serde::ser::{SerializeMap, SerializeSeq};
use serde::{Deserialize, Serialize, Serializer};
use serde::Deserialize;
/// Deserializable GraphQL Request object
#[derive(Deserialize, Clone, PartialEq, Debug)]
@ -31,7 +27,6 @@ pub struct GQLRequest {
#[cfg(test)]
mod tests {
use super::*;
use crate::Pos;
use serde_json::json;
#[test]

View File

@ -98,14 +98,20 @@
#![recursion_limit = "256"]
#![forbid(unsafe_code)]
// Do not move this line: it must be the first module defined,
// otherwise compilation may fail.
#[macro_use]
mod macros;
mod base;
mod context;
mod error;
mod look_ahead;
mod model;
mod mutation_resolver;
mod query;
mod request;
mod resolver;
mod response;
mod scalars;
mod schema;
mod serialize_resp;
@ -141,14 +147,13 @@ pub use error::{
};
pub use look_ahead::Lookahead;
pub use parser::{types::ConstValue as Value, Pos, Positioned};
pub use query::{GQLQuery, GQLQueryResponse, ReceiveMultipartOptions};
pub use registry::CacheControl;
pub use request::{ReceiveMultipartOptions, Request};
pub use response::Response;
pub use scalars::{Any, Json, OutputJson, ID};
pub use schema::{Schema, SchemaBuilder, SchemaEnv};
pub use serde_json::Number;
pub use subscription::{
ConnectionTransport, SimpleBroker, SubscriptionStreams, WebSocketTransport,
};
pub use subscription::{ConnectionTransport, SimpleBroker, SubscriptionStreams};
pub use types::{
connection, EmptyMutation, EmptySubscription, MaybeUndefined, MergedObject,
MergedObjectSubscriptionTail, MergedObjectTail, Upload,
@ -164,7 +169,6 @@ pub use context::ContextSelectionSet;
#[doc(hidden)]
pub mod registry;
#[doc(hidden)]
pub use base::{BoxFieldFuture, InputObjectType, InputValueType, ObjectType, OutputValueType};
#[doc(hidden)]

8
src/macros.rs Normal file
View File

@ -0,0 +1,8 @@
/// Unwraps a `Result`; on `Err`, converts the error (via `Into`) into the
/// enclosing function's return type and returns it immediately.
macro_rules! try_query_result {
    ($res:expr) => {
        match $res {
            Ok(value) => value,
            Err(err) => return err.into(),
        }
    };
}

View File

@ -1,24 +1,15 @@
use crate::context::{Data, ResolveId};
use crate::extensions::{BoxExtension, ErrorLogger, Extension};
use crate::parser::types::{OperationType, UploadValue};
use crate::{
do_resolve, http, CacheControl, ContextBase, Error, ObjectType, ParseRequestError, Pos,
QueryEnv, QueryError, Result, Schema, SubscriptionType, Value, Variables,
};
use crate::parser::types::UploadValue;
use crate::{http, Data, ParseRequestError, Value, Variables};
use bytes::Bytes;
use futures::stream;
use futures::task::Poll;
use futures::{AsyncRead, AsyncReadExt, Stream};
use multer::{Constraints, Multipart, SizeLimit};
use serde::ser::{SerializeMap, SerializeSeq};
use serde::{Serialize, Serializer};
use std::any::Any;
use std::collections::HashMap;
use std::fs::File;
use std::io;
use std::io::{Seek, SeekFrom, Write};
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use std::io::{self, Seek, SeekFrom, Write};
use std::pin::Pin;
/// Options for `GQLQuery::receive_multipart`
#[derive(Default, Clone)]
@ -30,23 +21,21 @@ pub struct ReceiveMultipartOptions {
pub max_num_files: Option<usize>,
}
pub struct GQLQuery {
pub struct Request {
pub(crate) query: String,
pub(crate) operation_name: Option<String>,
pub(crate) variables: Variables,
pub(crate) ctx_data: Data,
pub(crate) extensions: Vec<Box<dyn Fn() -> BoxExtension + Send + Sync>>,
}
impl GQLQuery {
/// Create a query with query source.
impl Request {
/// Create a request object with query source.
pub fn new(query: impl Into<String>) -> Self {
Self {
query: query.into(),
operation_name: None,
variables: Variables::default(),
ctx_data: Data::default(),
extensions: Vec::default(),
}
}
@ -59,13 +48,12 @@ impl GQLQuery {
.map(|value| Variables::parse_from_json(value))
.unwrap_or_default(),
ctx_data: Data::default(),
extensions: Vec::default(),
}
}
pub async fn receive_request(
content_type: Option<impl AsRef<str>>,
mut body: impl AsyncRead,
mut body: impl AsyncRead + Send + Unpin + 'static,
opts: ReceiveMultipartOptions,
) -> std::result::Result<Self, ParseRequestError> {
if let Some(boundary) = content_type.and_then(|ct| multer::parse_boundary(ct).ok()) {
@ -87,7 +75,7 @@ impl GQLQuery {
}),
);
let mut query = None;
let mut request = None;
let mut map = None;
let mut files = Vec::new();
@ -95,7 +83,7 @@ impl GQLQuery {
match field.name() {
Some("operations") => {
let request_str = field.text().await?;
query = Some(Self::new_with_http_request(
request = Some(Self::new_with_http_request(
serde_json::from_str(&request_str)
.map_err(ParseRequestError::InvalidRequest)?,
));
@ -125,13 +113,13 @@ impl GQLQuery {
}
}
let mut query = query.ok_or(ParseRequestError::MissingOperatorsPart)?;
let mut request = request.ok_or(ParseRequestError::MissingOperatorsPart)?;
let map = map.as_mut().ok_or(ParseRequestError::MissingMapPart)?;
for (name, filename, content_type, file) in files {
if let Some(var_paths) = map.remove(&name) {
for var_path in var_paths {
query.set_upload(
request.set_upload(
&var_path,
filename.clone(),
content_type.clone(),
@ -145,7 +133,7 @@ impl GQLQuery {
return Err(ParseRequestError::MissingFiles);
}
Ok(query)
Ok(request)
} else {
let mut data = Vec::new();
body.read_to_end(&mut data)
@ -170,16 +158,6 @@ impl GQLQuery {
Self { variables, ..self }
}
/// Add an extension
pub fn extension<F: Fn() -> E + Send + Sync + 'static, E: Extension>(
mut self,
extension_factory: F,
) -> Self {
self.extensions
.push(Box::new(move || Box::new(extension_factory())));
self
}
/// Add a context data that can be accessed in the `Context`, you access it with `Context::data`.
///
/// **This data is only valid for this query**
@ -208,57 +186,18 @@ impl GQLQuery {
}
}
impl<T: Into<String>> From<T> for GQLQuery {
impl<T: Into<String>> From<T> for Request {
fn from(query: T) -> Self {
Self::new(query)
}
}
impl From<http::GQLRequest> for GQLQuery {
impl From<http::GQLRequest> for Request {
fn from(request: http::GQLRequest) -> Self {
Self::new_with_http_request(request)
}
}
/// Query response
#[derive(Debug)]
pub struct GQLQueryResponse {
/// Data of query result
pub data: serde_json::Value,
/// Extensions result
pub extensions: Option<serde_json::Value>,
/// Cache control value
pub cache_control: CacheControl,
/// Error
pub error: Option<Error>,
}
impl GQLQueryResponse {
#[inline]
pub fn is_err(&self) -> bool {
self.error.is_some()
}
#[inline]
pub fn unwrap_err(self) -> Error {
self.error.unwrap()
}
}
impl From<Error> for GQLQueryResponse {
fn from(err: Error) -> Self {
Self {
data: serde_json::Value::Null,
extensions: None,
cache_control: CacheControl::default(),
error: Some(err),
}
}
}
fn reader_stream(
mut reader: impl AsyncRead + Unpin + Send + 'static,
) -> impl Stream<Item = io::Result<Bytes>> + Unpin + Send + 'static {

40
src/response.rs Normal file
View File

@ -0,0 +1,40 @@
use crate::{CacheControl, Error};
/// Query response
///
/// Produced by query execution; when execution fails, `error` is `Some`
/// (see `From<Error>` below, which sets `data` to `Null`).
#[derive(Debug)]
pub struct Response {
    /// Data of query result
    pub data: serde_json::Value,
    /// Extensions result
    pub extensions: Option<serde_json::Value>,
    /// Cache control value
    pub cache_control: CacheControl,
    /// Error
    pub error: Option<Error>,
}
impl Response {
    /// Returns `true` if this response carries an error.
    #[inline]
    #[must_use]
    pub fn is_err(&self) -> bool {
        self.error.is_some()
    }

    /// Consumes the response and returns the contained error.
    ///
    /// # Panics
    ///
    /// Panics if the response is not an error (i.e. `is_err()` is `false`).
    #[inline]
    pub fn unwrap_err(self) -> Error {
        // `expect` instead of bare `unwrap` so a misuse panics with a
        // message naming the violated invariant.
        self.error
            .expect("called `Response::unwrap_err` on a non-error response")
    }
}
impl From<Error> for Response {
fn from(err: Error) -> Self {
Self {
data: serde_json::Value::Null,
extensions: None,
cache_control: CacheControl::default(),
error: Some(err),
}
}
}

View File

@ -5,14 +5,13 @@ use crate::mutation_resolver::do_mutation_resolve;
use crate::parser::parse_query;
use crate::parser::types::{ExecutableDocument, OperationType};
use crate::registry::{MetaDirective, MetaInputValue, Registry};
use crate::subscription::{create_connection, create_subscription_stream, ConnectionTransport};
use crate::subscription::create_subscription_stream;
use crate::types::QueryRoot;
use crate::validation::{check_rules, CheckResult, ValidationMode};
use crate::{
do_resolve, CacheControl, ContextBase, Error, GQLQueryResponse, ObjectType, Pos, QueryEnv,
QueryError, Result, SubscriptionType, Type, Variables, ID,
do_resolve, CacheControl, ContextBase, Error, ObjectType, Pos, QueryEnv, QueryError, Request,
Response, Result, SubscriptionType, Type, Variables, ID,
};
use futures::channel::mpsc;
use futures::Stream;
use indexmap::map::IndexMap;
use itertools::Itertools;
@ -311,14 +310,12 @@ where
&self,
source: &str,
variables: &Variables,
query_extensions: &[Box<dyn Fn() -> BoxExtension + Send + Sync>],
) -> Result<(ExecutableDocument, CacheControl, spin::Mutex<Extensions>)> {
// create extension instances
let extensions = spin::Mutex::new(Extensions(
self.0
.extensions
.iter()
.chain(query_extensions)
.map(|factory| factory())
.collect_vec(),
));
@ -362,108 +359,110 @@ where
}
/// Execute the query without creating a `QueryBuilder`.
pub async fn execute(&self, query: GQLQuery) -> GQLQueryResponse {
pub async fn execute(&self, query: Request) -> Response {
let (document, cache_control, extensions) =
self.prepare_query(&query.query, &query.variables, &query.extensions)?;
try_query_result!(self.prepare_query(&query.query, &query.variables));
// execute
let inc_resolve_id = AtomicUsize::default();
let document = match document.into_data(self.operation_name.as_deref()) {
let document = match document.into_data(query.operation_name.as_deref()) {
Some(document) => document,
None => {
return if let Some(operation_name) = self.operation_name {
Err(Error::Query {
let err = if let Some(operation_name) = query.operation_name {
Error::Query {
pos: Pos::default(),
path: None,
err: QueryError::UnknownOperationNamed {
name: operation_name,
},
})
}
} else {
Err(Error::Query {
Error::Query {
pos: Pos::default(),
path: None,
err: QueryError::MissingOperation,
})
}
.log_error(&extensions)
.into()
}
};
extensions.lock().error(&err);
return err.into();
}
};
let env = QueryEnv::new(
extensions,
self.variables,
query.variables,
document,
Arc::new(self.ctx_data.unwrap_or_default()),
Arc::new(query.ctx_data),
);
let ctx = ContextBase {
path_node: None,
resolve_id: ResolveId::root(),
inc_resolve_id: &inc_resolve_id,
item: &env.document.operation.node.selection_set,
schema_env: &schema.env,
schema_env: &self.env,
query_env: &env,
};
env.extensions.lock().execution_start();
let data = match &env.document.operation.node.ty {
OperationType::Query => do_resolve(&ctx, &schema.query).await?,
OperationType::Mutation => do_mutation_resolve(&ctx, &schema.mutation).await?,
OperationType::Query => try_query_result!(do_resolve(&ctx, &self.query).await),
OperationType::Mutation => {
try_query_result!(do_mutation_resolve(&ctx, &self.mutation).await)
}
OperationType::Subscription => {
return Err(Error::Query {
return Error::Query {
pos: Pos::default(),
path: None,
err: QueryError::NotSupported,
})
}
.into()
}
};
env.extensions.lock().execution_end();
GQLQueryResponse {
let extensions = env.extensions.lock().result();
Response {
data,
extensions: env.extensions.lock().result(),
extensions,
cache_control,
error: None,
}
}
/// Create subscription stream, typically called inside the `SubscriptionTransport::handle_request` method
pub async fn create_subscription_stream(
pub async fn execute_stream(
&self,
source: &str,
operation_name: Option<&str>,
variables: Variables,
ctx_data: Option<Arc<Data>>,
) -> Result<impl Stream<Item = Result<serde_json::Value>> + Send> {
let (document, _, extensions) = self.prepare_query(source, &variables, &Vec::new())?;
query: Request,
) -> Result<impl Stream<Item = Response> + Send> {
let (document, _, extensions) = self.prepare_query(&query.query, &query.variables)?;
let document = match document.into_data(operation_name) {
let document = match document.into_data(query.operation_name.as_deref()) {
Some(document) => document,
None => {
return if let Some(name) = operation_name {
Err(QueryError::UnknownOperationNamed {
let err = if let Some(name) = query.operation_name {
QueryError::UnknownOperationNamed {
name: name.to_string(),
}
.into_error(Pos::default()))
.into_error(Pos::default())
} else {
Err(QueryError::MissingOperation.into_error(Pos::default()))
}
.log_error(&extensions)
QueryError::MissingOperation.into_error(Pos::default())
};
extensions.lock().error(&err);
return Err(err.into());
}
};
if document.operation.node.ty != OperationType::Subscription {
return Err(QueryError::NotSupported.into_error(Pos::default())).log_error(&extensions);
let err = QueryError::NotSupported.into_error(Pos::default());
extensions.lock().error(&err);
return Err(err);
}
let resolve_id = AtomicUsize::default();
let env = QueryEnv::new(
extensions,
variables,
query.variables,
document,
ctx_data.unwrap_or_default(),
Arc::new(query.ctx_data),
);
let ctx = env.create_context(
&self.env,
@ -472,20 +471,12 @@ where
&resolve_id,
);
let mut streams = Vec::new();
create_subscription_stream(self, env.clone(), &ctx, &mut streams)
.await
.log_error(&ctx.query_env.extensions)?;
Ok(futures::stream::select_all(streams))
}
/// Create subscription connection, returns `Sink` and `Stream`.
pub fn subscription_connection<T: ConnectionTransport>(
&self,
transport: T,
) -> (
mpsc::UnboundedSender<Vec<u8>>,
impl Stream<Item = Vec<u8>> + Unpin,
) {
create_connection(self.clone(), transport)
match create_subscription_stream(self, env.clone(), &ctx, &mut streams).await {
Ok(()) => Ok(futures::stream::select_all(streams)),
Err(err) => {
env.extensions.lock().error(&err);
Err(err)
}
}
}
}

View File

@ -1,15 +1,16 @@
use crate::{Error, GQLQueryResponse, Pos, QueryError};
use crate::{Error, QueryError, Response};
use itertools::Itertools;
use serde::ser::{SerializeMap, SerializeSeq};
use serde::{Serialize, Serializer};
impl Serialize for GQLQueryResponse {
impl Serialize for Response {
fn serialize<S: Serializer>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error> {
match &self.error {
None => {
let mut map = serializer.serialize_map(None)?;
map.serialize_key("data")?;
map.serialize_value(&self.data)?;
if res.extensions.is_some() {
if self.extensions.is_some() {
map.serialize_key("extensions")?;
map.serialize_value(&self.extensions)?;
}
@ -25,18 +26,6 @@ impl Serialize for GQLQueryResponse {
}
}
impl<'a> Serialize for Pos {
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: Serializer,
{
let mut map = serializer.serialize_map(Some(2))?;
map.serialize_entry("line", &self.0.line)?;
map.serialize_entry("column", &self.0.column)?;
map.end()
}
}
impl<'a> Serialize for Error {
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
@ -105,7 +94,7 @@ mod tests {
#[test]
fn test_response_data() {
let resp = GQLQueryResponse {
let resp = Response {
data: json!({"ok": true}),
extensions: None,
cache_control: Default::default(),
@ -123,7 +112,7 @@ mod tests {
#[test]
fn test_field_error_with_extension() {
let resp = GQLQueryResponse::from(Error::Query {
let resp = Response::from(Error::Query {
pos: Pos {
line: 10,
column: 20,
@ -153,7 +142,7 @@ mod tests {
#[test]
fn test_response_error_with_pos() {
let resp = GQLQueryResponse::from(Error::Query {
let resp = Response::from(Error::Query {
pos: Pos {
line: 10,
column: 20,

View File

@ -1,9 +1,8 @@
mod connection;
mod simple_broker;
mod subscription_type;
mod ws_transport;
// mod ws_transport;
pub use connection::{create_connection, ConnectionTransport, SubscriptionStreams};
pub use simple_broker::SimpleBroker;
pub use subscription_type::{create_subscription_stream, SubscriptionType};
pub use ws_transport::WebSocketTransport;

View File

@ -1,6 +1,6 @@
use crate::context::QueryEnv;
use crate::parser::types::{Selection, TypeCondition};
use crate::{Context, ContextSelectionSet, ObjectType, Result, Schema, SchemaEnv, Type};
use crate::{Context, ContextSelectionSet, ObjectType, Response, Result, Schema, SchemaEnv, Type};
use futures::{Future, Stream};
use std::pin::Pin;
@ -20,7 +20,7 @@ pub trait SubscriptionType: Type {
ctx: &Context<'_>,
schema_env: SchemaEnv,
query_env: QueryEnv,
) -> Result<Pin<Box<dyn Stream<Item = Result<serde_json::Value>> + Send>>>;
) -> Result<Pin<Box<dyn Stream<Item = Response> + Send>>>;
}
type BoxCreateStreamFuture<'a> = Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>>;
@ -29,7 +29,7 @@ pub fn create_subscription_stream<'a, Query, Mutation, Subscription>(
schema: &'a Schema<Query, Mutation, Subscription>,
environment: QueryEnv,
ctx: &'a ContextSelectionSet<'_>,
streams: &'a mut Vec<Pin<Box<dyn Stream<Item = Result<serde_json::Value>> + Send>>>,
streams: &'a mut Vec<Pin<Box<dyn Stream<Item = Response> + Send>>>,
) -> BoxCreateStreamFuture<'a>
where
Query: ObjectType + Send + Sync + 'static,
@ -109,7 +109,7 @@ impl<T: SubscriptionType + Send + Sync> SubscriptionType for &T {
ctx: &Context<'_>,
schema_env: SchemaEnv,
query_env: QueryEnv,
) -> Result<Pin<Box<dyn Stream<Item = Result<serde_json::Value>> + Send>>> {
) -> Result<Pin<Box<dyn Stream<Item = Response> + Send>>> {
T::create_field_stream(*self, idx, ctx, schema_env, query_env).await
}
}

View File

@ -1,5 +1,7 @@
use crate::context::QueryEnv;
use crate::{registry, Context, Error, Pos, QueryError, Result, SchemaEnv, SubscriptionType, Type};
use crate::{
registry, Context, Error, Pos, QueryError, Response, Result, SchemaEnv, SubscriptionType, Type,
};
use futures::Stream;
use std::borrow::Cow;
use std::pin::Pin;
@ -39,7 +41,7 @@ impl SubscriptionType for EmptySubscription {
_ctx: &Context<'_>,
_schema_env: SchemaEnv,
_query_env: QueryEnv,
) -> Result<Pin<Box<dyn Stream<Item = Result<serde_json::Value>> + Send>>>
) -> Result<Pin<Box<dyn Stream<Item = Response> + Send>>>
where
Self: Send + Sync + 'static + Sized,
{

View File

@ -2,7 +2,7 @@ use crate::parser::types::Field;
use crate::registry::{MetaType, Registry};
use crate::{
do_resolve, CacheControl, Context, ContextSelectionSet, Error, ObjectType, OutputValueType,
Positioned, QueryEnv, QueryError, Result, SchemaEnv, SubscriptionType, Type,
Positioned, QueryEnv, QueryError, Response, Result, SchemaEnv, SubscriptionType, Type,
};
use futures::Stream;
use indexmap::IndexMap;
@ -111,7 +111,7 @@ where
ctx: &Context<'_>,
schema_env: SchemaEnv,
query_env: QueryEnv,
) -> Result<Pin<Box<dyn Stream<Item = Result<serde_json::Value>> + Send>>> {
) -> Result<Pin<Box<dyn Stream<Item = Response> + Send>>> {
match self
.0
.create_field_stream(idx, ctx, schema_env.clone(), query_env.clone())

View File

@ -1,291 +1,291 @@
use async_graphql::*;
use futures::{SinkExt, Stream, StreamExt};
// Smoke test for the WebSocket subscription transport: connection_init /
// connection_ack handshake, then a `start` for `subscription { values }`
// which streams the integers 0..10 back as `data` messages.
//
// NOTE(review): this commit comments out `mod ws_transport` and removes
// `Schema::subscription_connection` (see the subscription/mod.rs and
// schema.rs hunks in this change), so this test presumably no longer
// compiles as-is — confirm.
#[async_std::test]
pub async fn test_subscription_ws_transport() {
    struct QueryRoot;

    #[Object]
    impl QueryRoot {}

    struct SubscriptionRoot;

    #[Subscription]
    impl SubscriptionRoot {
        // Emits 0..10, one value per subscription event.
        async fn values(&self) -> impl Stream<Item = i32> {
            futures::stream::iter(0..10)
        }
    }

    let schema = Schema::new(QueryRoot, EmptyMutation, SubscriptionRoot);
    let (mut sink, mut stream) = schema.subscription_connection(WebSocketTransport::default());

    // Client -> server: initialize the connection (payload is ignored by the
    // default transport).
    sink.send(
        serde_json::to_vec(&serde_json::json!({
            "type": "connection_init",
            "payload": { "token": "123456" }
        }))
        .unwrap(),
    )
    .await
    .unwrap();

    // Server -> client: acknowledge.
    assert_eq!(
        Some(serde_json::json!({
            "type": "connection_ack",
        })),
        serde_json::from_slice(&stream.next().await.unwrap()).unwrap()
    );

    // Start the subscription with id "1".
    sink.send(
        serde_json::to_vec(&serde_json::json!({
            "type": "start",
            "id": "1",
            "payload": {
                "query": "subscription { values }"
            },
        }))
        .unwrap(),
    )
    .await
    .unwrap();

    // Each streamed value arrives as a `data` message tagged with the
    // subscription id.
    for i in 0..10 {
        assert_eq!(
            Some(serde_json::json!({
                "type": "data",
                "id": "1",
                "payload": { "data": { "values": i } },
            })),
            serde_json::from_slice(&stream.next().await.unwrap()).unwrap()
        );
    }
}
// Same handshake as the basic transport test, but the transport is built
// with an init callback that extracts a token from the `connection_init`
// payload and stores it in the connection `Data`; the subscription resolver
// rejects any token other than "123456".
//
// NOTE(review): relies on `WebSocketTransport`/`subscription_connection`,
// which this commit removes — confirm this test still builds.
#[async_std::test]
pub async fn test_subscription_ws_transport_with_token() {
    struct Token(String);

    struct QueryRoot;

    #[Object]
    impl QueryRoot {}

    struct SubscriptionRoot;

    #[Subscription]
    impl SubscriptionRoot {
        // Only streams values when the connection's token matches.
        async fn values(&self, ctx: &Context<'_>) -> FieldResult<impl Stream<Item = i32>> {
            if ctx.data_unchecked::<Token>().0 != "123456" {
                return Err("forbidden".into());
            }
            Ok(futures::stream::iter(0..10))
        }
    }

    let schema = Schema::new(QueryRoot, EmptyMutation, SubscriptionRoot);
    // Init callback: parse `{ "token": … }` from the connection_init payload
    // into the connection data.
    let (mut sink, mut stream) = schema.subscription_connection(WebSocketTransport::new(|value| {
        #[derive(serde::Deserialize)]
        struct Payload {
            token: String,
        }

        let payload: Payload = serde_json::from_value(value).unwrap();
        let mut data = Data::default();
        data.insert(Token(payload.token));
        Ok(data)
    }));

    sink.send(
        serde_json::to_vec(&serde_json::json!({
            "type": "connection_init",
            "payload": { "token": "123456" }
        }))
        .unwrap(),
    )
    .await
    .unwrap();

    assert_eq!(
        Some(serde_json::json!({
            "type": "connection_ack",
        })),
        serde_json::from_slice(&stream.next().await.unwrap()).unwrap()
    );

    sink.send(
        serde_json::to_vec(&serde_json::json!({
            "type": "start",
            "id": "1",
            "payload": {
                "query": "subscription { values }"
            },
        }))
        .unwrap(),
    )
    .await
    .unwrap();

    // Valid token: the full 0..10 stream is delivered.
    for i in 0..10 {
        assert_eq!(
            Some(serde_json::json!({
                "type": "data",
                "id": "1",
                "payload": { "data": { "values": i } },
            })),
            serde_json::from_slice(&stream.next().await.unwrap()).unwrap()
        );
    }
}
// Verifies error propagation over the WebSocket transport: the `value`
// field resolver fails for events >= 5, so the client receives five `data`
// messages followed by an `error` message carrying the field error's
// message, location and path.
//
// NOTE(review): relies on `WebSocketTransport`/`subscription_connection`,
// which this commit removes — confirm this test still builds.
#[async_std::test]
pub async fn test_subscription_ws_transport_error() {
    struct QueryRoot;

    struct Event {
        value: i32,
    }

    #[Object]
    impl Event {
        // Fails deliberately once the underlying value reaches 5.
        async fn value(&self) -> FieldResult<i32> {
            if self.value < 5 {
                Ok(self.value)
            } else {
                Err("TestError".into())
            }
        }
    }

    #[Object]
    impl QueryRoot {}

    struct SubscriptionRoot;

    #[Subscription]
    impl SubscriptionRoot {
        async fn events(&self) -> impl Stream<Item = Event> {
            futures::stream::iter((0..10).map(|n| Event { value: n }))
        }
    }

    let schema = Schema::new(QueryRoot, EmptyMutation, SubscriptionRoot);
    let (mut sink, mut stream) =
        schema.subscription_connection(WebSocketTransport::new(|_| Ok(Data::default())));

    sink.send(
        serde_json::to_vec(&serde_json::json!({
            "type": "connection_init"
        }))
        .unwrap(),
    )
    .await
    .unwrap();

    assert_eq!(
        Some(serde_json::json!({
            "type": "connection_ack",
        })),
        serde_json::from_slice(&stream.next().await.unwrap()).unwrap()
    );

    sink.send(
        serde_json::to_vec(&serde_json::json!({
            "type": "start",
            "id": "1",
            "payload": {
                "query": "subscription { events { value } }"
            },
        }))
        .unwrap(),
    )
    .await
    .unwrap();

    // Events 0..5 resolve successfully.
    for i in 0i32..5 {
        assert_eq!(
            Some(serde_json::json!({
                "type": "data",
                "id": "1",
                "payload": { "data": { "events": { "value": i } } },
            })),
            serde_json::from_slice(&stream.next().await.unwrap()).unwrap()
        );
    }

    // The sixth event's resolver fails, producing an `error` message.
    assert_eq!(
        Some(serde_json::json!({
            "type": "error",
            "id": "1",
            "payload": [{
                "message": "TestError",
                "locations": [{"line": 1, "column": 25}],
                "path": ["events", "value"],
            }],
        })),
        serde_json::from_slice(&stream.next().await.unwrap()).unwrap()
    );
}
#[async_std::test]
pub async fn test_query_over_websocket() {
    struct QueryRoot;

    #[Object]
    impl QueryRoot {
        async fn value(&self) -> i32 {
            999
        }
    }

    let schema = Schema::new(QueryRoot, EmptyMutation, EmptySubscription);
    let (mut tx, mut rx) = schema.subscription_connection(WebSocketTransport::default());

    // Handshake.
    let init_msg = serde_json::to_vec(&serde_json::json!({
        "type": "connection_init",
    }))
    .unwrap();
    tx.send(init_msg).await.unwrap();

    assert_eq!(
        Some(serde_json::json!({
            "type": "connection_ack",
        })),
        serde_json::from_slice(&rx.next().await.unwrap()).unwrap()
    );

    // A plain query (not a subscription) is also valid over the WS transport.
    let start_msg = serde_json::to_vec(&serde_json::json!({
        "type": "start",
        "id": "1",
        "payload": {
            "query": "query { value }"
        },
    }))
    .unwrap();
    tx.send(start_msg).await.unwrap();

    // A query yields exactly one data frame...
    assert_eq!(
        Some(serde_json::json!({
            "type": "data",
            "id": "1",
            "payload": { "data": { "value": 999 } },
        })),
        serde_json::from_slice(&rx.next().await.unwrap()).unwrap()
    );

    // ...followed immediately by a complete frame for the same operation id.
    assert_eq!(
        Some(serde_json::json!({
            "type": "complete",
            "id": "1",
        })),
        serde_json::from_slice(&rx.next().await.unwrap()).unwrap()
    );
}
// NOTE(review): everything below is a commented-out duplicate of the active
// tests above (apparently left over from a refactor) — consider deleting it.
// use async_graphql::*;
// use futures::{SinkExt, Stream, StreamExt};
//
// #[async_std::test]
// pub async fn test_subscription_ws_transport() {
// struct QueryRoot;
//
// #[Object]
// impl QueryRoot {}
//
// struct SubscriptionRoot;
//
// #[Subscription]
// impl SubscriptionRoot {
// async fn values(&self) -> impl Stream<Item = i32> {
// futures::stream::iter(0..10)
// }
// }
//
// let schema = Schema::new(QueryRoot, EmptyMutation, SubscriptionRoot);
// let (mut sink, mut stream) = schema.subscription_connection(WebSocketTransport::default());
//
// sink.send(
// serde_json::to_vec(&serde_json::json!({
// "type": "connection_init",
// "payload": { "token": "123456" }
// }))
// .unwrap(),
// )
// .await
// .unwrap();
//
// assert_eq!(
// Some(serde_json::json!({
// "type": "connection_ack",
// })),
// serde_json::from_slice(&stream.next().await.unwrap()).unwrap()
// );
//
// sink.send(
// serde_json::to_vec(&serde_json::json!({
// "type": "start",
// "id": "1",
// "payload": {
// "query": "subscription { values }"
// },
// }))
// .unwrap(),
// )
// .await
// .unwrap();
//
// for i in 0..10 {
// assert_eq!(
// Some(serde_json::json!({
// "type": "data",
// "id": "1",
// "payload": { "data": { "values": i } },
// })),
// serde_json::from_slice(&stream.next().await.unwrap()).unwrap()
// );
// }
// }
//
// #[async_std::test]
// pub async fn test_subscription_ws_transport_with_token() {
// struct Token(String);
//
// struct QueryRoot;
//
// #[Object]
// impl QueryRoot {}
//
// struct SubscriptionRoot;
//
// #[Subscription]
// impl SubscriptionRoot {
// async fn values(&self, ctx: &Context<'_>) -> FieldResult<impl Stream<Item = i32>> {
// if ctx.data_unchecked::<Token>().0 != "123456" {
// return Err("forbidden".into());
// }
// Ok(futures::stream::iter(0..10))
// }
// }
//
// let schema = Schema::new(QueryRoot, EmptyMutation, SubscriptionRoot);
//
// let (mut sink, mut stream) = schema.subscription_connection(WebSocketTransport::new(|value| {
// #[derive(serde::Deserialize)]
// struct Payload {
// token: String,
// }
//
// let payload: Payload = serde_json::from_value(value).unwrap();
// let mut data = Data::default();
// data.insert(Token(payload.token));
// Ok(data)
// }));
//
// sink.send(
// serde_json::to_vec(&serde_json::json!({
// "type": "connection_init",
// "payload": { "token": "123456" }
// }))
// .unwrap(),
// )
// .await
// .unwrap();
//
// assert_eq!(
// Some(serde_json::json!({
// "type": "connection_ack",
// })),
// serde_json::from_slice(&stream.next().await.unwrap()).unwrap()
// );
//
// sink.send(
// serde_json::to_vec(&serde_json::json!({
// "type": "start",
// "id": "1",
// "payload": {
// "query": "subscription { values }"
// },
// }))
// .unwrap(),
// )
// .await
// .unwrap();
//
// for i in 0..10 {
// assert_eq!(
// Some(serde_json::json!({
// "type": "data",
// "id": "1",
// "payload": { "data": { "values": i } },
// })),
// serde_json::from_slice(&stream.next().await.unwrap()).unwrap()
// );
// }
// }
//
// #[async_std::test]
// pub async fn test_subscription_ws_transport_error() {
// struct QueryRoot;
//
// struct Event {
// value: i32,
// }
//
// #[Object]
// impl Event {
// async fn value(&self) -> FieldResult<i32> {
// if self.value < 5 {
// Ok(self.value)
// } else {
// Err("TestError".into())
// }
// }
// }
//
// #[Object]
// impl QueryRoot {}
//
// struct SubscriptionRoot;
//
// #[Subscription]
// impl SubscriptionRoot {
// async fn events(&self) -> impl Stream<Item = Event> {
// futures::stream::iter((0..10).map(|n| Event { value: n }))
// }
// }
//
// let schema = Schema::new(QueryRoot, EmptyMutation, SubscriptionRoot);
//
// let (mut sink, mut stream) =
// schema.subscription_connection(WebSocketTransport::new(|_| Ok(Data::default())));
//
// sink.send(
// serde_json::to_vec(&serde_json::json!({
// "type": "connection_init"
// }))
// .unwrap(),
// )
// .await
// .unwrap();
//
// assert_eq!(
// Some(serde_json::json!({
// "type": "connection_ack",
// })),
// serde_json::from_slice(&stream.next().await.unwrap()).unwrap()
// );
//
// sink.send(
// serde_json::to_vec(&serde_json::json!({
// "type": "start",
// "id": "1",
// "payload": {
// "query": "subscription { events { value } }"
// },
// }))
// .unwrap(),
// )
// .await
// .unwrap();
//
// for i in 0i32..5 {
// assert_eq!(
// Some(serde_json::json!({
// "type": "data",
// "id": "1",
// "payload": { "data": { "events": { "value": i } } },
// })),
// serde_json::from_slice(&stream.next().await.unwrap()).unwrap()
// );
// }
//
// assert_eq!(
// Some(serde_json::json!({
// "type": "error",
// "id": "1",
// "payload": [{
// "message": "TestError",
// "locations": [{"line": 1, "column": 25}],
// "path": ["events", "value"],
// }],
// })),
// serde_json::from_slice(&stream.next().await.unwrap()).unwrap()
// );
// }
//
// #[async_std::test]
// pub async fn test_query_over_websocket() {
// struct QueryRoot;
//
// #[Object]
// impl QueryRoot {
// async fn value(&self) -> i32 {
// 999
// }
// }
//
// let schema = Schema::new(QueryRoot, EmptyMutation, EmptySubscription);
// let (mut sink, mut stream) = schema.subscription_connection(WebSocketTransport::default());
//
// sink.send(
// serde_json::to_vec(&serde_json::json!({
// "type": "connection_init",
// }))
// .unwrap(),
// )
// .await
// .unwrap();
//
// assert_eq!(
// Some(serde_json::json!({
// "type": "connection_ack",
// })),
// serde_json::from_slice(&stream.next().await.unwrap()).unwrap()
// );
//
// sink.send(
// serde_json::to_vec(&serde_json::json!({
// "type": "start",
// "id": "1",
// "payload": {
// "query": "query { value }"
// },
// }))
// .unwrap(),
// )
// .await
// .unwrap();
//
// assert_eq!(
// Some(serde_json::json!({
// "type": "data",
// "id": "1",
// "payload": { "data": { "value": 999 } },
// })),
// serde_json::from_slice(&stream.next().await.unwrap()).unwrap()
// );
//
// assert_eq!(
// Some(serde_json::json!({
// "type": "complete",
// "id": "1",
// })),
// serde_json::from_slice(&stream.next().await.unwrap()).unwrap()
// );
// }