Rework websocket

a
This commit is contained in:
Sunli 2020-09-15 11:44:48 +08:00
parent 059128e9c5
commit ba03ccf3ea

View File

@ -24,8 +24,7 @@ struct OperationMessage<'a, T> {
type SubscriptionStreams = HashMap<String, Pin<Box<dyn Stream<Item = Response> + Send>>>;
type HandleRequestBoxFut<'a> =
Pin<Box<dyn Future<Output = FieldResult<WSContext<'a>>> + Send + 'a>>;
type HandleRequestBoxFut = Pin<Box<dyn Future<Output = FieldResult<WSContext>> + Send>>;
type InitializerFn = Arc<dyn Fn(serde_json::Value) -> FieldResult<Data> + Send + Sync>;
@ -88,37 +87,28 @@ where
Mutation: ObjectType + Send + Sync + 'static,
Subscription: SubscriptionType + Send + Sync + 'static,
{
let schema = schema.clone();
let (tx, rx) = mpsc::unbounded();
let stream = async_stream::stream! {
let mut streams = Default::default();
let mut send_buf = Default::default();
let mut data = Arc::new(Data::default());
let mut inner_stream = SubscriptionStream {
schema: &schema,
WebSocketStream {
tx,
rx: SubscriptionStream {
schema: schema.clone(),
initializer: Arc::new(initializer),
rx_bytes: rx,
handle_request_fut: None,
ctx: Some(WSContext {
streams: &mut streams,
send_buf: &mut send_buf,
ctx_data: &mut data,
streams: Default::default(),
send_buf: Default::default(),
ctx_data: Arc::new(Data::default()),
}),
};
while let Some(data) = inner_stream.next().await {
yield data;
}
};
WebSocketStream {
tx,
rx: Box::pin(stream),
.boxed(),
}
}
struct WSContext<'a> {
streams: &'a mut SubscriptionStreams,
send_buf: &'a mut VecDeque<String>,
ctx_data: &'a mut Arc<Data>,
struct WSContext {
streams: SubscriptionStreams,
send_buf: VecDeque<String>,
ctx_data: Arc<Data>,
}
fn send_message<T: Serialize>(send_buf: &mut VecDeque<String>, msg: &T) {
@ -129,16 +119,15 @@ fn send_message<T: Serialize>(send_buf: &mut VecDeque<String>, msg: &T) {
#[allow(missing_docs)]
#[allow(clippy::type_complexity)]
struct SubscriptionStream<'a, Query, Mutation, Subscription> {
schema: &'a Schema<Query, Mutation, Subscription>,
struct SubscriptionStream<Query, Mutation, Subscription> {
schema: Schema<Query, Mutation, Subscription>,
initializer: InitializerFn,
rx_bytes: mpsc::UnboundedReceiver<String>,
handle_request_fut: Option<HandleRequestBoxFut<'a>>,
ctx: Option<WSContext<'a>>,
handle_request_fut: Option<HandleRequestBoxFut>,
ctx: Option<WSContext>,
}
impl<'a, Query, Mutation, Subscription> Stream
for SubscriptionStream<'a, Query, Mutation, Subscription>
impl<'a, Query, Mutation, Subscription> Stream for SubscriptionStream<Query, Mutation, Subscription>
where
Query: ObjectType + Send + Sync + 'static,
Mutation: ObjectType + Send + Sync + 'static,
@ -194,7 +183,7 @@ where
if let Some(err) = &res.error {
closed.push(id.to_string());
send_message(
ctx.send_buf,
&mut ctx.send_buf,
&OperationMessage {
ty: "error",
id: Some(id.to_string()),
@ -203,7 +192,7 @@ where
);
} else {
send_message(
ctx.send_buf,
&mut ctx.send_buf,
&OperationMessage {
ty: "data",
id: Some(id.to_string()),
@ -215,7 +204,7 @@ where
Poll::Ready(None) => {
closed.push(id.to_string());
send_message(
ctx.send_buf,
&mut ctx.send_buf,
&OperationMessage {
ty: "complete",
id: Some(id.to_string()),
@ -241,12 +230,12 @@ where
}
}
async fn handle_request<'a, Query, Mutation, Subscription>(
async fn handle_request<Query, Mutation, Subscription>(
schema: Schema<Query, Mutation, Subscription>,
initializer: InitializerFn,
ctx: WSContext<'a>,
mut ctx: WSContext,
data: String,
) -> FieldResult<WSContext<'a>>
) -> FieldResult<WSContext>
where
Query: ObjectType + Send + Sync + 'static,
Mutation: ObjectType + Send + Sync + 'static,
@ -256,10 +245,10 @@ where
Ok(msg) => match msg.ty {
"connection_init" => {
if let Some(payload) = msg.payload {
*ctx.ctx_data = Arc::new(initializer(payload)?);
ctx.ctx_data = Arc::new(initializer(payload)?);
}
send_message(
ctx.send_buf,
&mut ctx.send_buf,
&OperationMessage {
ty: "connection_ack",
id: None,
@ -281,7 +270,7 @@ where
if let Some(id) = msg.id {
if ctx.streams.remove(&id).is_some() {
send_message(
ctx.send_buf,
&mut ctx.send_buf,
&OperationMessage {
ty: "complete",
id: Some(id),