This commit is contained in:
Sunli 2021-05-08 14:06:46 +08:00
parent 588bba549f
commit 63738e9a3a
3 changed files with 25 additions and 29 deletions

View File

@ -6,7 +6,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
[Unreleased]
- If `InputObject` contains an unnamed field, the correct error message will be given. [#498](https://github.com/async-graphql/async-graphql/issues/498)
- If `InputObject` contains an unnamed field, the correct error message will be given. [#498](https://github.com/async-graphql/async-graphql/issues/498)
- Added Websocket::with_message_stream for client message customization. [#501](https://github.com/async-graphql/async-graphql/pull/501)
## [2.8.4] 2021-04-23

View File

@ -12,12 +12,7 @@ use crate::{BatchRequest, ParseRequestError, Request};
pub use graphiql_source::graphiql_source;
pub use multipart::MultipartOptions;
pub use playground_source::{playground_source, GraphQLPlaygroundConfig};
pub use websocket::{
Protocols as WebSocketProtocols,
WebSocket,
WsMessage,
ClientMessage,
};
pub use websocket::{ClientMessage, Protocols as WebSocketProtocols, WebSocket, WsMessage};
/// Receive a GraphQL request from a content type and body.
pub async fn receive_body(

View File

@ -1,14 +1,17 @@
//! WebSocket transport for subscription
use std::{collections::HashMap};
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use futures_util::{StreamExt, future::{BoxFuture, Ready}};
use futures_util::stream::Stream;
use futures_util::FutureExt;
use futures_util::{
future::{BoxFuture, Ready},
StreamExt,
};
use pin_project_lite::pin_project;
use serde::{Deserialize, Serialize};
@ -74,7 +77,13 @@ type MessageMapStream<S> =
futures_util::stream::Map<S, fn(<S as Stream>::Item) -> serde_json::Result<ClientMessage>>;
impl<S, Query, Mutation, Subscription>
WebSocket<MessageMapStream<S>, fn(serde_json::Value) -> Ready<Result<Data>>, Query, Mutation, Subscription>
WebSocket<
MessageMapStream<S>,
fn(serde_json::Value) -> Ready<Result<Data>>,
Query,
Mutation,
Subscription,
>
where
S: Stream,
S::Item: AsRef<[u8]>,
@ -112,24 +121,16 @@ where
stream: S,
data_initializer: F,
protocol: Protocols,
) -> Self
{
) -> Self {
// let stream = stream.map(|message| serde_json::from_slice(message.as_ref()));
let stream = stream.map(
ClientMessage::from_bytes as fn(S::Item) -> serde_json::Result<ClientMessage>,
);
let stream = stream
.map(ClientMessage::from_bytes as fn(S::Item) -> serde_json::Result<ClientMessage>);
Self::with_message_stream(
schema,
stream,
data_initializer,
protocol,
)
Self::with_message_stream(schema, stream, data_initializer, protocol)
}
}
impl<S, F, Query, Mutation, Subscription>
WebSocket<S, F, Query, Mutation, Subscription>
impl<S, F, Query, Mutation, Subscription> WebSocket<S, F, Query, Mutation, Subscription>
where
S: Stream<Item = serde_json::Result<ClientMessage>>,
{
@ -145,8 +146,7 @@ where
stream: S,
data_initializer: F,
protocol: Protocols,
) -> Self
{
) -> Self {
WebSocket {
data_initializer: Some(data_initializer),
init_fut: None,
@ -183,9 +183,7 @@ where
let message: ClientMessage = match message {
Ok(message) => message,
Err(err) => {
return Poll::Ready(Some(WsMessage::Close(1002, err.to_string())))
}
Err(err) => return Poll::Ready(Some(WsMessage::Close(1002, err.to_string()))),
};
match message {
@ -237,7 +235,8 @@ where
ClientMessage::Stop { id } => {
if this.streams.remove(&id).is_some() {
return Poll::Ready(Some(WsMessage::Text(
serde_json::to_string(&ServerMessage::Complete { id: &id }).unwrap(),
serde_json::to_string(&ServerMessage::Complete { id: &id })
.unwrap(),
)));
}
}