From 2aca257136c9836c5837613c2266d0f0cb9ac15f Mon Sep 17 00:00:00 2001 From: Sunli Date: Wed, 2 Sep 2020 14:27:04 +0800 Subject: [PATCH] Remove unnecessary memory allocation. --- async-graphql-actix-web/src/subscription.rs | 9 ++++----- async-graphql-warp/src/lib.rs | 4 ++-- src/schema.rs | 5 ++--- src/subscription/connection.rs | 19 +++++++++---------- src/subscription/ws_transport.rs | 17 +++++++---------- 5 files changed, 24 insertions(+), 30 deletions(-) diff --git a/async-graphql-actix-web/src/subscription.rs b/async-graphql-actix-web/src/subscription.rs index 2ede7a42..7aa3cabd 100644 --- a/async-graphql-actix-web/src/subscription.rs +++ b/async-graphql-actix-web/src/subscription.rs @@ -3,7 +3,6 @@ use actix::{ }; use actix_web_actors::ws::{Message, ProtocolError, WebsocketContext}; use async_graphql::{Data, FieldResult, ObjectType, Schema, SubscriptionType, WebSocketTransport}; -use bytes::Bytes; use futures::channel::mpsc; use futures::SinkExt; use std::time::{Duration, Instant}; @@ -15,7 +14,7 @@ const CLIENT_TIMEOUT: Duration = Duration::from_secs(10); pub struct WSSubscription { schema: Schema, hb: Instant, - sink: Option>, + sink: Option>>, init_context_data: Option FieldResult + Send + Sync>>, } @@ -119,15 +118,15 @@ where } } -impl StreamHandler +impl StreamHandler> for WSSubscription where Query: ObjectType + Send + Sync + 'static, Mutation: ObjectType + Send + Sync + 'static, Subscription: SubscriptionType + Send + Sync + 'static, { - fn handle(&mut self, data: Bytes, ctx: &mut Self::Context) { - if let Ok(text) = std::str::from_utf8(&data) { + fn handle(&mut self, data: Vec, ctx: &mut Self::Context) { + if let Ok(text) = String::from_utf8(data) { ctx.text(text); } } diff --git a/async-graphql-warp/src/lib.rs b/async-graphql-warp/src/lib.rs index 70c81dfa..a6e87c93 100644 --- a/async-graphql-warp/src/lib.rs +++ b/async-graphql-warp/src/lib.rs @@ -201,7 +201,7 @@ where select! { bytes = srx.next() => { if let Some(bytes) = bytes { - if let Ok(text) = String::from_utf8(bytes.to_vec()) { + if let Ok(text) = String::from_utf8(bytes) { if tx.send(Message::text(text)).await.is_err() { return; } @@ -213,7 +213,7 @@ where msg = rx.next() => { if let Some(Ok(msg)) = msg { if msg.is_text() { - if stx.send(msg.into_bytes().into()).await.is_err() { + if stx.send(msg.into_bytes()).await.is_err() { return; } } diff --git a/src/schema.rs b/src/schema.rs index d4fce291..2133cdbe 100644 --- a/src/schema.rs +++ b/src/schema.rs @@ -12,7 +12,6 @@ use crate::{ SubscriptionType, Type, Variables, ID, }; use async_graphql_parser::query::{Document, OperationType}; -use bytes::Bytes; use futures::channel::mpsc; use futures::Stream; use indexmap::map::IndexMap; @@ -414,8 +413,8 @@ where &self, transport: T, ) -> ( - mpsc::UnboundedSender, - impl Stream + Unpin, + mpsc::UnboundedSender>, + impl Stream> + Unpin, ) { create_connection(self.clone(), transport) } diff --git a/src/subscription/connection.rs b/src/subscription/connection.rs index 54e0c6cd..29de84f1 100644 --- a/src/subscription/connection.rs +++ b/src/subscription/connection.rs @@ -1,5 +1,4 @@ use crate::{ObjectType, Result, Schema, SubscriptionType}; -use bytes::Bytes; use futures::channel::mpsc; use futures::task::{AtomicWaker, Context, Poll}; use futures::{Stream, StreamExt}; @@ -44,8 +43,8 @@ pub trait ConnectionTransport: Send + Sync + Unpin + 'static { &mut self, schema: &Schema, streams: &mut SubscriptionStreams, - request: Bytes, - send_buf: &mut VecDeque, + request: Vec, + send_buf: &mut VecDeque>, ) -> std::result::Result<(), Self::Error> where Query: ObjectType + Sync + Send + 'static, @@ -53,15 +52,15 @@ pub trait ConnectionTransport: Send + Sync + Unpin + 'static { Subscription: SubscriptionType + Sync + Send + 'static; /// When a response message is generated, you can convert the message to the format you want here. - fn handle_response(&mut self, id: usize, res: Result) -> Option; + fn handle_response(&mut self, id: usize, res: Result) -> Option>; } pub fn create_connection( schema: Schema, mut transport: T, ) -> ( - mpsc::UnboundedSender, - impl Stream + Unpin, + mpsc::UnboundedSender>, + impl Stream> + Unpin, ) where Query: ObjectType + Sync + Send + 'static, @@ -97,7 +96,7 @@ type HandleRequestBoxFut<'a, T> = Pin< std::result::Result<(), ::Error>, &'a mut T, &'a mut SubscriptionStreams, - &'a mut VecDeque, + &'a mut VecDeque>, ), > + Send + 'a, @@ -110,10 +109,10 @@ struct SubscriptionStream<'a, Query, Mutation, Subscription, T: ConnectionTransp schema: &'a Schema, transport: Option<&'a mut T>, streams: Option<&'a mut SubscriptionStreams>, - rx_bytes: mpsc::UnboundedReceiver, + rx_bytes: mpsc::UnboundedReceiver>, handle_request_fut: Option>, waker: AtomicWaker, - send_buf: Option<&'a mut VecDeque>, + send_buf: Option<&'a mut VecDeque>>, } impl<'a, Query, Mutation, Subscription, T> Stream @@ -124,7 +123,7 @@ where Subscription: SubscriptionType + Send + Sync + 'static, T: ConnectionTransport, { - type Item = Bytes; + type Item = Vec; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = &mut *self; diff --git a/src/subscription/ws_transport.rs b/src/subscription/ws_transport.rs index 8bf8a482..3164768d 100644 --- a/src/subscription/ws_transport.rs +++ b/src/subscription/ws_transport.rs @@ -4,7 +4,6 @@ use crate::{ ConnectionTransport, Error, FieldError, FieldResult, ObjectType, QueryBuilder, QueryError, QueryResponse, Result, Schema, SubscriptionStreams, SubscriptionType, Variables, }; -use bytes::Bytes; use serde::{Deserialize, Serialize}; use std::collections::{HashMap, VecDeque}; use std::sync::Arc; @@ -42,9 +41,9 @@ impl WebSocketTransport { } } -fn send_message(send_buf: &mut VecDeque, msg: &T) { +fn send_message(send_buf: &mut VecDeque>, msg: &T) { if let Ok(data) = serde_json::to_vec(msg) { - send_buf.push_back(data.into()); + send_buf.push_back(data); } } @@ -56,8 +55,8 @@ impl ConnectionTransport for WebSocketTransport { &mut self, schema: &Schema, streams: &mut SubscriptionStreams, - request: Bytes, - send_buf: &mut VecDeque, + request: Vec, + send_buf: &mut VecDeque>, ) -> std::result::Result<(), Self::Error> where Query: ObjectType + Sync + Send + 'static, @@ -195,7 +194,7 @@ impl ConnectionTransport for WebSocketTransport { } } - fn handle_response(&mut self, id: usize, res: Result) -> Option { + fn handle_response(&mut self, id: usize, res: Result) -> Option> { if let Some(id) = self.sid_to_id.get(&id) { match res { Ok(value) => Some( @@ -211,8 +210,7 @@ impl ConnectionTransport for WebSocketTransport { .unwrap(), ), }) - .unwrap() - .into(), + .unwrap(), ), Err(err) => Some( serde_json::to_vec(&OperationMessage { @@ -220,8 +218,7 @@ impl ConnectionTransport for WebSocketTransport { id: Some(id.to_string()), payload: Some(serde_json::to_value(GQLError(&err)).unwrap()), }) - .unwrap() - .into(), + .unwrap(), ), } } else {