Continue to refactor.

This commit is contained in:
Sunli 2020-09-10 16:39:43 +08:00
parent 9bc837da9e
commit 7a90069cad
21 changed files with 496 additions and 507 deletions

View File

@ -1,5 +1,4 @@
pub use async_graphql::http::GQLResponse; use async_graphql::{ObjectType, Response, Schema, SubscriptionType};
use async_graphql::{GQLQueryResponse, ObjectType, Schema, SubscriptionType};
use async_graphql_parser::{parse_query, types::ExecutableDocument}; use async_graphql_parser::{parse_query, types::ExecutableDocument};
use async_std::task; use async_std::task;
@ -10,7 +9,7 @@ static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;
pub fn run<Query, Mutation, Subscription>( pub fn run<Query, Mutation, Subscription>(
s: &Schema<Query, Mutation, Subscription>, s: &Schema<Query, Mutation, Subscription>,
q: &str, q: &str,
) -> GQLQueryResponse ) -> Response
where where
Query: ObjectType + Send + Sync + 'static, Query: ObjectType + Send + Sync + 'static,
Mutation: ObjectType + Send + Sync + 'static, Mutation: ObjectType + Send + Sync + 'static,
@ -30,7 +29,3 @@ pub fn parse(q: &str) -> ExecutableDocument {
// pub fn resolve() { // pub fn resolve() {
// do_resolve(...).unwrap(); // do_resolve(...).unwrap();
// } // }
pub fn serialize(r: &GQLResponse) -> String {
serde_json::to_string(&r).unwrap()
}

View File

@ -84,7 +84,7 @@ pub fn generate(object_args: &args::Object, input: &DeriveInput) -> Result<Token
#[allow(clippy::all, clippy::pedantic)] #[allow(clippy::all, clippy::pedantic)]
#[#crate_name::async_trait::async_trait] #[#crate_name::async_trait::async_trait]
impl #crate_name::SubscriptionType for #ident { impl #crate_name::SubscriptionType for #ident {
async fn create_field_stream(&self, idx: usize, ctx: &#crate_name::Context<'_>, schema_env: #crate_name::SchemaEnv, query_env: #crate_name::QueryEnv) -> #crate_name::Result<::std::pin::Pin<Box<dyn #crate_name::futures::Stream<Item = #crate_name::Result<#crate_name::serde_json::Value>> + Send>>> { async fn create_field_stream(&self, idx: usize, ctx: &#crate_name::Context<'_>, schema_env: #crate_name::SchemaEnv, query_env: #crate_name::QueryEnv) -> #crate_name::Result<::std::pin::Pin<Box<dyn #crate_name::futures::Stream<Item = #crate_name::Response> + Send>>> {
#create_merged_obj.create_field_stream(idx, ctx, schema_env, query_env).await #create_merged_obj.create_field_stream(idx, ctx, schema_env, query_env).await
} }
} }

View File

