Fix internal context missing when there's no connection_init frame. #451

This commit is contained in:
Sunli 2021-03-24 17:24:12 +08:00
parent d280978ccb
commit e991ffb814
3 changed files with 82 additions and 10 deletions

View File

@ -61,7 +61,7 @@ pin_project! {
pub struct WebSocket<S, F, Query, Mutation, Subscription> {
data_initializer: Option<F>,
init_fut: Option<BoxFuture<'static, Result<Data>>>,
data: Arc<Data>,
data: Option<Arc<Data>>,
schema: Schema<Query, Mutation, Subscription>,
streams: HashMap<String, Pin<Box<dyn Stream<Item = Response> + Send>>>,
#[pin]
@ -105,7 +105,7 @@ impl<S, F, Query, Mutation, Subscription> WebSocket<S, F, Query, Mutation, Subsc
Self {
data_initializer: Some(data_initializer),
init_fut: None,
data: Arc::default(),
data: None,
schema,
streams: HashMap::new(),
stream,
@ -174,13 +174,17 @@ where
id,
payload: request,
} => {
this.streams.insert(
id,
Box::pin(
this.schema
.execute_stream_with_ctx_data(request, Arc::clone(this.data)),
),
);
if let Some(data) = this.data.clone() {
this.streams.insert(
id,
Box::pin(this.schema.execute_stream_with_ctx_data(request, data)),
);
} else {
return Poll::Ready(Some(WsMessage::Close(
1011,
"The handshake is not completed.".to_string(),
)));
}
}
ClientMessage::Stop { id } => {
if this.streams.remove(id).is_some() {
@ -202,7 +206,7 @@ where
*this.init_fut = None;
return match res {
Ok(data) => {
*this.data = Arc::new(data);
*this.data = Some(Arc::new(data));
Poll::Ready(Some(WsMessage::Text(
serde_json::to_string(&ServerMessage::ConnectionAck).unwrap(),
)))

View File

@ -430,3 +430,37 @@ pub async fn test_query_over_websocket() {
serde_json::from_str(&stream.next().await.unwrap().unwrap_text()).unwrap()
);
}
#[tokio::test]
pub async fn test_start_before_connection_init() {
struct QueryRoot;
#[Object]
impl QueryRoot {
async fn value(&self) -> i32 {
999
}
}
let schema = Schema::new(QueryRoot, EmptyMutation, EmptySubscription);
let (mut tx, rx) = mpsc::unbounded();
let mut stream = http::WebSocket::new(schema, rx, WebSocketProtocols::GraphQLWS);
tx.send(
serde_json::to_string(&value!({
"type": "start",
"id": "1",
"payload": {
"query": "query { value }"
},
}))
.unwrap(),
)
.await
.unwrap();
assert_eq!(
stream.next().await.unwrap().unwrap_close(),
(1011, "The handshake is not completed.".to_string())
);
}

View File

@ -391,3 +391,37 @@ pub async fn test_query_over_websocket() {
serde_json::from_str(&stream.next().await.unwrap().unwrap_text()).unwrap()
);
}
#[tokio::test]
pub async fn test_start_before_connection_init() {
struct QueryRoot;
#[Object]
impl QueryRoot {
async fn value(&self) -> i32 {
999
}
}
let schema = Schema::new(QueryRoot, EmptyMutation, EmptySubscription);
let (mut tx, rx) = mpsc::unbounded();
let mut stream = http::WebSocket::new(schema, rx, WebSocketProtocols::SubscriptionsTransportWS);
tx.send(
serde_json::to_string(&value!({
"type": "start",
"id": "1",
"payload": {
"query": "query { value }"
},
}))
.unwrap(),
)
.await
.unwrap();
assert_eq!(
stream.next().await.unwrap().unwrap_close(),
(1011, "The handshake is not completed.".to_string())
);
}