Add transports::websocket module.

Sunli 2020-09-11 15:54:56 +08:00
parent 9cc293c8be
commit faac753096
9 changed files with 345 additions and 502 deletions
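For orientation, a minimal sketch of how the new `transports::websocket` module is meant to be hooked up to a websocket connection. Only `websocket::create` (and `create_with_initializer`) and the sender/stream pair it returns come from this commit; the socket glue described in the comments is illustrative:

use async_graphql::transports::websocket;
use futures::StreamExt;

async fn serve_ws<Q, M, S>(schema: async_graphql::Schema<Q, M, S>)
where
    Q: async_graphql::ObjectType + Send + Sync + 'static,
    M: async_graphql::ObjectType + Send + Sync + 'static,
    S: async_graphql::SubscriptionType + Send + Sync + 'static,
{
    // `tx` accepts raw client frames, `rx` yields serialized server frames.
    let (tx, rx) = websocket::create(&schema);
    futures::pin_mut!(rx); // the returned stream is not Unpin
    let _ = &tx; // kept alive for as long as the connection is open

    // Illustrative glue: forward each incoming websocket message into `tx`
    // (e.g. tx.unbounded_send(frame_bytes)), and write everything `rx`
    // yields back out on the socket.
    while let Some(outgoing) = rx.next().await {
        let _ = outgoing; // websocket_sink.send(outgoing).await
    }
}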

View File

@@ -22,7 +22,7 @@ pub struct MultipartOptions {
/// Receive a multipart request.
pub async fn receive_multipart(
body: impl AsyncRead + Unpin + Send + 'static,
body: impl AsyncRead + Send + 'static,
boundary: impl Into<String>,
opts: MultipartOptions,
) -> Result<Request, ParseRequestError> {
@@ -104,10 +104,10 @@ pub async fn receive_multipart(
}
fn reader_stream(
mut reader: impl AsyncRead + Unpin + Send + 'static,
) -> impl Stream<Item = io::Result<Bytes>> + Unpin + Send + 'static {
reader: impl AsyncRead + Send + 'static,
) -> impl Stream<Item = io::Result<Bytes>> + Send + 'static {
let mut buf = [0u8; 2048];
let mut reader = Box::pin(reader);
stream::poll_fn(move |cx| {
Poll::Ready(
match futures::ready!(Pin::new(&mut reader).poll_read(cx, &mut buf)?) {
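A small usage sketch of `receive_multipart` after this change: the body reader no longer needs to be `Unpin`, since it is boxed and pinned internally. The payload and boundary below are placeholders, and `MultipartOptions::default()` is assumed to be available:

use futures::io::Cursor;

async fn parse_example() {
    // Placeholder body: a real multipart request carries `operations`/`map`
    // parts, so parsing this is expected to fail; it only shows the call shape.
    let body = Cursor::new(b"--BOUNDARY--\r\n".to_vec());
    let parsed = receive_multipart(body, "BOUNDARY", MultipartOptions::default()).await;
    let _ = parsed; // Result<Request, ParseRequestError>
}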

View File

@@ -117,6 +117,7 @@ mod validation;
pub mod extensions;
pub mod guard;
pub mod validators;
pub use subscription::transports;
#[doc(hidden)]
pub use async_graphql_parser as parser;

View File

@@ -3,7 +3,7 @@ use crate::extensions::{BoxExtension, ErrorLogger, Extension, Extensions};
use crate::model::__DirectiveLocation;
use crate::mutation_resolver::do_mutation_resolve;
use crate::parser::parse_query;
use crate::parser::types::{ExecutableDocument, OperationType};
use crate::parser::types::OperationType;
use crate::registry::{MetaDirective, MetaInputValue, Registry};
use crate::subscription::create_subscription_stream;
use crate::types::QueryRoot;
@@ -12,6 +12,7 @@ use crate::{
do_resolve, CacheControl, ContextBase, Error, ObjectType, Pos, QueryEnv, QueryError, Request,
Response, Result, SubscriptionType, Type, Variables, ID,
};
use async_graphql_parser::types::ExecutableDocumentData;
use futures::{Stream, StreamExt};
use indexmap::map::IndexMap;
use itertools::Itertools;
@@ -315,11 +316,14 @@ where
Self::build(query, mutation, subscription).finish()
}
fn prepare_query(
fn prepare_request(
&self,
source: &str,
variables: &Variables,
) -> Result<(ExecutableDocument, CacheControl, spin::Mutex<Extensions>)> {
request: &Request,
) -> Result<(
ExecutableDocumentData,
CacheControl,
spin::Mutex<Extensions>,
)> {
// create extension instances
let extensions = spin::Mutex::new(Extensions(
self.0
@@ -329,8 +333,10 @@ where
.collect_vec(),
));
extensions.lock().parse_start(source, &variables);
let document = parse_query(source)
extensions
.lock()
.parse_start(&request.query, &request.variables);
let document = parse_query(&request.query)
.map_err(Into::<Error>::into)
.log_error(&extensions)?;
extensions.lock().parse_end(&document);
@@ -344,7 +350,7 @@ where
} = check_rules(
&self.env.registry,
&document,
Some(&variables),
Some(&request.variables),
self.validation_mode,
)
.log_error(&extensions)?;
@@ -364,26 +370,15 @@ where
}
}
Ok((document, cache_control, extensions))
}
/// Execute a GraphQL query.
pub async fn execute(&self, request: impl Into<Request>) -> Response {
let request = request.into();
let (document, cache_control, extensions) =
try_query_result!(self.prepare_query(&request.query, &request.variables));
// execute
let inc_resolve_id = AtomicUsize::default();
let document = match document.into_data(request.operation_name.as_deref()) {
Some(document) => document,
None => {
let err = if let Some(operation_name) = request.operation_name {
let err = if let Some(operation_name) = &request.operation_name {
Error::Query {
pos: Pos::default(),
path: None,
err: QueryError::UnknownOperationNamed {
name: operation_name,
name: operation_name.to_string(),
},
}
} else {
@@ -394,16 +389,23 @@ where
}
};
extensions.lock().error(&err);
return err.into();
return Err(err);
}
};
let env = QueryEnv::new(
extensions,
request.variables,
document,
Arc::new(request.ctx_data),
);
Ok((document, cache_control, extensions))
}
async fn execute_once(
&self,
document: ExecutableDocumentData,
extensions: spin::Mutex<Extensions>,
variables: Variables,
ctx_data: Data,
) -> Response {
// execute
let inc_resolve_id = AtomicUsize::default();
let env = QueryEnv::new(extensions, variables, document, Arc::new(ctx_data));
let ctx = ContextBase {
path_node: None,
resolve_id: ResolveId::root(),
@@ -434,45 +436,42 @@ where
Response {
data,
extensions,
cache_control,
cache_control: Default::default(),
error: None,
}
}
/// Execute a GraphQL query.
pub async fn execute(&self, request: impl Into<Request>) -> Response {
let request = request.into();
let (document, cache_control, extensions) =
try_query_result!(self.prepare_request(&request));
let mut resp = self
.execute_once(document, extensions, request.variables, request.ctx_data)
.await;
resp.cache_control = cache_control;
resp
}
/// Execute a GraphQL subscription.
pub fn execute_stream(&self, request: impl Into<Request>) -> impl Stream<Item = Response> {
let schema = self.clone();
Box::pin(async_stream::stream! {
async_stream::stream! {
let request = request.into();
let (document, extensions) = match schema.prepare_query(&request.query, &request.variables) {
Ok((document, _, extensions)) => (document, extensions),
let (document, cache_control, extensions) = match schema.prepare_request(&request) {
Ok(res) => res,
Err(err) => {
yield Response::from(err);
return;
}
};
let document = match document.into_data(request.operation_name.as_deref()) {
Some(document) => document,
None => {
let err = if let Some(name) = request.operation_name {
QueryError::UnknownOperationNamed {
name: name.to_string(),
}
.into_error(Pos::default())
} else {
QueryError::MissingOperation.into_error(Pos::default())
};
extensions.lock().error(&err);
yield err.into();
return;
}
};
if document.operation.node.ty != OperationType::Subscription {
let err = QueryError::NotSupported.into_error(Pos::default());
extensions.lock().error(&err);
yield err.into();
let mut resp = schema
.execute_once(document, extensions, request.variables, request.ctx_data)
.await;
resp.cache_control = cache_control;
yield resp;
return;
}
@@ -506,6 +505,6 @@ where
break;
}
}
})
}
}
}
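The public entry points keep their shape after this refactor (`prepare_query` becomes `prepare_request`, feeding the new `execute_once`); a short usage sketch with an illustrative schema and queries, mirroring the tests further down in this diff:

use async_graphql::{ObjectType, Request, Schema, SubscriptionType};
use futures::StreamExt;

async fn run<Q, M, S>(schema: Schema<Q, M, S>)
where
    Q: ObjectType + Send + Sync + 'static,
    M: ObjectType + Send + Sync + 'static,
    S: SubscriptionType + Send + Sync + 'static,
{
    // Queries and mutations still go through `execute`, which now delegates
    // to `prepare_request` + `execute_once`.
    let resp = schema.execute(Request::new("{ value }")).await;
    let _ = resp.data;

    // Subscriptions go through `execute_stream`; the stream it returns is no
    // longer Unpin, which is why the tests below add `.boxed()`.
    let mut stream = schema
        .execute_stream(Request::new("subscription { values }"))
        .boxed();
    while let Some(resp) = stream.next().await {
        let _ = resp.data;
    }
}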

View File

@@ -1,206 +0,0 @@
use crate::{ObjectType, Result, Schema, SubscriptionType};
use futures::channel::mpsc;
use futures::task::{AtomicWaker, Context, Poll};
use futures::{Stream, StreamExt};
use slab::Slab;
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
/// Used to hold all subscription streams for the `SubscriptionConnection`.
pub struct SubscriptionStreams {
streams: Slab<Pin<Box<dyn Stream<Item = Result<serde_json::Value>> + Send>>>,
}
#[allow(missing_docs)]
impl SubscriptionStreams {
pub fn add<S: Stream<Item = Result<serde_json::Value>> + Send + 'static>(
&mut self,
stream: S,
) -> usize {
self.streams.insert(Box::pin(stream))
}
pub fn remove(&mut self, id: usize) {
if self.streams.contains(id) {
self.streams.remove(id);
}
}
}
/// Connection transport
///
/// You can customize your transport by implementing this trait.
#[async_trait::async_trait]
pub trait ConnectionTransport: Send + Sync + Unpin + 'static {
/// The error type.
type Error;
/// Parse the request data here.
/// If you receive a new subscription, create a stream with `Schema::create_subscription_stream`, and then call `SubscriptionStreams::add`.
/// Push any outgoing bytes to `send_buf`; they will be sent to the client. If this method returns an error, the connection will be broken.
async fn handle_request<Query, Mutation, Subscription>(
&mut self,
schema: &Schema<Query, Mutation, Subscription>,
streams: &mut SubscriptionStreams,
request: Vec<u8>,
send_buf: &mut VecDeque<Vec<u8>>,
) -> std::result::Result<(), Self::Error>
where
Query: ObjectType + Sync + Send + 'static,
Mutation: ObjectType + Sync + Send + 'static,
Subscription: SubscriptionType + Sync + Send + 'static;
/// When a response message is generated, you can convert the message to the format you want here.
fn handle_response(&mut self, id: usize, res: Result<serde_json::Value>) -> Option<Vec<u8>>;
}
pub fn create_connection<Query, Mutation, Subscription, T: ConnectionTransport>(
schema: Schema<Query, Mutation, Subscription>,
mut transport: T,
) -> (
mpsc::UnboundedSender<Vec<u8>>,
impl Stream<Item = Vec<u8>> + Unpin,
)
where
Query: ObjectType + Sync + Send + 'static,
Mutation: ObjectType + Sync + Send + 'static,
Subscription: SubscriptionType + Sync + Send + 'static,
{
let (tx_bytes, rx_bytes) = mpsc::unbounded();
let stream = async_stream::stream! {
let mut streams = SubscriptionStreams {
streams: Default::default(),
};
let mut send_buf = Default::default();
let mut inner_stream = SubscriptionStream {
schema: &schema,
transport: Some(&mut transport),
streams: Some(&mut streams),
rx_bytes,
handle_request_fut: None,
waker: AtomicWaker::new(),
send_buf: Some(&mut send_buf),
};
while let Some(data) = inner_stream.next().await {
yield data;
}
};
(tx_bytes, Box::pin(stream))
}
type HandleRequestBoxFut<'a, T> = Pin<
Box<
dyn Future<
Output = (
std::result::Result<(), <T as ConnectionTransport>::Error>,
&'a mut T,
&'a mut SubscriptionStreams,
&'a mut VecDeque<Vec<u8>>,
),
> + Send
+ 'a,
>,
>;
#[allow(missing_docs)]
#[allow(clippy::type_complexity)]
struct SubscriptionStream<'a, Query, Mutation, Subscription, T: ConnectionTransport> {
schema: &'a Schema<Query, Mutation, Subscription>,
transport: Option<&'a mut T>,
streams: Option<&'a mut SubscriptionStreams>,
rx_bytes: mpsc::UnboundedReceiver<Vec<u8>>,
handle_request_fut: Option<HandleRequestBoxFut<'a, T>>,
waker: AtomicWaker,
send_buf: Option<&'a mut VecDeque<Vec<u8>>>,
}
impl<'a, Query, Mutation, Subscription, T> Stream
for SubscriptionStream<'a, Query, Mutation, Subscription, T>
where
Query: ObjectType + Send + Sync + 'static,
Mutation: ObjectType + Send + Sync + 'static,
Subscription: SubscriptionType + Send + Sync + 'static,
T: ConnectionTransport,
{
type Item = Vec<u8>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = &mut *self;
loop {
// receive bytes
if let Some(send_buf) = &mut this.send_buf {
if let Some(bytes) = send_buf.pop_front() {
return Poll::Ready(Some(bytes));
}
}
if let Some(handle_request_fut) = &mut this.handle_request_fut {
match handle_request_fut.as_mut().poll(cx) {
Poll::Ready((Ok(()), transport, streams, send_buf)) => {
this.transport = Some(transport);
this.streams = Some(streams);
this.send_buf = Some(send_buf);
this.handle_request_fut = None;
continue;
}
Poll::Ready((Err(_), _, _, _)) => return Poll::Ready(None),
Poll::Pending => {}
}
} else {
match Pin::new(&mut this.rx_bytes).poll_next(cx) {
Poll::Ready(Some(data)) => {
let transport = this.transport.take().unwrap();
let schema = this.schema;
let streams = this.streams.take().unwrap();
let send_buf = this.send_buf.take().unwrap();
this.handle_request_fut = Some(Box::pin(async move {
let res = transport
.handle_request(schema, streams, data, send_buf)
.await;
(res, transport, streams, send_buf)
}));
this.waker.wake();
continue;
}
Poll::Ready(None) => return Poll::Ready(None),
Poll::Pending => {}
}
}
// receive msg
if let (Some(streams), Some(transport)) = (&mut this.streams, &mut this.transport) {
if !streams.streams.is_empty() {
let mut closed = Vec::new();
for (id, incoming_stream) in &mut streams.streams {
match incoming_stream.as_mut().poll_next(cx) {
Poll::Ready(Some(res)) => {
if res.is_err() {
closed.push(id);
}
if let Some(bytes) = transport.handle_response(id, res) {
return Poll::Ready(Some(bytes));
}
}
Poll::Ready(None) => {
closed.push(id);
}
Poll::Pending => {}
}
}
closed.iter().for_each(|id| streams.remove(*id));
this.waker.register(cx.waker());
return Poll::Pending;
} else {
this.waker.register(cx.waker());
return Poll::Pending;
}
} else {
return Poll::Pending;
}
}
}
}

View File

@@ -6,3 +6,4 @@ mod subscription_type;
//pub use connection::{create_connection, ConnectionTransport, SubscriptionStreams};
pub use simple_broker::SimpleBroker;
pub use subscription_type::{create_subscription_stream, SubscriptionType};
pub mod transports;

View File

@@ -0,0 +1 @@
pub mod websocket;

View File

@@ -0,0 +1,262 @@
use crate::{http, Data, FieldResult, ObjectType, Response, Schema, SubscriptionType};
use futures::channel::mpsc;
use futures::task::{AtomicWaker, Context, Poll};
use futures::{Future, Stream, StreamExt};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use std::pin::Pin;
use std::sync::Arc;
#[derive(Serialize, Deserialize)]
struct OperationMessage {
#[serde(rename = "type")]
ty: String,
#[serde(skip_serializing_if = "Option::is_none")]
id: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
payload: Option<serde_json::Value>,
}
type SubscriptionStreams = HashMap<String, Pin<Box<dyn Stream<Item = Response> + Send>>>;
type HandleRequestBoxFut<'a> =
Pin<Box<dyn Future<Output = FieldResult<WSContext<'a>>> + Send + 'a>>;
type InitializerFn = Arc<dyn Fn(serde_json::Value) -> FieldResult<Data> + Send + Sync>;
pub fn create<Query, Mutation, Subscription>(
schema: &Schema<Query, Mutation, Subscription>,
) -> (mpsc::UnboundedSender<Vec<u8>>, impl Stream<Item = Vec<u8>>)
where
Query: ObjectType + Send + Sync + 'static,
Mutation: ObjectType + Send + Sync + 'static,
Subscription: SubscriptionType + Send + Sync + 'static,
{
create_with_initializer(schema, |_| Ok(Default::default()))
}
pub fn create_with_initializer<Query, Mutation, Subscription>(
schema: &Schema<Query, Mutation, Subscription>,
initializer: impl Fn(serde_json::Value) -> FieldResult<Data> + Send + Sync + 'static,
) -> (mpsc::UnboundedSender<Vec<u8>>, impl Stream<Item = Vec<u8>>)
where
Query: ObjectType + Send + Sync + 'static,
Mutation: ObjectType + Send + Sync + 'static,
Subscription: SubscriptionType + Send + Sync + 'static,
{
let schema = schema.clone();
let (tx_bytes, rx_bytes) = mpsc::unbounded();
let stream = async_stream::stream! {
let mut streams = Default::default();
let mut send_buf = Default::default();
let mut data = Arc::new(Data::default());
let mut inner_stream = SubscriptionStream {
schema: &schema,
initializer: Arc::new(initializer),
rx_bytes,
handle_request_fut: None,
waker: AtomicWaker::new(),
ctx: Some(WSContext {
streams: &mut streams,
send_buf: &mut send_buf,
ctx_data: &mut data,
}),
};
while let Some(data) = inner_stream.next().await {
yield data;
}
};
(tx_bytes, stream)
}
struct WSContext<'a> {
streams: &'a mut SubscriptionStreams,
send_buf: &'a mut VecDeque<Vec<u8>>,
ctx_data: &'a mut Arc<Data>,
}
fn send_message<T: Serialize>(send_buf: &mut VecDeque<Vec<u8>>, msg: &T) {
if let Ok(data) = serde_json::to_vec(msg) {
send_buf.push_back(data);
}
}
#[allow(missing_docs)]
#[allow(clippy::type_complexity)]
struct SubscriptionStream<'a, Query, Mutation, Subscription> {
schema: &'a Schema<Query, Mutation, Subscription>,
initializer: InitializerFn,
rx_bytes: mpsc::UnboundedReceiver<Vec<u8>>,
handle_request_fut: Option<HandleRequestBoxFut<'a>>,
waker: AtomicWaker,
ctx: Option<WSContext<'a>>,
}
impl<'a, Query, Mutation, Subscription> Stream
for SubscriptionStream<'a, Query, Mutation, Subscription>
where
Query: ObjectType + Send + Sync + 'static,
Mutation: ObjectType + Send + Sync + 'static,
Subscription: SubscriptionType + Send + Sync + 'static,
{
type Item = Vec<u8>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = &mut *self;
loop {
// receive bytes
if let Some(ctx) = &mut this.ctx {
if let Some(bytes) = ctx.send_buf.pop_front() {
return Poll::Ready(Some(bytes));
}
}
if let Some(handle_request_fut) = &mut this.handle_request_fut {
match handle_request_fut.as_mut().poll(cx) {
Poll::Ready(Ok(ctx)) => {
this.ctx = Some(ctx);
this.handle_request_fut = None;
continue;
}
Poll::Ready(Err(_)) => return Poll::Ready(None),
Poll::Pending => {}
}
} else {
match Pin::new(&mut this.rx_bytes).poll_next(cx) {
Poll::Ready(Some(data)) => {
let ctx = this.ctx.take().unwrap();
this.handle_request_fut = Some(Box::pin(handle_request(
this.schema.clone(),
this.initializer.clone(),
ctx,
data,
)));
this.waker.wake();
continue;
}
Poll::Ready(None) => return Poll::Ready(None),
Poll::Pending => {}
}
}
// receive msg
if let Some(ctx) = &mut this.ctx {
if !ctx.streams.is_empty() {
let mut closed = Vec::new();
for (id, incoming_stream) in ctx.streams.iter_mut() {
loop {
match incoming_stream.as_mut().poll_next(cx) {
Poll::Ready(Some(res)) => {
if let Some(err) = &res.error {
closed.push(id.to_string());
send_message(
ctx.send_buf,
&OperationMessage {
ty: "error".to_string(),
id: Some(id.to_string()),
payload: Some(serde_json::to_value(err).unwrap()),
},
);
} else {
send_message(
ctx.send_buf,
&OperationMessage {
ty: "data".to_string(),
id: Some(id.to_string()),
payload: Some(serde_json::to_value(&res).unwrap()),
},
);
}
}
Poll::Ready(None) => {
closed.push(id.to_string());
send_message(
ctx.send_buf,
&OperationMessage {
ty: "complete".to_string(),
id: Some(id.to_string()),
payload: None,
},
);
}
Poll::Pending => break,
}
}
}
for id in closed {
ctx.streams.remove(&id);
}
this.waker.register(cx.waker());
return Poll::Pending;
} else {
this.waker.register(cx.waker());
return Poll::Pending;
}
} else {
return Poll::Pending;
}
}
}
}
async fn handle_request<'a, Query, Mutation, Subscription>(
schema: Schema<Query, Mutation, Subscription>,
initializer: InitializerFn,
ctx: WSContext<'a>,
data: Vec<u8>,
) -> FieldResult<WSContext<'a>>
where
Query: ObjectType + Send + Sync + 'static,
Mutation: ObjectType + Send + Sync + 'static,
Subscription: SubscriptionType + Send + Sync + 'static,
{
match serde_json::from_slice::<OperationMessage>(&data) {
Ok(msg) => match msg.ty.as_str() {
"connection_init" => {
if let Some(payload) = msg.payload {
*ctx.ctx_data = Arc::new(initializer(payload)?);
}
send_message(
ctx.send_buf,
&OperationMessage {
ty: "connection_ack".to_string(),
id: None,
payload: None,
},
);
}
"start" => {
if let (Some(id), Some(payload)) = (msg.id, msg.payload) {
if let Ok(request) = serde_json::from_value::<http::GQLRequest>(payload) {
let stream = schema.execute_stream(request).boxed();
ctx.streams.insert(id, stream);
}
}
}
"stop" => {
if let Some(id) = msg.id {
if ctx.streams.remove(&id).is_some() {
send_message(
ctx.send_buf,
&OperationMessage {
ty: "complete".to_string(),
id: Some(id),
payload: None,
},
);
}
}
}
"connection_terminate" => return Err("connection_terminate".into()),
_ => return Err("Unknown op".into()),
},
Err(err) => return Err(err.into()),
}
Ok(ctx)
}
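A sketch of driving this transport directly, for example from a test. Frames are the JSON `OperationMessage`s handled above: the client sends `connection_init` and `start`, and reads back serialized `connection_ack`, `data`, and `complete` frames. The schema type and the subscription query are illustrative:

use async_graphql::transports::websocket;
use futures::StreamExt;

async fn drive<Q, M, S>(schema: async_graphql::Schema<Q, M, S>)
where
    Q: async_graphql::ObjectType + Send + Sync + 'static,
    M: async_graphql::ObjectType + Send + Sync + 'static,
    S: async_graphql::SubscriptionType + Send + Sync + 'static,
{
    let (tx, rx) = websocket::create(&schema);
    futures::pin_mut!(rx);

    // Handshake: a bare connection_init is answered with connection_ack
    // (a payload would be passed to create_with_initializer's closure instead).
    tx.unbounded_send(br#"{"type":"connection_init"}"#.to_vec()).unwrap();
    let ack = rx.next().await.unwrap();
    assert_eq!(String::from_utf8_lossy(&ack), r#"{"type":"connection_ack"}"#);

    // Start a subscription: each Response comes back as a "data" frame, and a
    // "complete" frame follows once the underlying stream ends.
    tx.unbounded_send(
        br#"{"type":"start","id":"1","payload":{"query":"subscription { values }"}}"#.to_vec(),
    )
    .unwrap();
    if let Some(frame) = rx.next().await {
        println!("{}", String::from_utf8_lossy(&frame));
    }
    // A client ends one subscription with {"type":"stop","id":"1"} or the whole
    // connection with {"type":"connection_terminate"}.
}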

View File

@@ -1,227 +0,0 @@
use crate::context::Data;
use crate::http::GQLRequest;
use crate::{
ConnectionTransport, Error, FieldError, FieldResult, GQLQueryResponse, ObjectType, QueryError,
Result, Schema, SubscriptionStreams, SubscriptionType, Variables,
};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
#[derive(Serialize, Deserialize)]
struct OperationMessage {
#[serde(rename = "type")]
ty: String,
#[serde(skip_serializing_if = "Option::is_none")]
id: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
payload: Option<serde_json::Value>,
}
/// WebSocket transport for subscriptions
#[derive(Default)]
pub struct WebSocketTransport {
id_to_sid: HashMap<String, usize>,
sid_to_id: HashMap<usize, String>,
data: Arc<Data>,
init_context_data: Option<Box<dyn Fn(serde_json::Value) -> FieldResult<Data> + Send + Sync>>,
}
impl WebSocketTransport {
/// Creates a websocket transport and sets the function that converts the `payload` of the `connection_init` message to `Data`.
pub fn new<F: Fn(serde_json::Value) -> FieldResult<Data> + Send + Sync + 'static>(
init_context_data: F,
) -> Self {
WebSocketTransport {
init_context_data: Some(Box::new(init_context_data)),
..WebSocketTransport::default()
}
}
}
fn send_message<T: Serialize>(send_buf: &mut VecDeque<Vec<u8>>, msg: &T) {
if let Ok(data) = serde_json::to_vec(msg) {
send_buf.push_back(data);
}
}
#[async_trait::async_trait]
impl ConnectionTransport for WebSocketTransport {
type Error = FieldError;
async fn handle_request<Query, Mutation, Subscription>(
&mut self,
schema: &Schema<Query, Mutation, Subscription>,
streams: &mut SubscriptionStreams,
request: Vec<u8>,
send_buf: &mut VecDeque<Vec<u8>>,
) -> std::result::Result<(), Self::Error>
where
Query: ObjectType + Sync + Send + 'static,
Mutation: ObjectType + Sync + Send + 'static,
Subscription: SubscriptionType + Sync + Send + 'static,
{
match serde_json::from_slice::<OperationMessage>(&request) {
Ok(msg) => match msg.ty.as_str() {
"connection_init" => {
if let Some(payload) = msg.payload {
if let Some(init_context_data) = &self.init_context_data {
self.data = Arc::new(init_context_data(payload)?);
}
}
send_message(
send_buf,
&OperationMessage {
ty: "connection_ack".to_string(),
id: None,
payload: None,
},
);
Ok(())
}
"start" => {
if let (Some(id), Some(payload)) = (msg.id, msg.payload) {
if let Ok(request) = serde_json::from_value::<GQLRequest>(payload) {
let variables = request
.variables
.map(Variables::parse_from_json)
.unwrap_or_default();
match schema
.create_subscription_stream(
&request.query,
request.operation_name.as_deref(),
variables.clone(),
Some(self.data.clone()),
)
.await
{
Ok(stream) => {
let stream_id = streams.add(stream);
self.id_to_sid.insert(id.clone(), stream_id);
self.sid_to_id.insert(stream_id, id);
}
Err(Error::Query { err, .. })
if err == QueryError::NotSupported =>
{
// Is query or mutation
let mut builder =
QueryBuilder::new(&request.query).variables(variables);
if let Some(operation_name) = &request.operation_name {
builder = builder.operation_name(operation_name);
}
match builder.execute(schema).await {
Ok(resp) => {
send_message(
send_buf,
&OperationMessage {
ty: "data".to_string(),
id: Some(id.clone()),
payload: Some(
serde_json::to_value(&GQLResponse(Ok(
resp,
)))
.unwrap(),
),
},
);
send_message(
send_buf,
&OperationMessage {
ty: "complete".to_string(),
id: Some(id),
payload: None,
},
);
}
Err(err) => {
send_message(
send_buf,
&OperationMessage {
ty: "error".to_string(),
id: Some(id),
payload: Some(
serde_json::to_value(GQLError(&err))
.unwrap(),
),
},
);
}
}
}
Err(err) => {
send_message(
send_buf,
&OperationMessage {
ty: "error".to_string(),
id: Some(id),
payload: Some(
serde_json::to_value(GQLError(&err)).unwrap(),
),
},
);
}
}
}
}
Ok(())
}
"stop" => {
if let Some(id) = msg.id {
if let Some(sid) = self.id_to_sid.remove(&id) {
self.sid_to_id.remove(&sid);
streams.remove(sid);
send_message(
send_buf,
&OperationMessage {
ty: "complete".to_string(),
id: Some(id),
payload: None,
},
);
}
}
Ok(())
}
"connection_terminate" => Err("connection_terminate".into()),
_ => Err("Unknown op".into()),
},
Err(err) => Err(err.into()),
}
}
fn handle_response(&mut self, id: usize, res: Result<serde_json::Value>) -> Option<Vec<u8>> {
if let Some(id) = self.sid_to_id.get(&id) {
match res {
Ok(value) => Some(
serde_json::to_vec(&OperationMessage {
ty: "data".to_string(),
id: Some(id.clone()),
payload: Some(
serde_json::to_value(GQLResponse(Ok(GQLQueryResponse {
data: value,
extensions: None,
cache_control: Default::default(),
})))
.unwrap(),
),
})
.unwrap(),
),
Err(err) => Some(
serde_json::to_vec(&OperationMessage {
ty: "error".to_string(),
id: Some(id.to_string()),
payload: Some(serde_json::to_value(GQLError(&err)).unwrap()),
})
.unwrap(),
),
}
} else {
None
}
}
}

View File

@@ -32,7 +32,8 @@ pub async fn test_subscription() {
{
let mut stream = schema
.execute_stream("subscription { values(start: 10, end: 20) }")
.map(|resp| resp.into_result().unwrap().data);
.map(|resp| resp.into_result().unwrap().data)
.boxed();
for i in 10..20 {
assert_eq!(
Some(serde_json::json!({ "values": i })),
@@ -45,7 +46,8 @@ pub async fn test_subscription() {
{
let mut stream = schema
.execute_stream("subscription { events(start: 10, end: 20) { a b } }")
.map(|resp| resp.into_result().unwrap().data);
.map(|resp| resp.into_result().unwrap().data)
.boxed();
for i in 10..20 {
assert_eq!(
Some(serde_json::json!({ "events": {"a": i, "b": i * 10} })),
@@ -97,10 +99,12 @@ pub async fn test_simple_broker() {
let schema = Schema::new(QueryRoot, EmptyMutation, SubscriptionRoot);
let mut stream1 = schema
.execute_stream("subscription { events1 { value } }")
.map(|resp| resp.into_result().unwrap().data);
.map(|resp| resp.into_result().unwrap().data)
.boxed();
let mut stream2 = schema
.execute_stream("subscription { events2 { value } }")
.map(|resp| resp.into_result().unwrap().data);
.map(|resp| resp.into_result().unwrap().data)
.boxed();
assert_eq!(
stream1.next().await,
@@ -156,7 +160,8 @@ pub async fn test_subscription_with_ctx_data() {
{
let mut stream = schema
.execute_stream(Request::new("subscription { values objects { value } }").data(100i32))
.map(|resp| resp.data);
.map(|resp| resp.data)
.boxed();
assert_eq!(
Some(serde_json::json!({ "values": 100 })),
stream.next().await
@@ -197,7 +202,8 @@ pub async fn test_subscription_with_token() {
.execute_stream(
Request::new("subscription { values }").data(Token("123456".to_string())),
)
.map(|resp| resp.into_result().unwrap().data);
.map(|resp| resp.into_result().unwrap().data)
.boxed();
assert_eq!(
Some(serde_json::json!({ "values": 100 })),
stream.next().await
@@ -210,6 +216,7 @@ pub async fn test_subscription_with_token() {
.execute_stream(
Request::new("subscription { values }").data(Token("654321".to_string()))
)
.boxed()
.next()
.await
.unwrap()
@@ -253,7 +260,8 @@ pub async fn test_subscription_inline_fragment() {
}
"#,
)
.map(|resp| resp.data);
.map(|resp| resp.data)
.boxed();
for i in 10..20 {
assert_eq!(
Some(serde_json::json!({ "events": {"a": i, "b": i * 10} })),
@@ -306,8 +314,9 @@ pub async fn test_subscription_fragment() {
}
"#,
)
.map(|resp| resp.data);
for i in 10..20 {
.map(|resp| resp.data)
.boxed();
for i in 10i32..20 {
assert_eq!(
Some(serde_json::json!({ "events": {"a": i, "b": i * 10} })),
stream.next().await
@@ -360,7 +369,8 @@ pub async fn test_subscription_fragment2() {
}
"#,
)
.map(|resp| resp.data);
.map(|resp| resp.data)
.boxed();
for i in 10..20 {
assert_eq!(
Some(serde_json::json!({ "events": {"a": i, "b": i * 10} })),
@@ -405,7 +415,8 @@ pub async fn test_subscription_error() {
let mut stream = schema
.execute_stream("subscription { events { value } }")
.map(|resp| resp.into_result())
.map_ok(|resp| resp.data);
.map_ok(|resp| resp.data)
.boxed();
for i in 0i32..5 {
assert_eq!(
Some(Ok(serde_json::json!({ "events": { "value": i } }))),
@@ -454,7 +465,8 @@ pub async fn test_subscription_fieldresult() {
let mut stream = schema
.execute_stream("subscription { values }")
.map(|resp| resp.into_result())
.map_ok(|resp| resp.data);
.map_ok(|resp| resp.data)
.boxed();
for i in 0i32..5 {
assert_eq!(
Some(Ok(serde_json::json!({ "values": i }))),