@ -284,10 +284,19 @@ pub fn generate(object_args: &args::Object, item_impl: &mut ItemImpl) -> Result<
if !*state { if !*state {
return #crate_name::futures::future::ready(None); return #crate_name::futures::future::ready(None);
} }
if item.is_err() { let resp = match item {
Ok(value) => #crate_name::Response {
data: value,
extensions: None,
cache_control: Default::default(),
error: None,
}
Err(err) => err.into(),
};
if resp.is_err() {
*state = false; *state = false;
} }
return #crate_name::futures::future::ready(Some(item)); return #crate_name::futures::future::ready(Some(resp));
}); });
return Ok(Box::pin(stream)); return Ok(Box::pin(stream));
} }
@ -341,7 +350,7 @@ pub fn generate(object_args: &args::Object, item_impl: &mut ItemImpl) -> Result<
ctx: &#crate_name::Context<'_>, ctx: &#crate_name::Context<'_>,
schema_env: #crate_name::SchemaEnv, schema_env: #crate_name::SchemaEnv,
query_env: #crate_name::QueryEnv, query_env: #crate_name::QueryEnv,
) -> #crate_name::Result<::std::pin::Pin<Box<dyn #crate_name::futures::Stream<Item = #crate_name::Result<#crate_name::serde_json::Value>> + Send>>> { ) -> #crate_name::Result<::std::pin::Pin<Box<dyn #crate_name::futures::Stream<Item = #crate_name::Response> + Send>>> {
#(#create_stream)* #(#create_stream)*
Err(#crate_name::QueryError::FieldNotFound { Err(#crate_name::QueryError::FieldNotFound {
field_name: ctx.node.name.to_string(), field_name: ctx.node.name.to_string(),

View File

@ -10,7 +10,7 @@ use actix_web::http::StatusCode;
use actix_web::{http, web, Error, FromRequest, HttpRequest, HttpResponse, Responder}; use actix_web::{http, web, Error, FromRequest, HttpRequest, HttpResponse, Responder};
use async_graphql::http::StreamBody; use async_graphql::http::StreamBody;
use async_graphql::{ use async_graphql::{
GQLQueryResponse, IntoQueryBuilder, ParseRequestError, QueryBuilder, ReceiveMultipartOptions, IntoQueryBuilder, ParseRequestError, QueryBuilder, ReceiveMultipartOptions, Response,
}; };
use futures::channel::mpsc; use futures::channel::mpsc;
use futures::future::Ready; use futures::future::Ready;
@ -87,10 +87,10 @@ impl FromRequest for GQLRequest {
} }
/// Responder for GraphQL response /// Responder for GraphQL response
pub struct GQLResponse(async_graphql::Result<GQLQueryResponse>); pub struct GQLResponse(async_graphql::Result<Response>);
impl From<async_graphql::Result<GQLQueryResponse>> for GQLResponse { impl From<async_graphql::Result<Response>> for GQLResponse {
fn from(resp: async_graphql::Result<GQLQueryResponse>) -> Self { fn from(resp: async_graphql::Result<Response>) -> Self {
GQLResponse(resp) GQLResponse(resp)
} }
} }
@ -109,11 +109,8 @@ impl Responder for GQLResponse {
} }
} }
fn add_cache_control( fn add_cache_control(builder: &mut HttpResponseBuilder, resp: &async_graphql::Result<Response>) {
builder: &mut HttpResponseBuilder, if let Ok(Response { cache_control, .. }) = resp {
resp: &async_graphql::Result<GQLQueryResponse>,
) {
if let Ok(GQLQueryResponse { cache_control, .. }) = resp {
if let Some(cache_control) = cache_control.value() { if let Some(cache_control) = cache_control.value() {
builder.header("cache-control", cache_control); builder.header("cache-control", cache_control);
} }

View File

@ -4,7 +4,7 @@
#![forbid(unsafe_code)] #![forbid(unsafe_code)]
use async_graphql::{ use async_graphql::{
GQLQueryResponse, IntoQueryBuilder, ObjectType, QueryBuilder, ReceiveMultipartOptions, Schema, IntoQueryBuilder, ObjectType, QueryBuilder, ReceiveMultipartOptions, Response, Schema,
SubscriptionType, Variables, SubscriptionType, Variables,
}; };
use log::{error, info}; use log::{error, info};
@ -258,7 +258,7 @@ impl FromData for GQLRequest {
/// Wrapper around `async-graphql::query::QueryResponse` for implementing the trait /// Wrapper around `async-graphql::query::QueryResponse` for implementing the trait
/// `rocket::response::responder::Responder`, so that `GQLResponse` can directly be returned /// `rocket::response::responder::Responder`, so that `GQLResponse` can directly be returned
/// from a Rocket Route function. /// from a Rocket Route function.
pub struct GQLResponse(pub GQLQueryResponse); pub struct GQLResponse(pub Response);
impl<'r> Responder<'r, 'static> for GQLResponse { impl<'r> Responder<'r, 'static> for GQLResponse {
fn respond_to(self, _: &'r Request<'_>) -> response::Result<'static> { fn respond_to(self, _: &'r Request<'_>) -> response::Result<'static> {
@ -277,13 +277,13 @@ impl<'r> Responder<'r, 'static> for GQLResponse {
/// Extension trait, to allow the use of `cache_control` with for example `ResponseBuilder`. /// Extension trait, to allow the use of `cache_control` with for example `ResponseBuilder`.
pub trait CacheControl { pub trait CacheControl {
/// Add the `async-graphql::query::QueryResponse` cache control value as header to the Rocket response. /// Add the `async-graphql::query::QueryResponse` cache control value as header to the Rocket response.
fn cache_control(&mut self, resp: &async_graphql::Result<GQLQueryResponse>) -> &mut Self; fn cache_control(&mut self, resp: &async_graphql::Result<Response>) -> &mut Self;
} }
impl<'r> CacheControl for ResponseBuilder<'r> { impl<'r> CacheControl for ResponseBuilder<'r> {
fn cache_control( fn cache_control(
&mut self, &mut self,
resp: &async_graphql::Result<GQLQueryResponse>, resp: &async_graphql::Result<Response>,
) -> &mut ResponseBuilder<'r> { ) -> &mut ResponseBuilder<'r> {
match resp { match resp {
Ok(resp) if resp.cache_control.value().is_some() => self.header(Header::new( Ok(resp) if resp.cache_control.value().is_some() => self.header(Header::new(

View File

@ -7,7 +7,7 @@
use async_graphql::http::{GQLRequest, GQLResponse}; use async_graphql::http::{GQLRequest, GQLResponse};
use async_graphql::{ use async_graphql::{
GQLQueryResponse, IntoQueryBuilder, ObjectType, QueryBuilder, ReceiveMultipartOptions, Schema, IntoQueryBuilder, ObjectType, QueryBuilder, ReceiveMultipartOptions, Response, Schema,
SubscriptionType, SubscriptionType,
}; };
use async_trait::async_trait; use async_trait::async_trait;
@ -123,22 +123,19 @@ impl<State: Clone + Send + Sync + 'static> RequestExt<State> for Request<State>
/// ///
pub trait ResponseExt: Sized { pub trait ResponseExt: Sized {
/// Set body as the result of a GraphQL query. /// Set body as the result of a GraphQL query.
fn body_graphql(self, res: async_graphql::Result<GQLQueryResponse>) -> tide::Result<Self>; fn body_graphql(self, res: async_graphql::Result<Response>) -> tide::Result<Self>;
} }
impl ResponseExt for Response { impl ResponseExt for Response {
fn body_graphql(self, res: async_graphql::Result<GQLQueryResponse>) -> tide::Result<Self> { fn body_graphql(self, res: async_graphql::Result<Response>) -> tide::Result<Self> {
let mut resp = add_cache_control(self, &res); let mut resp = add_cache_control(self, &res);
resp.set_body(Body::from_json(&GQLResponse(res))?); resp.set_body(Body::from_json(&GQLResponse(res))?);
Ok(resp) Ok(resp)
} }
} }
fn add_cache_control( fn add_cache_control(mut http_resp: Response, resp: &async_graphql::Result<Response>) -> Response {
mut http_resp: Response, if let Ok(Response { cache_control, .. }) = resp {
resp: &async_graphql::Result<GQLQueryResponse>,
) -> Response {
if let Ok(GQLQueryResponse { cache_control, .. }) = resp {
if let Some(cache_control) = cache_control.value() { if let Some(cache_control) = cache_control.value() {
if let Ok(header) = tide::http::headers::HeaderName::from_str("cache-control") { if let Ok(header) = tide::http::headers::HeaderName::from_str("cache-control") {
http_resp.insert_header(header, cache_control); http_resp.insert_header(header, cache_control);

View File

@ -7,8 +7,8 @@
use async_graphql::http::{GQLRequest, StreamBody}; use async_graphql::http::{GQLRequest, StreamBody};
use async_graphql::{ use async_graphql::{
Data, FieldResult, GQLQueryResponse, IntoQueryBuilder, ObjectType, QueryBuilder, Data, FieldResult, IntoQueryBuilder, ObjectType, QueryBuilder, ReceiveMultipartOptions,
ReceiveMultipartOptions, Schema, SubscriptionType, WebSocketTransport, Response, Schema, SubscriptionType, WebSocketTransport,
}; };
use futures::select; use futures::select;
use futures::{SinkExt, StreamExt}; use futures::{SinkExt, StreamExt};
@ -231,16 +231,16 @@ where
} }
/// GraphQL reply /// GraphQL reply
pub struct GQLResponse(async_graphql::Result<GQLQueryResponse>); pub struct GQLResponse(async_graphql::Result<Response>);
impl From<async_graphql::Result<GQLQueryResponse>> for GQLResponse { impl From<async_graphql::Result<Response>> for GQLResponse {
fn from(resp: async_graphql::Result<GQLQueryResponse>) -> Self { fn from(resp: async_graphql::Result<Response>) -> Self {
GQLResponse(resp) GQLResponse(resp)
} }
} }
fn add_cache_control(http_resp: &mut Response, resp: &async_graphql::Result<GQLQueryResponse>) { fn add_cache_control(http_resp: &mut Response, resp: &async_graphql::Result<Response>) {
if let Ok(GQLQueryResponse { cache_control, .. }) = resp { if let Ok(Response { cache_control, .. }) = resp {
if let Some(cache_control) = cache_control.value() { if let Some(cache_control) = cache_control.value() {
if let Ok(value) = cache_control.parse() { if let Ok(value) = cache_control.parse() {
http_resp.headers_mut().insert("cache-control", value); http_resp.headers_mut().insert("cache-control", value);

View File

@ -1,7 +1,8 @@
use crate::Error; use crate::Error;
use pest::iterators::Pair; use pest::iterators::Pair;
use pest::RuleType; use pest::RuleType;
use serde::Serialize; use serde::ser::SerializeMap;
use serde::{Serialize, Serializer};
use std::borrow::{Borrow, BorrowMut}; use std::borrow::{Borrow, BorrowMut};
use std::cmp::Ordering; use std::cmp::Ordering;
use std::fmt; use std::fmt;
@ -9,7 +10,7 @@ use std::hash::{Hash, Hasher};
use std::str::Chars; use std::str::Chars;
/// Original position of an element in source code. /// Original position of an element in source code.
#[derive(PartialOrd, Ord, PartialEq, Eq, Clone, Copy, Default, Hash, Serialize)] #[derive(PartialOrd, Ord, PartialEq, Eq, Clone, Copy, Default, Hash)]
pub struct Pos { pub struct Pos {
/// One-based line number. /// One-based line number.
pub line: usize, pub line: usize,
@ -30,6 +31,18 @@ impl fmt::Display for Pos {
} }
} }
/// Serializes a [`Pos`] as a two-entry map: `{"line": …, "column": …}`.
///
/// This hand-written impl replaces the former `#[derive(Serialize)]`
/// (removed from the derive list above), keeping the wire format explicit.
impl Serialize for Pos {
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        // Exactly two entries, so the length hint can be given up front.
        let mut map = serializer.serialize_map(Some(2))?;
        map.serialize_entry("line", &self.line)?;
        map.serialize_entry("column", &self.column)?;
        map.end()
    }
}
/// An AST node that stores its original position. /// An AST node that stores its original position.
#[derive(Debug, Clone, Copy, Default)] #[derive(Debug, Clone, Copy, Default)]
pub struct Positioned<T: ?Sized> { pub struct Positioned<T: ?Sized> {

View File

@ -353,6 +353,17 @@ pub enum ParseRequestError {
PayloadTooLarge, PayloadTooLarge,
} }
/// Maps low-level multipart errors from `multer` onto this crate's
/// request-parsing error type.
///
/// Size-limit violations — whether for a single field or the whole
/// stream — become [`ParseRequestError::PayloadTooLarge`]; every other
/// multipart failure is preserved inside
/// [`ParseRequestError::InvalidMultipart`].
impl From<multer::Error> for ParseRequestError {
    fn from(err: multer::Error) -> Self {
        match err {
            multer::Error::FieldSizeExceeded { .. } | multer::Error::StreamSizeExceeded { .. } => {
                ParseRequestError::PayloadTooLarge
            }
            _ => ParseRequestError::InvalidMultipart(err),
        }
    }
}
/// Verification error. /// Verification error.
#[derive(Debug, PartialEq)] #[derive(Debug, PartialEq)]
pub struct RuleError { pub struct RuleError {

View File

@ -4,15 +4,11 @@ mod graphiql_source;
mod playground_source; mod playground_source;
mod stream_body; mod stream_body;
use itertools::Itertools;
pub use graphiql_source::graphiql_source; pub use graphiql_source::graphiql_source;
pub use playground_source::{playground_source, GraphQLPlaygroundConfig}; pub use playground_source::{playground_source, GraphQLPlaygroundConfig};
pub use stream_body::StreamBody; pub use stream_body::StreamBody;
use crate::{Error, GQLQueryResponse, ParseRequestError, Pos, QueryError, Result, Variables}; use serde::Deserialize;
use serde::ser::{SerializeMap, SerializeSeq};
use serde::{Deserialize, Serialize, Serializer};
/// Deserializable GraphQL Request object /// Deserializable GraphQL Request object
#[derive(Deserialize, Clone, PartialEq, Debug)] #[derive(Deserialize, Clone, PartialEq, Debug)]
@ -31,7 +27,6 @@ pub struct GQLRequest {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use crate::Pos;
use serde_json::json; use serde_json::json;
#[test] #[test]

View File

@ -98,14 +98,20 @@
#![recursion_limit = "256"] #![recursion_limit = "256"]
#![forbid(unsafe_code)] #![forbid(unsafe_code)]
// Do not try to modify the location of this line of code, it must be
// the first mod defined, otherwise there will be a danger of failing to compile.
#[macro_use]
mod macros;
mod base; mod base;
mod context; mod context;
mod error; mod error;
mod look_ahead; mod look_ahead;
mod model; mod model;
mod mutation_resolver; mod mutation_resolver;
mod query; mod request;
mod resolver; mod resolver;
mod response;
mod scalars; mod scalars;
mod schema; mod schema;
mod serialize_resp; mod serialize_resp;
@ -141,14 +147,13 @@ pub use error::{
}; };
pub use look_ahead::Lookahead; pub use look_ahead::Lookahead;
pub use parser::{types::ConstValue as Value, Pos, Positioned}; pub use parser::{types::ConstValue as Value, Pos, Positioned};
pub use query::{GQLQuery, GQLQueryResponse, ReceiveMultipartOptions};
pub use registry::CacheControl; pub use registry::CacheControl;
pub use request::{ReceiveMultipartOptions, Request};
pub use response::Response;
pub use scalars::{Any, Json, OutputJson, ID}; pub use scalars::{Any, Json, OutputJson, ID};
pub use schema::{Schema, SchemaBuilder, SchemaEnv}; pub use schema::{Schema, SchemaBuilder, SchemaEnv};
pub use serde_json::Number; pub use serde_json::Number;
pub use subscription::{ pub use subscription::{ConnectionTransport, SimpleBroker, SubscriptionStreams};
ConnectionTransport, SimpleBroker, SubscriptionStreams, WebSocketTransport,
};
pub use types::{ pub use types::{
connection, EmptyMutation, EmptySubscription, MaybeUndefined, MergedObject, connection, EmptyMutation, EmptySubscription, MaybeUndefined, MergedObject,
MergedObjectSubscriptionTail, MergedObjectTail, Upload, MergedObjectSubscriptionTail, MergedObjectTail, Upload,
@ -164,7 +169,6 @@ pub use context::ContextSelectionSet;
#[doc(hidden)] #[doc(hidden)]
pub mod registry; pub mod registry;
#[doc(hidden)] #[doc(hidden)]
pub use base::{BoxFieldFuture, InputObjectType, InputValueType, ObjectType, OutputValueType}; pub use base::{BoxFieldFuture, InputObjectType, InputValueType, ObjectType, OutputValueType};
#[doc(hidden)] #[doc(hidden)]

8
src/macros.rs Normal file
View File

@ -0,0 +1,8 @@
/// Unwraps a `Result`, or converts the error with `.into()` and returns it
/// from the enclosing function.
///
/// Analogous to the `?` operator, but for functions that return a plain
/// `Response` (which implements `From<Error>`) rather than a `Result` —
/// used by `Schema::execute` to turn preparation/resolution errors into
/// error responses.
macro_rules! try_query_result {
    ($res:expr) => {
        match $res {
            Ok(resp) => resp,
            Err(err) => return err.into(),
        }
    };
}

View File

@ -1,24 +1,15 @@
use crate::context::{Data, ResolveId}; use crate::parser::types::UploadValue;
use crate::extensions::{BoxExtension, ErrorLogger, Extension}; use crate::{http, Data, ParseRequestError, Value, Variables};
use crate::parser::types::{OperationType, UploadValue};
use crate::{
do_resolve, http, CacheControl, ContextBase, Error, ObjectType, ParseRequestError, Pos,
QueryEnv, QueryError, Result, Schema, SubscriptionType, Value, Variables,
};
use bytes::Bytes; use bytes::Bytes;
use futures::stream; use futures::stream;
use futures::task::Poll; use futures::task::Poll;
use futures::{AsyncRead, AsyncReadExt, Stream}; use futures::{AsyncRead, AsyncReadExt, Stream};
use multer::{Constraints, Multipart, SizeLimit}; use multer::{Constraints, Multipart, SizeLimit};
use serde::ser::{SerializeMap, SerializeSeq};
use serde::{Serialize, Serializer};
use std::any::Any; use std::any::Any;
use std::collections::HashMap; use std::collections::HashMap;
use std::fs::File; use std::fs::File;
use std::io; use std::io::{self, Seek, SeekFrom, Write};
use std::io::{Seek, SeekFrom, Write}; use std::pin::Pin;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
/// Options for `GQLQuery::receive_multipart` /// Options for `GQLQuery::receive_multipart`
#[derive(Default, Clone)] #[derive(Default, Clone)]
@ -30,23 +21,21 @@ pub struct ReceiveMultipartOptions {
pub max_num_files: Option<usize>, pub max_num_files: Option<usize>,
} }
pub struct GQLQuery { pub struct Request {
pub(crate) query: String, pub(crate) query: String,
pub(crate) operation_name: Option<String>, pub(crate) operation_name: Option<String>,
pub(crate) variables: Variables, pub(crate) variables: Variables,
pub(crate) ctx_data: Data, pub(crate) ctx_data: Data,
pub(crate) extensions: Vec<Box<dyn Fn() -> BoxExtension + Send + Sync>>,
} }
impl GQLQuery { impl Request {
/// Create a query with query source. /// Create a request object with query source.
pub fn new(query: impl Into<String>) -> Self { pub fn new(query: impl Into<String>) -> Self {
Self { Self {
query: query.into(), query: query.into(),
operation_name: None, operation_name: None,
variables: Variables::default(), variables: Variables::default(),
ctx_data: Data::default(), ctx_data: Data::default(),
extensions: Vec::default(),
} }
} }
@ -59,13 +48,12 @@ impl GQLQuery {
.map(|value| Variables::parse_from_json(value)) .map(|value| Variables::parse_from_json(value))
.unwrap_or_default(), .unwrap_or_default(),
ctx_data: Data::default(), ctx_data: Data::default(),
extensions: Vec::default(),
} }
} }
pub async fn receive_request( pub async fn receive_request(
content_type: Option<impl AsRef<str>>, content_type: Option<impl AsRef<str>>,
mut body: impl AsyncRead, mut body: impl AsyncRead + Send + Unpin + 'static,
opts: ReceiveMultipartOptions, opts: ReceiveMultipartOptions,
) -> std::result::Result<Self, ParseRequestError> { ) -> std::result::Result<Self, ParseRequestError> {
if let Some(boundary) = content_type.and_then(|ct| multer::parse_boundary(ct).ok()) { if let Some(boundary) = content_type.and_then(|ct| multer::parse_boundary(ct).ok()) {
@ -87,7 +75,7 @@ impl GQLQuery {
}), }),
); );
let mut query = None; let mut request = None;
let mut map = None; let mut map = None;
let mut files = Vec::new(); let mut files = Vec::new();
@ -95,7 +83,7 @@ impl GQLQuery {
match field.name() { match field.name() {
Some("operations") => { Some("operations") => {
let request_str = field.text().await?; let request_str = field.text().await?;
query = Some(Self::new_with_http_request( request = Some(Self::new_with_http_request(
serde_json::from_str(&request_str) serde_json::from_str(&request_str)
.map_err(ParseRequestError::InvalidRequest)?, .map_err(ParseRequestError::InvalidRequest)?,
)); ));
@ -125,13 +113,13 @@ impl GQLQuery {
} }
} }
let mut query = query.ok_or(ParseRequestError::MissingOperatorsPart)?; let mut request = request.ok_or(ParseRequestError::MissingOperatorsPart)?;
let map = map.as_mut().ok_or(ParseRequestError::MissingMapPart)?; let map = map.as_mut().ok_or(ParseRequestError::MissingMapPart)?;
for (name, filename, content_type, file) in files { for (name, filename, content_type, file) in files {
if let Some(var_paths) = map.remove(&name) { if let Some(var_paths) = map.remove(&name) {
for var_path in var_paths { for var_path in var_paths {
query.set_upload( request.set_upload(
&var_path, &var_path,
filename.clone(), filename.clone(),
content_type.clone(), content_type.clone(),
@ -145,7 +133,7 @@ impl GQLQuery {
return Err(ParseRequestError::MissingFiles); return Err(ParseRequestError::MissingFiles);
} }
Ok(query) Ok(request)
} else { } else {
let mut data = Vec::new(); let mut data = Vec::new();
body.read_to_end(&mut data) body.read_to_end(&mut data)
@ -170,16 +158,6 @@ impl GQLQuery {
Self { variables, ..self } Self { variables, ..self }
} }
/// Add an extension
pub fn extension<F: Fn() -> E + Send + Sync + 'static, E: Extension>(
mut self,
extension_factory: F,
) -> Self {
self.extensions
.push(Box::new(move || Box::new(extension_factory())));
self
}
/// Add a context data that can be accessed in the `Context`, you access it with `Context::data`. /// Add a context data that can be accessed in the `Context`, you access it with `Context::data`.
/// ///
/// **This data is only valid for this query** /// **This data is only valid for this query**
@ -208,57 +186,18 @@ impl GQLQuery {
} }
} }
impl<T: Into<String>> From<T> for GQLQuery { impl<T: Into<String>> From<T> for Request {
fn from(query: T) -> Self { fn from(query: T) -> Self {
Self::new(query) Self::new(query)
} }
} }
impl From<http::GQLRequest> for GQLQuery { impl From<http::GQLRequest> for Request {
fn from(request: http::GQLRequest) -> Self { fn from(request: http::GQLRequest) -> Self {
Self::new_with_http_request(request) Self::new_with_http_request(request)
} }
} }
/// Query response
#[derive(Debug)]
pub struct GQLQueryResponse {
/// Data of query result
pub data: serde_json::Value,
/// Extensions result
pub extensions: Option<serde_json::Value>,
/// Cache control value
pub cache_control: CacheControl,
/// Error
pub error: Option<Error>,
}
impl GQLQueryResponse {
#[inline]
pub fn is_err(&self) -> bool {
self.error.is_some()
}
#[inline]
pub fn unwrap_err(self) -> Error {
self.error.unwrap()
}
}
impl From<Error> for GQLQueryResponse {
fn from(err: Error) -> Self {
Self {
data: serde_json::Value::Null,
extensions: None,
cache_control: CacheControl::default(),
error: Some(err),
}
}
}
fn reader_stream( fn reader_stream(
mut reader: impl AsyncRead + Unpin + Send + 'static, mut reader: impl AsyncRead + Unpin + Send + 'static,
) -> impl Stream<Item = io::Result<Bytes>> + Unpin + Send + 'static { ) -> impl Stream<Item = io::Result<Bytes>> + Unpin + Send + 'static {

40
src/response.rs Normal file
View File

@ -0,0 +1,40 @@
use crate::{CacheControl, Error};
/// Query response
#[derive(Debug)]
pub struct Response {
/// Data of query result
pub data: serde_json::Value,
/// Extensions result
pub extensions: Option<serde_json::Value>,
/// Cache control value
pub cache_control: CacheControl,
/// Error
pub error: Option<Error>,
}
impl Response {
    /// Returns `true` if this response represents a failed query,
    /// i.e. `error` is `Some`.
    #[inline]
    pub fn is_err(&self) -> bool {
        self.error.is_some()
    }

    /// Consumes the response and returns the contained error.
    ///
    /// # Panics
    ///
    /// Panics if the response is not an error (`error` is `None`);
    /// check with `is_err` first.
    #[inline]
    pub fn unwrap_err(self) -> Error {
        self.error.unwrap()
    }
}
/// Builds an error-only response: `data` is `Null`, no extensions output,
/// default cache control, and `error` set to the given error.
impl From<Error> for Response {
    fn from(err: Error) -> Self {
        Self {
            data: serde_json::Value::Null,
            extensions: None,
            cache_control: CacheControl::default(),
            error: Some(err),
        }
    }
}

View File

@ -5,14 +5,13 @@ use crate::mutation_resolver::do_mutation_resolve;
use crate::parser::parse_query; use crate::parser::parse_query;
use crate::parser::types::{ExecutableDocument, OperationType}; use crate::parser::types::{ExecutableDocument, OperationType};
use crate::registry::{MetaDirective, MetaInputValue, Registry}; use crate::registry::{MetaDirective, MetaInputValue, Registry};
use crate::subscription::{create_connection, create_subscription_stream, ConnectionTransport}; use crate::subscription::create_subscription_stream;
use crate::types::QueryRoot; use crate::types::QueryRoot;
use crate::validation::{check_rules, CheckResult, ValidationMode}; use crate::validation::{check_rules, CheckResult, ValidationMode};
use crate::{ use crate::{
do_resolve, CacheControl, ContextBase, Error, GQLQueryResponse, ObjectType, Pos, QueryEnv, do_resolve, CacheControl, ContextBase, Error, ObjectType, Pos, QueryEnv, QueryError, Request,
QueryError, Result, SubscriptionType, Type, Variables, ID, Response, Result, SubscriptionType, Type, Variables, ID,
}; };
use futures::channel::mpsc;
use futures::Stream; use futures::Stream;
use indexmap::map::IndexMap; use indexmap::map::IndexMap;
use itertools::Itertools; use itertools::Itertools;
@ -311,14 +310,12 @@ where
&self, &self,
source: &str, source: &str,
variables: &Variables, variables: &Variables,
query_extensions: &[Box<dyn Fn() -> BoxExtension + Send + Sync>],
) -> Result<(ExecutableDocument, CacheControl, spin::Mutex<Extensions>)> { ) -> Result<(ExecutableDocument, CacheControl, spin::Mutex<Extensions>)> {
// create extension instances // create extension instances
let extensions = spin::Mutex::new(Extensions( let extensions = spin::Mutex::new(Extensions(
self.0 self.0
.extensions .extensions
.iter() .iter()
.chain(query_extensions)
.map(|factory| factory()) .map(|factory| factory())
.collect_vec(), .collect_vec(),
)); ));
@ -362,108 +359,110 @@ where
} }
/// Execute query without create the `QueryBuilder`. /// Execute query without create the `QueryBuilder`.
pub async fn execute(&self, query: GQLQuery) -> GQLQueryResponse { pub async fn execute(&self, query: Request) -> Response {
let (document, cache_control, extensions) = let (document, cache_control, extensions) =
self.prepare_query(&query.query, &query.variables, &query.extensions)?; try_query_result!(self.prepare_query(&query.query, &query.variables));
// execute // execute
let inc_resolve_id = AtomicUsize::default(); let inc_resolve_id = AtomicUsize::default();
let document = match document.into_data(self.operation_name.as_deref()) { let document = match document.into_data(query.operation_name.as_deref()) {
Some(document) => document, Some(document) => document,
None => { None => {
return if let Some(operation_name) = self.operation_name { let err = if let Some(operation_name) = query.operation_name {
Err(Error::Query { Error::Query {
pos: Pos::default(), pos: Pos::default(),
path: None, path: None,
err: QueryError::UnknownOperationNamed { err: QueryError::UnknownOperationNamed {
name: operation_name, name: operation_name,
}, },
}) }
} else { } else {
Err(Error::Query { Error::Query {
pos: Pos::default(), pos: Pos::default(),
path: None, path: None,
err: QueryError::MissingOperation, err: QueryError::MissingOperation,
}) }
} };
.log_error(&extensions) extensions.lock().error(&err);
.into() return err.into();
} }
}; };
let env = QueryEnv::new( let env = QueryEnv::new(
extensions, extensions,
self.variables, query.variables,
document, document,
Arc::new(self.ctx_data.unwrap_or_default()), Arc::new(query.ctx_data),
); );
let ctx = ContextBase { let ctx = ContextBase {
path_node: None, path_node: None,
resolve_id: ResolveId::root(), resolve_id: ResolveId::root(),
inc_resolve_id: &inc_resolve_id, inc_resolve_id: &inc_resolve_id,
item: &env.document.operation.node.selection_set, item: &env.document.operation.node.selection_set,
schema_env: &schema.env, schema_env: &self.env,
query_env: &env, query_env: &env,
}; };
env.extensions.lock().execution_start(); env.extensions.lock().execution_start();
let data = match &env.document.operation.node.ty { let data = match &env.document.operation.node.ty {
OperationType::Query => do_resolve(&ctx, &schema.query).await?, OperationType::Query => try_query_result!(do_resolve(&ctx, &self.query).await),
OperationType::Mutation => do_mutation_resolve(&ctx, &schema.mutation).await?, OperationType::Mutation => {
try_query_result!(do_mutation_resolve(&ctx, &self.mutation).await)
}
OperationType::Subscription => { OperationType::Subscription => {
return Err(Error::Query { return Error::Query {
pos: Pos::default(), pos: Pos::default(),
path: None, path: None,
err: QueryError::NotSupported, err: QueryError::NotSupported,
}) }
.into() .into()
} }
}; };
env.extensions.lock().execution_end(); env.extensions.lock().execution_end();
GQLQueryResponse { let extensions = env.extensions.lock().result();
Response {
data, data,
extensions: env.extensions.lock().result(), extensions,
cache_control, cache_control,
error: None, error: None,
} }
} }
/// Create subscription stream, typically called inside the `SubscriptionTransport::handle_request` method pub async fn execute_stream(
pub async fn create_subscription_stream(
&self, &self,
source: &str, query: Request,
operation_name: Option<&str>, ) -> Result<impl Stream<Item = Response> + Send> {
variables: Variables, let (document, _, extensions) = self.prepare_query(&query.query, &query.variables)?;
ctx_data: Option<Arc<Data>>,
) -> Result<impl Stream<Item = Result<serde_json::Value>> + Send> {
let (document, _, extensions) = self.prepare_query(source, &variables, &Vec::new())?;
let document = match document.into_data(operation_name) { let document = match document.into_data(query.operation_name.as_deref()) {
Some(document) => document, Some(document) => document,
None => { None => {
return if let Some(name) = operation_name { let err = if let Some(name) = query.operation_name {
Err(QueryError::UnknownOperationNamed { QueryError::UnknownOperationNamed {
name: name.to_string(), name: name.to_string(),
} }
.into_error(Pos::default())) .into_error(Pos::default())
} else { } else {
Err(QueryError::MissingOperation.into_error(Pos::default())) QueryError::MissingOperation.into_error(Pos::default())
} };
.log_error(&extensions) extensions.lock().error(&err);
return Err(err.into());
} }
}; };
if document.operation.node.ty != OperationType::Subscription { if document.operation.node.ty != OperationType::Subscription {
return Err(QueryError::NotSupported.into_error(Pos::default())).log_error(&extensions); let err = QueryError::NotSupported.into_error(Pos::default());
extensions.lock().error(&err);
return Err(err);
} }
let resolve_id = AtomicUsize::default(); let resolve_id = AtomicUsize::default();
let env = QueryEnv::new( let env = QueryEnv::new(
extensions, extensions,
variables, query.variables,
document, document,
ctx_data.unwrap_or_default(), Arc::new(query.ctx_data),
); );
let ctx = env.create_context( let ctx = env.create_context(
&self.env, &self.env,
@ -472,20 +471,12 @@ where
&resolve_id, &resolve_id,
); );
let mut streams = Vec::new(); let mut streams = Vec::new();
create_subscription_stream(self, env.clone(), &ctx, &mut streams) match create_subscription_stream(self, env.clone(), &ctx, &mut streams).await {
.await Ok(()) => Ok(futures::stream::select_all(streams)),
.log_error(&ctx.query_env.extensions)?; Err(err) => {
Ok(futures::stream::select_all(streams)) env.extensions.lock().error(&err);
} Err(err)
}
/// Create subscription connection, returns `Sink` and `Stream`. }
pub fn subscription_connection<T: ConnectionTransport>(
&self,
transport: T,
) -> (
mpsc::UnboundedSender<Vec<u8>>,
impl Stream<Item = Vec<u8>> + Unpin,
) {
create_connection(self.clone(), transport)
} }
} }

View File

@ -1,15 +1,16 @@
use crate::{Error, GQLQueryResponse, Pos, QueryError}; use crate::{Error, QueryError, Response};
use itertools::Itertools;
use serde::ser::{SerializeMap, SerializeSeq}; use serde::ser::{SerializeMap, SerializeSeq};
use serde::{Serialize, Serializer}; use serde::{Serialize, Serializer};
impl Serialize for GQLQueryResponse { impl Serialize for Response {
fn serialize<S: Serializer>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error> { fn serialize<S: Serializer>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error> {
match &self.error { match &self.error {
None => { None => {
let mut map = serializer.serialize_map(None)?; let mut map = serializer.serialize_map(None)?;
map.serialize_key("data")?; map.serialize_key("data")?;
map.serialize_value(&self.data)?; map.serialize_value(&self.data)?;
if res.extensions.is_some() { if self.extensions.is_some() {
map.serialize_key("extensions")?; map.serialize_key("extensions")?;
map.serialize_value(&self.extensions)?; map.serialize_value(&self.extensions)?;
} }
@ -25,18 +26,6 @@ impl Serialize for GQLQueryResponse {
} }
} }
impl<'a> Serialize for Pos {
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: Serializer,
{
let mut map = serializer.serialize_map(Some(2))?;
map.serialize_entry("line", &self.0.line)?;
map.serialize_entry("column", &self.0.column)?;
map.end()
}
}
impl<'a> Serialize for Error { impl<'a> Serialize for Error {
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error> fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where where
@ -105,7 +94,7 @@ mod tests {
#[test] #[test]
fn test_response_data() { fn test_response_data() {
let resp = GQLQueryResponse { let resp = Response {
data: json!({"ok": true}), data: json!({"ok": true}),
extensions: None, extensions: None,
cache_control: Default::default(), cache_control: Default::default(),
@ -123,7 +112,7 @@ mod tests {
#[test] #[test]
fn test_field_error_with_extension() { fn test_field_error_with_extension() {
let resp = GQLQueryResponse::from(Error::Query { let resp = Response::from(Error::Query {
pos: Pos { pos: Pos {
line: 10, line: 10,
column: 20, column: 20,
@ -153,7 +142,7 @@ mod tests {
#[test] #[test]
fn test_response_error_with_pos() { fn test_response_error_with_pos() {
let resp = GQLQueryResponse::from(Error::Query { let resp = Response::from(Error::Query {
pos: Pos { pos: Pos {
line: 10, line: 10,
column: 20, column: 20,

View File

@ -1,9 +1,8 @@
mod connection; mod connection;
mod simple_broker; mod simple_broker;
mod subscription_type; mod subscription_type;
mod ws_transport; // mod ws_transport;
pub use connection::{create_connection, ConnectionTransport, SubscriptionStreams}; pub use connection::{create_connection, ConnectionTransport, SubscriptionStreams};
pub use simple_broker::SimpleBroker; pub use simple_broker::SimpleBroker;
pub use subscription_type::{create_subscription_stream, SubscriptionType}; pub use subscription_type::{create_subscription_stream, SubscriptionType};
pub use ws_transport::WebSocketTransport;

View File

@ -1,6 +1,6 @@
use crate::context::QueryEnv; use crate::context::QueryEnv;
use crate::parser::types::{Selection, TypeCondition}; use crate::parser::types::{Selection, TypeCondition};
use crate::{Context, ContextSelectionSet, ObjectType, Result, Schema, SchemaEnv, Type}; use crate::{Context, ContextSelectionSet, ObjectType, Response, Result, Schema, SchemaEnv, Type};
use futures::{Future, Stream}; use futures::{Future, Stream};
use std::pin::Pin; use std::pin::Pin;
@ -20,7 +20,7 @@ pub trait SubscriptionType: Type {
ctx: &Context<'_>, ctx: &Context<'_>,
schema_env: SchemaEnv, schema_env: SchemaEnv,
query_env: QueryEnv, query_env: QueryEnv,
) -> Result<Pin<Box<dyn Stream<Item = Result<serde_json::Value>> + Send>>>; ) -> Result<Pin<Box<dyn Stream<Item = Response> + Send>>>;
} }
type BoxCreateStreamFuture<'a> = Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>>; type BoxCreateStreamFuture<'a> = Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>>;
@ -29,7 +29,7 @@ pub fn create_subscription_stream<'a, Query, Mutation, Subscription>(
schema: &'a Schema<Query, Mutation, Subscription>, schema: &'a Schema<Query, Mutation, Subscription>,
environment: QueryEnv, environment: QueryEnv,
ctx: &'a ContextSelectionSet<'_>, ctx: &'a ContextSelectionSet<'_>,
streams: &'a mut Vec<Pin<Box<dyn Stream<Item = Result<serde_json::Value>> + Send>>>, streams: &'a mut Vec<Pin<Box<dyn Stream<Item = Response> + Send>>>,
) -> BoxCreateStreamFuture<'a> ) -> BoxCreateStreamFuture<'a>
where where
Query: ObjectType + Send + Sync + 'static, Query: ObjectType + Send + Sync + 'static,
@ -109,7 +109,7 @@ impl<T: SubscriptionType + Send + Sync> SubscriptionType for &T {
ctx: &Context<'_>, ctx: &Context<'_>,
schema_env: SchemaEnv, schema_env: SchemaEnv,
query_env: QueryEnv, query_env: QueryEnv,
) -> Result<Pin<Box<dyn Stream<Item = Result<serde_json::Value>> + Send>>> { ) -> Result<Pin<Box<dyn Stream<Item = Response> + Send>>> {
T::create_field_stream(*self, idx, ctx, schema_env, query_env).await T::create_field_stream(*self, idx, ctx, schema_env, query_env).await
} }
} }

View File

@ -1,5 +1,7 @@
use crate::context::QueryEnv; use crate::context::QueryEnv;
use crate::{registry, Context, Error, Pos, QueryError, Result, SchemaEnv, SubscriptionType, Type}; use crate::{
registry, Context, Error, Pos, QueryError, Response, Result, SchemaEnv, SubscriptionType, Type,
};
use futures::Stream; use futures::Stream;
use std::borrow::Cow; use std::borrow::Cow;
use std::pin::Pin; use std::pin::Pin;
@ -39,7 +41,7 @@ impl SubscriptionType for EmptySubscription {
_ctx: &Context<'_>, _ctx: &Context<'_>,
_schema_env: SchemaEnv, _schema_env: SchemaEnv,
_query_env: QueryEnv, _query_env: QueryEnv,
) -> Result<Pin<Box<dyn Stream<Item = Result<serde_json::Value>> + Send>>> ) -> Result<Pin<Box<dyn Stream<Item = Response> + Send>>>
where where
Self: Send + Sync + 'static + Sized, Self: Send + Sync + 'static + Sized,
{ {

View File

@ -2,7 +2,7 @@ use crate::parser::types::Field;
use crate::registry::{MetaType, Registry}; use crate::registry::{MetaType, Registry};
use crate::{ use crate::{
do_resolve, CacheControl, Context, ContextSelectionSet, Error, ObjectType, OutputValueType, do_resolve, CacheControl, Context, ContextSelectionSet, Error, ObjectType, OutputValueType,
Positioned, QueryEnv, QueryError, Result, SchemaEnv, SubscriptionType, Type, Positioned, QueryEnv, QueryError, Response, Result, SchemaEnv, SubscriptionType, Type,
}; };
use futures::Stream; use futures::Stream;
use indexmap::IndexMap; use indexmap::IndexMap;
@ -111,7 +111,7 @@ where
ctx: &Context<'_>, ctx: &Context<'_>,
schema_env: SchemaEnv, schema_env: SchemaEnv,
query_env: QueryEnv, query_env: QueryEnv,
) -> Result<Pin<Box<dyn Stream<Item = Result<serde_json::Value>> + Send>>> { ) -> Result<Pin<Box<dyn Stream<Item = Response> + Send>>> {
match self match self
.0 .0
.create_field_stream(idx, ctx, schema_env.clone(), query_env.clone()) .create_field_stream(idx, ctx, schema_env.clone(), query_env.clone())

View File

@ -1,291 +1,291 @@
use async_graphql::*; // use async_graphql::*;
use futures::{SinkExt, Stream, StreamExt}; // use futures::{SinkExt, Stream, StreamExt};
//
#[async_std::test] // #[async_std::test]
pub async fn test_subscription_ws_transport() { // pub async fn test_subscription_ws_transport() {
struct QueryRoot; // struct QueryRoot;
//
#[Object] // #[Object]
impl QueryRoot {} // impl QueryRoot {}
//
struct SubscriptionRoot; // struct SubscriptionRoot;
//
#[Subscription] // #[Subscription]
impl SubscriptionRoot { // impl SubscriptionRoot {
async fn values(&self) -> impl Stream<Item = i32> { // async fn values(&self) -> impl Stream<Item = i32> {
futures::stream::iter(0..10) // futures::stream::iter(0..10)
} // }
} // }
//
let schema = Schema::new(QueryRoot, EmptyMutation, SubscriptionRoot); // let schema = Schema::new(QueryRoot, EmptyMutation, SubscriptionRoot);
let (mut sink, mut stream) = schema.subscription_connection(WebSocketTransport::default()); // let (mut sink, mut stream) = schema.subscription_connection(WebSocketTransport::default());
//
sink.send( // sink.send(
serde_json::to_vec(&serde_json::json!({ // serde_json::to_vec(&serde_json::json!({
"type": "connection_init", // "type": "connection_init",
"payload": { "token": "123456" } // "payload": { "token": "123456" }
})) // }))
.unwrap(), // .unwrap(),
) // )
.await // .await
.unwrap(); // .unwrap();
//
assert_eq!( // assert_eq!(
Some(serde_json::json!({ // Some(serde_json::json!({
"type": "connection_ack", // "type": "connection_ack",
})), // })),
serde_json::from_slice(&stream.next().await.unwrap()).unwrap() // serde_json::from_slice(&stream.next().await.unwrap()).unwrap()
); // );
//
sink.send( // sink.send(
serde_json::to_vec(&serde_json::json!({ // serde_json::to_vec(&serde_json::json!({
"type": "start", // "type": "start",
"id": "1", // "id": "1",
"payload": { // "payload": {
"query": "subscription { values }" // "query": "subscription { values }"
}, // },
})) // }))
.unwrap(), // .unwrap(),
) // )
.await // .await
.unwrap(); // .unwrap();
//
for i in 0..10 { // for i in 0..10 {
assert_eq!( // assert_eq!(
Some(serde_json::json!({ // Some(serde_json::json!({
"type": "data", // "type": "data",
"id": "1", // "id": "1",
"payload": { "data": { "values": i } }, // "payload": { "data": { "values": i } },
})), // })),
serde_json::from_slice(&stream.next().await.unwrap()).unwrap() // serde_json::from_slice(&stream.next().await.unwrap()).unwrap()
); // );
} // }
} // }
//
#[async_std::test] // #[async_std::test]
pub async fn test_subscription_ws_transport_with_token() { // pub async fn test_subscription_ws_transport_with_token() {
struct Token(String); // struct Token(String);
//
struct QueryRoot; // struct QueryRoot;
//
#[Object] // #[Object]
impl QueryRoot {} // impl QueryRoot {}
//
struct SubscriptionRoot; // struct SubscriptionRoot;
//
#[Subscription] // #[Subscription]
impl SubscriptionRoot { // impl SubscriptionRoot {
async fn values(&self, ctx: &Context<'_>) -> FieldResult<impl Stream<Item = i32>> { // async fn values(&self, ctx: &Context<'_>) -> FieldResult<impl Stream<Item = i32>> {
if ctx.data_unchecked::<Token>().0 != "123456" { // if ctx.data_unchecked::<Token>().0 != "123456" {
return Err("forbidden".into()); // return Err("forbidden".into());
} // }
Ok(futures::stream::iter(0..10)) // Ok(futures::stream::iter(0..10))
} // }
} // }
//
let schema = Schema::new(QueryRoot, EmptyMutation, SubscriptionRoot); // let schema = Schema::new(QueryRoot, EmptyMutation, SubscriptionRoot);
//
let (mut sink, mut stream) = schema.subscription_connection(WebSocketTransport::new(|value| { // let (mut sink, mut stream) = schema.subscription_connection(WebSocketTransport::new(|value| {
#[derive(serde::Deserialize)] // #[derive(serde::Deserialize)]
struct Payload { // struct Payload {
token: String, // token: String,
} // }
//
let payload: Payload = serde_json::from_value(value).unwrap(); // let payload: Payload = serde_json::from_value(value).unwrap();
let mut data = Data::default(); // let mut data = Data::default();
data.insert(Token(payload.token)); // data.insert(Token(payload.token));
Ok(data) // Ok(data)
})); // }));
//
sink.send( // sink.send(
serde_json::to_vec(&serde_json::json!({ // serde_json::to_vec(&serde_json::json!({
"type": "connection_init", // "type": "connection_init",
"payload": { "token": "123456" } // "payload": { "token": "123456" }
})) // }))
.unwrap(), // .unwrap(),
) // )
.await // .await
.unwrap(); // .unwrap();
//
assert_eq!( // assert_eq!(
Some(serde_json::json!({ // Some(serde_json::json!({
"type": "connection_ack", // "type": "connection_ack",
})), // })),
serde_json::from_slice(&stream.next().await.unwrap()).unwrap() // serde_json::from_slice(&stream.next().await.unwrap()).unwrap()
); // );
//
sink.send( // sink.send(
serde_json::to_vec(&serde_json::json!({ // serde_json::to_vec(&serde_json::json!({
"type": "start", // "type": "start",
"id": "1", // "id": "1",
"payload": { // "payload": {
"query": "subscription { values }" // "query": "subscription { values }"
}, // },
})) // }))
.unwrap(), // .unwrap(),
) // )
.await // .await
.unwrap(); // .unwrap();
//
for i in 0..10 { // for i in 0..10 {
assert_eq!( // assert_eq!(
Some(serde_json::json!({ // Some(serde_json::json!({
"type": "data", // "type": "data",
"id": "1", // "id": "1",
"payload": { "data": { "values": i } }, // "payload": { "data": { "values": i } },
})), // })),
serde_json::from_slice(&stream.next().await.unwrap()).unwrap() // serde_json::from_slice(&stream.next().await.unwrap()).unwrap()
); // );
} // }
} // }
//
#[async_std::test] // #[async_std::test]
pub async fn test_subscription_ws_transport_error() { // pub async fn test_subscription_ws_transport_error() {
struct QueryRoot; // struct QueryRoot;
//
struct Event { // struct Event {
value: i32, // value: i32,
} // }
//
#[Object] // #[Object]
impl Event { // impl Event {
async fn value(&self) -> FieldResult<i32> { // async fn value(&self) -> FieldResult<i32> {
if self.value < 5 { // if self.value < 5 {
Ok(self.value) // Ok(self.value)
} else { // } else {
Err("TestError".into()) // Err("TestError".into())
} // }
} // }
} // }
//
#[Object] // #[Object]
impl QueryRoot {} // impl QueryRoot {}
//
struct SubscriptionRoot; // struct SubscriptionRoot;
//
#[Subscription] // #[Subscription]
impl SubscriptionRoot { // impl SubscriptionRoot {
async fn events(&self) -> impl Stream<Item = Event> { // async fn events(&self) -> impl Stream<Item = Event> {
futures::stream::iter((0..10).map(|n| Event { value: n })) // futures::stream::iter((0..10).map(|n| Event { value: n }))
} // }
} // }
//
let schema = Schema::new(QueryRoot, EmptyMutation, SubscriptionRoot); // let schema = Schema::new(QueryRoot, EmptyMutation, SubscriptionRoot);
//
let (mut sink, mut stream) = // let (mut sink, mut stream) =
schema.subscription_connection(WebSocketTransport::new(|_| Ok(Data::default()))); // schema.subscription_connection(WebSocketTransport::new(|_| Ok(Data::default())));
//
sink.send( // sink.send(
serde_json::to_vec(&serde_json::json!({ // serde_json::to_vec(&serde_json::json!({
"type": "connection_init" // "type": "connection_init"
})) // }))
.unwrap(), // .unwrap(),
) // )
.await // .await
.unwrap(); // .unwrap();
//
assert_eq!( // assert_eq!(
Some(serde_json::json!({ // Some(serde_json::json!({
"type": "connection_ack", // "type": "connection_ack",
})), // })),
serde_json::from_slice(&stream.next().await.unwrap()).unwrap() // serde_json::from_slice(&stream.next().await.unwrap()).unwrap()
); // );
//
sink.send( // sink.send(
serde_json::to_vec(&serde_json::json!({ // serde_json::to_vec(&serde_json::json!({
"type": "start", // "type": "start",
"id": "1", // "id": "1",
"payload": { // "payload": {
"query": "subscription { events { value } }" // "query": "subscription { events { value } }"
}, // },
})) // }))
.unwrap(), // .unwrap(),
) // )
.await // .await
.unwrap(); // .unwrap();
//
for i in 0i32..5 { // for i in 0i32..5 {
assert_eq!( // assert_eq!(
Some(serde_json::json!({ // Some(serde_json::json!({
"type": "data", // "type": "data",
"id": "1", // "id": "1",
"payload": { "data": { "events": { "value": i } } }, // "payload": { "data": { "events": { "value": i } } },
})), // })),
serde_json::from_slice(&stream.next().await.unwrap()).unwrap() // serde_json::from_slice(&stream.next().await.unwrap()).unwrap()
); // );
} // }
//
assert_eq!( // assert_eq!(
Some(serde_json::json!({ // Some(serde_json::json!({
"type": "error", // "type": "error",
"id": "1", // "id": "1",
"payload": [{ // "payload": [{
"message": "TestError", // "message": "TestError",
"locations": [{"line": 1, "column": 25}], // "locations": [{"line": 1, "column": 25}],
"path": ["events", "value"], // "path": ["events", "value"],
}], // }],
})), // })),
serde_json::from_slice(&stream.next().await.unwrap()).unwrap() // serde_json::from_slice(&stream.next().await.unwrap()).unwrap()
); // );
} // }
//
#[async_std::test] // #[async_std::test]
pub async fn test_query_over_websocket() { // pub async fn test_query_over_websocket() {
struct QueryRoot; // struct QueryRoot;
//
#[Object] // #[Object]
impl QueryRoot { // impl QueryRoot {
async fn value(&self) -> i32 { // async fn value(&self) -> i32 {
999 // 999
} // }
} // }
//
let schema = Schema::new(QueryRoot, EmptyMutation, EmptySubscription); // let schema = Schema::new(QueryRoot, EmptyMutation, EmptySubscription);
let (mut sink, mut stream) = schema.subscription_connection(WebSocketTransport::default()); // let (mut sink, mut stream) = schema.subscription_connection(WebSocketTransport::default());
//
sink.send( // sink.send(
serde_json::to_vec(&serde_json::json!({ // serde_json::to_vec(&serde_json::json!({
"type": "connection_init", // "type": "connection_init",
})) // }))
.unwrap(), // .unwrap(),
) // )
.await // .await
.unwrap(); // .unwrap();
//
assert_eq!( // assert_eq!(
Some(serde_json::json!({ // Some(serde_json::json!({
"type": "connection_ack", // "type": "connection_ack",
})), // })),
serde_json::from_slice(&stream.next().await.unwrap()).unwrap() // serde_json::from_slice(&stream.next().await.unwrap()).unwrap()
); // );
//
sink.send( // sink.send(
serde_json::to_vec(&serde_json::json!({ // serde_json::to_vec(&serde_json::json!({
"type": "start", // "type": "start",
"id": "1", // "id": "1",
"payload": { // "payload": {
"query": "query { value }" // "query": "query { value }"
}, // },
})) // }))
.unwrap(), // .unwrap(),
) // )
.await // .await
.unwrap(); // .unwrap();
//
assert_eq!( // assert_eq!(
Some(serde_json::json!({ // Some(serde_json::json!({
"type": "data", // "type": "data",
"id": "1", // "id": "1",
"payload": { "data": { "value": 999 } }, // "payload": { "data": { "value": 999 } },
})), // })),
serde_json::from_slice(&stream.next().await.unwrap()).unwrap() // serde_json::from_slice(&stream.next().await.unwrap()).unwrap()
); // );
//
assert_eq!( // assert_eq!(
Some(serde_json::json!({ // Some(serde_json::json!({
"type": "complete", // "type": "complete",
"id": "1", // "id": "1",
})), // })),
serde_json::from_slice(&stream.next().await.unwrap()).unwrap() // serde_json::from_slice(&stream.next().await.unwrap()).unwrap()
); // );
} // }