Remove unnecessary memory allocation.

This commit is contained in:
Sunli 2020-09-02 14:27:04 +08:00
parent dcfdcb0cc5
commit 2aca257136
5 changed files with 24 additions and 30 deletions

View File

@ -3,7 +3,6 @@ use actix::{
}; };
use actix_web_actors::ws::{Message, ProtocolError, WebsocketContext}; use actix_web_actors::ws::{Message, ProtocolError, WebsocketContext};
use async_graphql::{Data, FieldResult, ObjectType, Schema, SubscriptionType, WebSocketTransport}; use async_graphql::{Data, FieldResult, ObjectType, Schema, SubscriptionType, WebSocketTransport};
use bytes::Bytes;
use futures::channel::mpsc; use futures::channel::mpsc;
use futures::SinkExt; use futures::SinkExt;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
@ -15,7 +14,7 @@ const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);
pub struct WSSubscription<Query, Mutation, Subscription> { pub struct WSSubscription<Query, Mutation, Subscription> {
schema: Schema<Query, Mutation, Subscription>, schema: Schema<Query, Mutation, Subscription>,
hb: Instant, hb: Instant,
sink: Option<mpsc::UnboundedSender<Bytes>>, sink: Option<mpsc::UnboundedSender<Vec<u8>>>,
init_context_data: Option<Box<dyn Fn(serde_json::Value) -> FieldResult<Data> + Send + Sync>>, init_context_data: Option<Box<dyn Fn(serde_json::Value) -> FieldResult<Data> + Send + Sync>>,
} }
@ -119,15 +118,15 @@ where
} }
} }
impl<Query, Mutation, Subscription> StreamHandler<Bytes> impl<Query, Mutation, Subscription> StreamHandler<Vec<u8>>
for WSSubscription<Query, Mutation, Subscription> for WSSubscription<Query, Mutation, Subscription>
where where
Query: ObjectType + Send + Sync + 'static, Query: ObjectType + Send + Sync + 'static,
Mutation: ObjectType + Send + Sync + 'static, Mutation: ObjectType + Send + Sync + 'static,
Subscription: SubscriptionType + Send + Sync + 'static, Subscription: SubscriptionType + Send + Sync + 'static,
{ {
fn handle(&mut self, data: Bytes, ctx: &mut Self::Context) { fn handle(&mut self, data: Vec<u8>, ctx: &mut Self::Context) {
if let Ok(text) = std::str::from_utf8(&data) { if let Ok(text) = String::from_utf8(data) {
ctx.text(text); ctx.text(text);
} }
} }

View File

@ -201,7 +201,7 @@ where
select! { select! {
bytes = srx.next() => { bytes = srx.next() => {
if let Some(bytes) = bytes { if let Some(bytes) = bytes {
if let Ok(text) = String::from_utf8(bytes.to_vec()) { if let Ok(text) = String::from_utf8(bytes) {
if tx.send(Message::text(text)).await.is_err() { if tx.send(Message::text(text)).await.is_err() {
return; return;
} }
@ -213,7 +213,7 @@ where
msg = rx.next() => { msg = rx.next() => {
if let Some(Ok(msg)) = msg { if let Some(Ok(msg)) = msg {
if msg.is_text() { if msg.is_text() {
if stx.send(msg.into_bytes().into()).await.is_err() { if stx.send(msg.into_bytes()).await.is_err() {
return; return;
} }
} }

View File

@ -12,7 +12,6 @@ use crate::{
SubscriptionType, Type, Variables, ID, SubscriptionType, Type, Variables, ID,
}; };
use async_graphql_parser::query::{Document, OperationType}; use async_graphql_parser::query::{Document, OperationType};
use bytes::Bytes;
use futures::channel::mpsc; use futures::channel::mpsc;
use futures::Stream; use futures::Stream;
use indexmap::map::IndexMap; use indexmap::map::IndexMap;
@ -414,8 +413,8 @@ where
&self, &self,
transport: T, transport: T,
) -> ( ) -> (
mpsc::UnboundedSender<Bytes>, mpsc::UnboundedSender<Vec<u8>>,
impl Stream<Item = Bytes> + Unpin, impl Stream<Item = Vec<u8>> + Unpin,
) { ) {
create_connection(self.clone(), transport) create_connection(self.clone(), transport)
} }

View File

@ -1,5 +1,4 @@
use crate::{ObjectType, Result, Schema, SubscriptionType}; use crate::{ObjectType, Result, Schema, SubscriptionType};
use bytes::Bytes;
use futures::channel::mpsc; use futures::channel::mpsc;
use futures::task::{AtomicWaker, Context, Poll}; use futures::task::{AtomicWaker, Context, Poll};
use futures::{Stream, StreamExt}; use futures::{Stream, StreamExt};
@ -44,8 +43,8 @@ pub trait ConnectionTransport: Send + Sync + Unpin + 'static {
&mut self, &mut self,
schema: &Schema<Query, Mutation, Subscription>, schema: &Schema<Query, Mutation, Subscription>,
streams: &mut SubscriptionStreams, streams: &mut SubscriptionStreams,
request: Bytes, request: Vec<u8>,
send_buf: &mut VecDeque<Bytes>, send_buf: &mut VecDeque<Vec<u8>>,
) -> std::result::Result<(), Self::Error> ) -> std::result::Result<(), Self::Error>
where where
Query: ObjectType + Sync + Send + 'static, Query: ObjectType + Sync + Send + 'static,
@ -53,15 +52,15 @@ pub trait ConnectionTransport: Send + Sync + Unpin + 'static {
Subscription: SubscriptionType + Sync + Send + 'static; Subscription: SubscriptionType + Sync + Send + 'static;
/// When a response message is generated, you can convert the message to the format you want here. /// When a response message is generated, you can convert the message to the format you want here.
fn handle_response(&mut self, id: usize, res: Result<serde_json::Value>) -> Option<Bytes>; fn handle_response(&mut self, id: usize, res: Result<serde_json::Value>) -> Option<Vec<u8>>;
} }
pub fn create_connection<Query, Mutation, Subscription, T: ConnectionTransport>( pub fn create_connection<Query, Mutation, Subscription, T: ConnectionTransport>(
schema: Schema<Query, Mutation, Subscription>, schema: Schema<Query, Mutation, Subscription>,
mut transport: T, mut transport: T,
) -> ( ) -> (
mpsc::UnboundedSender<Bytes>, mpsc::UnboundedSender<Vec<u8>>,
impl Stream<Item = Bytes> + Unpin, impl Stream<Item = Vec<u8>> + Unpin,
) )
where where
Query: ObjectType + Sync + Send + 'static, Query: ObjectType + Sync + Send + 'static,
@ -97,7 +96,7 @@ type HandleRequestBoxFut<'a, T> = Pin<
std::result::Result<(), <T as ConnectionTransport>::Error>, std::result::Result<(), <T as ConnectionTransport>::Error>,
&'a mut T, &'a mut T,
&'a mut SubscriptionStreams, &'a mut SubscriptionStreams,
&'a mut VecDeque<Bytes>, &'a mut VecDeque<Vec<u8>>,
), ),
> + Send > + Send
+ 'a, + 'a,
@ -110,10 +109,10 @@ struct SubscriptionStream<'a, Query, Mutation, Subscription, T: ConnectionTransp
schema: &'a Schema<Query, Mutation, Subscription>, schema: &'a Schema<Query, Mutation, Subscription>,
transport: Option<&'a mut T>, transport: Option<&'a mut T>,
streams: Option<&'a mut SubscriptionStreams>, streams: Option<&'a mut SubscriptionStreams>,
rx_bytes: mpsc::UnboundedReceiver<Bytes>, rx_bytes: mpsc::UnboundedReceiver<Vec<u8>>,
handle_request_fut: Option<HandleRequestBoxFut<'a, T>>, handle_request_fut: Option<HandleRequestBoxFut<'a, T>>,
waker: AtomicWaker, waker: AtomicWaker,
send_buf: Option<&'a mut VecDeque<Bytes>>, send_buf: Option<&'a mut VecDeque<Vec<u8>>>,
} }
impl<'a, Query, Mutation, Subscription, T> Stream impl<'a, Query, Mutation, Subscription, T> Stream
@ -124,7 +123,7 @@ where
Subscription: SubscriptionType + Send + Sync + 'static, Subscription: SubscriptionType + Send + Sync + 'static,
T: ConnectionTransport, T: ConnectionTransport,
{ {
type Item = Bytes; type Item = Vec<u8>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = &mut *self; let this = &mut *self;

View File

@ -4,7 +4,6 @@ use crate::{
ConnectionTransport, Error, FieldError, FieldResult, ObjectType, QueryBuilder, QueryError, ConnectionTransport, Error, FieldError, FieldResult, ObjectType, QueryBuilder, QueryError,
QueryResponse, Result, Schema, SubscriptionStreams, SubscriptionType, Variables, QueryResponse, Result, Schema, SubscriptionStreams, SubscriptionType, Variables,
}; };
use bytes::Bytes;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque}; use std::collections::{HashMap, VecDeque};
use std::sync::Arc; use std::sync::Arc;
@ -42,9 +41,9 @@ impl WebSocketTransport {
} }
} }
fn send_message<T: Serialize>(send_buf: &mut VecDeque<Bytes>, msg: &T) { fn send_message<T: Serialize>(send_buf: &mut VecDeque<Vec<u8>>, msg: &T) {
if let Ok(data) = serde_json::to_vec(msg) { if let Ok(data) = serde_json::to_vec(msg) {
send_buf.push_back(data.into()); send_buf.push_back(data);
} }
} }
@ -56,8 +55,8 @@ impl ConnectionTransport for WebSocketTransport {
&mut self, &mut self,
schema: &Schema<Query, Mutation, Subscription>, schema: &Schema<Query, Mutation, Subscription>,
streams: &mut SubscriptionStreams, streams: &mut SubscriptionStreams,
request: Bytes, request: Vec<u8>,
send_buf: &mut VecDeque<Bytes>, send_buf: &mut VecDeque<Vec<u8>>,
) -> std::result::Result<(), Self::Error> ) -> std::result::Result<(), Self::Error>
where where
Query: ObjectType + Sync + Send + 'static, Query: ObjectType + Sync + Send + 'static,
@ -195,7 +194,7 @@ impl ConnectionTransport for WebSocketTransport {
} }
} }
fn handle_response(&mut self, id: usize, res: Result<serde_json::Value>) -> Option<Bytes> { fn handle_response(&mut self, id: usize, res: Result<serde_json::Value>) -> Option<Vec<u8>> {
if let Some(id) = self.sid_to_id.get(&id) { if let Some(id) = self.sid_to_id.get(&id) {
match res { match res {
Ok(value) => Some( Ok(value) => Some(
@ -211,8 +210,7 @@ impl ConnectionTransport for WebSocketTransport {
.unwrap(), .unwrap(),
), ),
}) })
.unwrap() .unwrap(),
.into(),
), ),
Err(err) => Some( Err(err) => Some(
serde_json::to_vec(&OperationMessage { serde_json::to_vec(&OperationMessage {
@ -220,8 +218,7 @@ impl ConnectionTransport for WebSocketTransport {
id: Some(id.to_string()), id: Some(id.to_string()),
payload: Some(serde_json::to_value(GQLError(&err)).unwrap()), payload: Some(serde_json::to_value(GQLError(&err)).unwrap()),
}) })
.unwrap() .unwrap(),
.into(),
), ),
} }
} else { } else {