Fix a little bug.
This commit is contained in:
parent
3c01c5ed28
commit
857afb1f54
|
@ -145,56 +145,49 @@ where
|
|||
|
||||
// receive msg
|
||||
if let Some(ctx) = &mut this.ctx {
|
||||
if !ctx.streams.is_empty() {
|
||||
let mut closed = Vec::new();
|
||||
let mut closed = Vec::new();
|
||||
|
||||
for (id, incoming_stream) in ctx.streams.iter_mut() {
|
||||
loop {
|
||||
match incoming_stream.as_mut().poll_next(cx) {
|
||||
Poll::Ready(Some(res)) => {
|
||||
if let Some(err) = &res.error {
|
||||
closed.push(id.to_string());
|
||||
send_message(
|
||||
ctx.send_buf,
|
||||
&OperationMessage {
|
||||
ty: "error".to_string(),
|
||||
id: Some(id.to_string()),
|
||||
payload: Some(serde_json::to_value(err).unwrap()),
|
||||
},
|
||||
);
|
||||
} else {
|
||||
send_message(
|
||||
ctx.send_buf,
|
||||
&OperationMessage {
|
||||
ty: "data".to_string(),
|
||||
id: Some(id.to_string()),
|
||||
payload: Some(serde_json::to_value(&res).unwrap()),
|
||||
},
|
||||
);
|
||||
}
|
||||
}
|
||||
Poll::Ready(None) => {
|
||||
closed.push(id.to_string());
|
||||
send_message(
|
||||
ctx.send_buf,
|
||||
&OperationMessage {
|
||||
ty: "complete".to_string(),
|
||||
id: Some(id.to_string()),
|
||||
payload: None,
|
||||
},
|
||||
);
|
||||
break;
|
||||
}
|
||||
Poll::Pending => {
|
||||
break;
|
||||
}
|
||||
for (id, incoming_stream) in ctx.streams.iter_mut() {
|
||||
match incoming_stream.as_mut().poll_next(cx) {
|
||||
Poll::Ready(Some(res)) => {
|
||||
if let Some(err) = &res.error {
|
||||
closed.push(id.to_string());
|
||||
send_message(
|
||||
ctx.send_buf,
|
||||
&OperationMessage {
|
||||
ty: "error".to_string(),
|
||||
id: Some(id.to_string()),
|
||||
payload: Some(serde_json::to_value(err).unwrap()),
|
||||
},
|
||||
);
|
||||
} else {
|
||||
send_message(
|
||||
ctx.send_buf,
|
||||
&OperationMessage {
|
||||
ty: "data".to_string(),
|
||||
id: Some(id.to_string()),
|
||||
payload: Some(serde_json::to_value(&res).unwrap()),
|
||||
},
|
||||
);
|
||||
}
|
||||
}
|
||||
Poll::Ready(None) => {
|
||||
closed.push(id.to_string());
|
||||
send_message(
|
||||
ctx.send_buf,
|
||||
&OperationMessage {
|
||||
ty: "complete".to_string(),
|
||||
id: Some(id.to_string()),
|
||||
payload: None,
|
||||
},
|
||||
);
|
||||
}
|
||||
Poll::Pending => {}
|
||||
}
|
||||
}
|
||||
|
||||
for id in closed {
|
||||
ctx.streams.remove(&id);
|
||||
}
|
||||
for id in closed {
|
||||
ctx.streams.remove(&id);
|
||||
}
|
||||
|
||||
if !ctx.send_buf.is_empty() {
|
||||
|
|
Loading…
Reference in New Issue
Block a user