diff --git a/src/http/websocket.rs b/src/http/websocket.rs index 7c556e00..6c370940 100644 --- a/src/http/websocket.rs +++ b/src/http/websocket.rs @@ -24,8 +24,7 @@ struct OperationMessage<'a, T> { type SubscriptionStreams = HashMap + Send>>>; -type HandleRequestBoxFut<'a> = - Pin>> + Send + 'a>>; +type HandleRequestBoxFut = Pin> + Send>>; type InitializerFn = Arc FieldResult + Send + Sync>; @@ -88,37 +87,28 @@ where Mutation: ObjectType + Send + Sync + 'static, Subscription: SubscriptionType + Send + Sync + 'static, { - let schema = schema.clone(); let (tx, rx) = mpsc::unbounded(); - let stream = async_stream::stream! { - let mut streams = Default::default(); - let mut send_buf = Default::default(); - let mut data = Arc::new(Data::default()); - let mut inner_stream = SubscriptionStream { - schema: &schema, + WebSocketStream { + tx, + rx: SubscriptionStream { + schema: schema.clone(), initializer: Arc::new(initializer), rx_bytes: rx, handle_request_fut: None, ctx: Some(WSContext { - streams: &mut streams, - send_buf: &mut send_buf, - ctx_data: &mut data, + streams: Default::default(), + send_buf: Default::default(), + ctx_data: Arc::new(Data::default()), }), - }; - while let Some(data) = inner_stream.next().await { - yield data; } - }; - WebSocketStream { - tx, - rx: Box::pin(stream), + .boxed(), } } -struct WSContext<'a> { - streams: &'a mut SubscriptionStreams, - send_buf: &'a mut VecDeque, - ctx_data: &'a mut Arc, +struct WSContext { + streams: SubscriptionStreams, + send_buf: VecDeque, + ctx_data: Arc, } fn send_message(send_buf: &mut VecDeque, msg: &T) { @@ -129,16 +119,15 @@ fn send_message(send_buf: &mut VecDeque, msg: &T) { #[allow(missing_docs)] #[allow(clippy::type_complexity)] -struct SubscriptionStream<'a, Query, Mutation, Subscription> { - schema: &'a Schema, +struct SubscriptionStream { + schema: Schema, initializer: InitializerFn, rx_bytes: mpsc::UnboundedReceiver, - handle_request_fut: Option>, - ctx: Option>, + handle_request_fut: Option, + ctx: Option, } -impl<'a, Query, Mutation, Subscription> Stream - for SubscriptionStream<'a, Query, Mutation, Subscription> +impl<'a, Query, Mutation, Subscription> Stream for SubscriptionStream where Query: ObjectType + Send + Sync + 'static, Mutation: ObjectType + Send + Sync + 'static, @@ -194,7 +183,7 @@ where if let Some(err) = &res.error { closed.push(id.to_string()); send_message( - ctx.send_buf, + &mut ctx.send_buf, &OperationMessage { ty: "error", id: Some(id.to_string()), @@ -203,7 +192,7 @@ where ); } else { send_message( - ctx.send_buf, + &mut ctx.send_buf, &OperationMessage { ty: "data", id: Some(id.to_string()), @@ -215,7 +204,7 @@ where Poll::Ready(None) => { closed.push(id.to_string()); send_message( - ctx.send_buf, + &mut ctx.send_buf, &OperationMessage { ty: "complete", id: Some(id.to_string()), @@ -241,12 +230,12 @@ where } } -async fn handle_request<'a, Query, Mutation, Subscription>( +async fn handle_request( schema: Schema, initializer: InitializerFn, - ctx: WSContext<'a>, + mut ctx: WSContext, data: String, -) -> FieldResult> +) -> FieldResult where Query: ObjectType + Send + Sync + 'static, Mutation: ObjectType + Send + Sync + 'static, @@ -256,10 +245,10 @@ where Ok(msg) => match msg.ty { "connection_init" => { if let Some(payload) = msg.payload { - *ctx.ctx_data = Arc::new(initializer(payload)?); + ctx.ctx_data = Arc::new(initializer(payload)?); } send_message( - ctx.send_buf, + &mut ctx.send_buf, &OperationMessage { ty: "connection_ack", id: None, @@ -281,7 +270,7 @@ where if let Some(id) = msg.id { if ctx.streams.remove(&id).is_some() { send_message( - ctx.send_buf, + &mut ctx.send_buf, &OperationMessage { ty: "complete", id: Some(id),