This commit is contained in:
sunli 2020-03-29 20:02:52 +08:00
parent 643213206b
commit 9cdd0c45a3
19 changed files with 706 additions and 549 deletions

View File

@ -1,6 +1,6 @@
[package]
name = "async-graphql"
version = "1.6.3"
version = "1.6.4"
authors = ["sunli <scott_s829@163.com>"]
edition = "2018"
description = "The GraphQL server library implemented by rust"
@ -15,10 +15,10 @@ readme = "README.md"
[features]
default = ["bson", "uuid", "url", "chrono-tz", "validators"]
validators = ["regex", "once_cell"]
validators = ["regex"]
[dependencies]
async-graphql-derive = { path = "async-graphql-derive", version = "1.6.3" }
async-graphql-derive = { path = "async-graphql-derive", version = "1.6.4" }
graphql-parser = "0.2.3"
anyhow = "1.0.26"
thiserror = "1.0.11"
@ -34,7 +34,8 @@ byteorder = "1.3.4"
futures = "0.3.0"
parking_lot = "0.10.0"
chrono = "0.4.10"
once_cell = { version = "1.3.1", optional = true }
slab = "0.4.2"
once_cell = "1.3.1"
regex = { version = "1.3.5", optional = true }
bson = { version = "0.14.1", optional = true }
uuid = { version = "0.8.1", optional = true }
@ -45,7 +46,6 @@ chrono-tz = { version = "0.5.1", optional = true }
async-std = { version = "1.5.0", features = ["attributes"] }
actix-web = "2.0.0"
actix-rt = "1.0.0"
slab = "0.4.2"
tide = "0.6.0"
mime = "0.3.16"

View File

@ -1,6 +1,6 @@
[package]
name = "async-graphql-actix-web"
version = "0.6.6"
version = "0.6.7"
authors = ["sunli <scott_s829@163.com>"]
edition = "2018"
description = "async-graphql for actix-web"
@ -13,7 +13,7 @@ keywords = ["futures", "async", "graphql"]
categories = ["network-programming", "asynchronous"]
[dependencies]
async-graphql = { path = "..", version = "1.6.3" }
async-graphql = { path = "..", version = "1.6.4" }
actix-web = "2.0.0"
actix-multipart = "0.2.0"
actix-web-actors = "2.0.0"

View File

@ -1,6 +1,5 @@
use actix_web::{web, App, HttpServer};
use async_graphql::{Context, Result, Schema, ID};
use async_graphql_actix_web::publish_message;
use async_graphql::{publish, Context, Result, Schema, ID};
use futures::lock::Mutex;
use slab::Slab;
use std::sync::Arc;
@ -58,10 +57,11 @@ impl MutationRoot {
author,
};
entry.insert(book);
publish_message(BookChanged {
publish(BookChanged {
mutation_type: MutationType::Created,
id: id.clone(),
});
})
.await;
id
}
@ -71,10 +71,11 @@ impl MutationRoot {
let id = id.parse::<usize>()?;
if books.contains(id) {
books.remove(id);
publish_message(BookChanged {
publish(BookChanged {
mutation_type: MutationType::Deleted,
id: id.into(),
});
})
.await;
Ok(true)
} else {
Ok(false)
@ -122,8 +123,9 @@ impl SubscriptionRoot {
#[actix_rt::main]
async fn main() -> std::io::Result<()> {
HttpServer::new(move || {
let schema =
Schema::new(QueryRoot, MutationRoot, SubscriptionRoot).data(Storage::default());
let schema = Schema::build(QueryRoot, MutationRoot, SubscriptionRoot)
.data(Storage::default())
.finish();
let handler = async_graphql_actix_web::HandlerBuilder::new(schema)
.enable_ui("http://localhost:8000", Some("ws://localhost:8000"))
.enable_subscription()

View File

@ -2,12 +2,6 @@
#![warn(missing_docs)]
#[macro_use]
extern crate serde_derive;
#[macro_use]
extern crate actix_derive;
mod pubsub;
mod session;
use crate::session::WsSession;
@ -24,9 +18,8 @@ use mime::Mime;
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
pub use pubsub::publish_message;
// pub use pubsub::publish_message;
/// Actix-web handler builder
pub struct HandlerBuilder<Query, Mutation, Subscription> {
@ -99,9 +92,9 @@ where
HttpRequest,
Payload,
) -> Pin<Box<dyn Future<Output = actix_web::Result<HttpResponse>>>>
+ 'static
+ Clone {
let schema = Arc::new(self.schema);
+ Clone
+ 'static {
let schema = self.schema.clone();
let max_file_size = self.max_file_size;
let max_file_count = self.max_file_count;
let enable_ui = self.enable_ui;
@ -156,9 +149,9 @@ async fn handle_request<Query, Mutation, Subscription>(
mut payload: Payload,
) -> actix_web::Result<HttpResponse>
where
Query: ObjectType + Send + Sync,
Mutation: ObjectType + Send + Sync,
Subscription: SubscriptionType + Send + Sync,
Query: ObjectType + Send + Sync + 'static,
Mutation: ObjectType + Send + Sync + 'static,
Subscription: SubscriptionType + Send + Sync + 'static,
{
if let Ok(ct) = get_content_type(req.headers()) {
if ct.essence_str() == mime::MULTIPART_FORM_DATA {

View File

@ -1,86 +0,0 @@
use actix::{Actor, Context, Handler, Recipient, Supervised, SystemService};
use async_graphql::Result;
use slab::Slab;
use std::any::Any;
use std::sync::Arc;
#[derive(Message)]
#[rtype(result = "std::result::Result<(), ()>")]
pub struct PushMessage(pub Arc<dyn Any + Sync + Send>);
#[derive(Message)]
#[rtype(result = "usize")]
struct NewClient {
recipient: Recipient<PushMessage>,
}
#[derive(Message)]
#[rtype(result = "()")]
struct RemoveClient {
id: usize,
}
#[derive(Message)]
#[rtype(result = "()")]
struct PubMessage(Arc<dyn Any + Sync + Send>);
struct ClientInfo {
recipient: Recipient<PushMessage>,
}
#[derive(Default)]
struct PubSubService {
clients: Slab<ClientInfo>,
}
impl Actor for PubSubService {
type Context = Context<Self>;
}
impl Handler<NewClient> for PubSubService {
type Result = usize;
fn handle(&mut self, msg: NewClient, _ctx: &mut Context<Self>) -> Self::Result {
self.clients.insert(ClientInfo {
recipient: msg.recipient,
})
}
}
impl Handler<RemoveClient> for PubSubService {
type Result = ();
fn handle(&mut self, msg: RemoveClient, _ctx: &mut Context<Self>) -> Self::Result {
self.clients.remove(msg.id);
}
}
impl Handler<PubMessage> for PubSubService {
type Result = ();
fn handle(&mut self, msg: PubMessage, _ctx: &mut Context<Self>) -> Self::Result {
for (_, client) in &self.clients {
client.recipient.do_send(PushMessage(msg.0.clone())).ok();
}
}
}
impl Supervised for PubSubService {}
impl SystemService for PubSubService {}
pub async fn new_client(recipient: Recipient<PushMessage>) -> Result<usize> {
let id = PubSubService::from_registry()
.send(NewClient { recipient })
.await?;
Ok(id)
}
pub fn remove_client(id: usize) {
PubSubService::from_registry().do_send(RemoveClient { id });
}
/// Publish a message that will be pushed to all subscribed clients.
pub fn publish_message<T: Any + Send + Sync + Sized>(msg: T) {
PubSubService::from_registry().do_send(PubMessage(Arc::new(msg)));
}

View File

@ -1,28 +1,17 @@
use crate::pubsub::{new_client, remove_client, PushMessage};
use actix::{
Actor, ActorContext, ActorFuture, AsyncContext, ContextFutureSpawner, Handler,
ResponseActFuture, Running, StreamHandler, WrapFuture,
Actor, ActorContext, ActorFuture, AsyncContext, ContextFutureSpawner, StreamHandler, WrapFuture,
};
use actix_web_actors::ws::{Message, ProtocolError, WebsocketContext};
use async_graphql::http::{GQLError, GQLRequest, GQLResponse};
use async_graphql::{ObjectType, QueryResult, Schema, Subscribe, SubscriptionType, Variables};
use std::collections::HashMap;
use std::sync::Arc;
use async_graphql::{ObjectType, Schema, SubscriptionType, WebSocketTransport};
use bytes::Bytes;
use futures::channel::mpsc;
use futures::SinkExt;
use std::time::{Duration, Instant};
#[derive(Serialize, Deserialize)]
struct OperationMessage {
#[serde(rename = "type")]
ty: String,
id: Option<String>,
payload: Option<serde_json::Value>,
}
pub struct WsSession<Query, Mutation, Subscription> {
schema: Arc<Schema<Query, Mutation, Subscription>>,
schema: Schema<Query, Mutation, Subscription>,
hb: Instant,
client_id: usize,
subscribes: HashMap<String, Arc<Subscribe>>,
sink: Option<mpsc::Sender<Bytes>>,
}
impl<Query, Mutation, Subscription> WsSession<Query, Mutation, Subscription>
@ -31,12 +20,11 @@ where
Mutation: ObjectType + Send + Sync + 'static,
Subscription: SubscriptionType + Send + Sync + 'static,
{
pub fn new(schema: Arc<Schema<Query, Mutation, Subscription>>) -> Self {
pub fn new(schema: Schema<Query, Mutation, Subscription>) -> Self {
Self {
schema,
hb: Instant::now(),
client_id: 0,
subscribes: Default::default(),
sink: None,
}
}
@ -59,19 +47,20 @@ where
fn started(&mut self, ctx: &mut Self::Context) {
self.hb(ctx);
new_client(ctx.address().recipient())
.into_actor(self)
.then(|client_id, actor, _| {
actor.client_id = client_id.unwrap();
async {}.into_actor(actor)
})
.wait(ctx);
}
fn stopping(&mut self, _ctx: &mut Self::Context) -> Running {
remove_client(self.client_id);
Running::Stop
let schema = self.schema.clone();
async move {
schema
.clone()
.subscription_connection(WebSocketTransport::default())
.await
}
.into_actor(self)
.then(|(sink, stream), actor, ctx| {
actor.sink = Some(sink);
ctx.add_stream(stream);
async {}.into_actor(actor)
})
.wait(ctx);
}
}
@ -100,67 +89,11 @@ where
self.hb = Instant::now();
}
Message::Text(s) => {
if let Ok(msg) = serde_json::from_str::<OperationMessage>(&s) {
match msg.ty.as_str() {
"connection_init" => {
ctx.text(
serde_json::to_string(&OperationMessage {
ty: "connection_ack".to_string(),
id: None,
payload: None,
})
.unwrap(),
);
}
"start" => {
if let (Some(id), Some(payload)) = (msg.id, msg.payload) {
if let Ok(request) = serde_json::from_value::<GQLRequest>(payload) {
let builder = self.schema.subscribe(&request.query);
let builder = if let Some(variables) = request.variables {
match Variables::parse_from_json(variables) {
Ok(variables) => builder.variables(variables),
Err(_) => builder,
}
} else {
builder
};
let builder =
if let Some(operation_name) = &request.operation_name {
builder.operator_name(&operation_name)
} else {
builder
};
let subscribe = match builder.execute() {
Ok(subscribe) => subscribe,
Err(err) => {
ctx.text(
serde_json::to_string(&OperationMessage {
ty: "error".to_string(),
id: Some(id),
payload: Some(
serde_json::to_value(GQLError(&err))
.unwrap(),
),
})
.unwrap(),
);
return;
}
};
self.subscribes.insert(id, Arc::new(subscribe));
}
}
}
"stop" => {
if let Some(id) = msg.id {
self.subscribes.remove(&id);
}
}
"connection_terminate" => {
ctx.stop();
}
_ => {}
}
if let Some(mut sink) = self.sink.clone() {
async move { sink.send(s.into()).await }
.into_actor(self)
.then(|_, actor, _| async {}.into_actor(actor))
.wait(ctx);
}
}
Message::Binary(_) | Message::Close(_) | Message::Continuation(_) => {
@ -171,52 +104,14 @@ where
}
}
impl<Query, Mutation, Subscription> Handler<PushMessage>
impl<Query, Mutation, Subscription> StreamHandler<Bytes>
for WsSession<Query, Mutation, Subscription>
where
Query: ObjectType + Send + Sync + 'static,
Mutation: ObjectType + Send + Sync + 'static,
Subscription: SubscriptionType + Send + Sync + 'static,
{
type Result = ResponseActFuture<Self, std::result::Result<(), ()>>;
fn handle(&mut self, msg: PushMessage, _ctx: &mut Self::Context) -> Self::Result {
let subscribes = self.subscribes.clone();
let schema = self.schema.clone();
Box::new(
async move {
let mut push_msgs = Vec::new();
for (id, subscribe) in subscribes {
let res = match subscribe.resolve(&schema, msg.0.as_ref()).await {
Ok(Some(value)) => Some(Ok(value)),
Ok(None) => None,
Err(err) => Some(Err(err)),
};
if let Some(res) = res {
let push_msg = serde_json::to_string(&OperationMessage {
ty: "data".to_string(),
id: Some(id.clone()),
payload: Some(
serde_json::to_value(GQLResponse(res.map(|data| QueryResult {
data,
extensions: None,
})))
.unwrap(),
),
})
.unwrap();
push_msgs.push(push_msg);
}
}
push_msgs
}
.into_actor(self)
.map(|msgs, _, ctx| {
for msg in msgs {
ctx.text(msg);
}
Ok(())
}),
)
fn handle(&mut self, data: Bytes, ctx: &mut Self::Context) {
ctx.text(unsafe { std::str::from_utf8_unchecked(&data) });
}
}

View File

@ -1,6 +1,6 @@
[package]
name = "async-graphql-derive"
version = "1.6.3"
version = "1.6.4"
authors = ["sunli <scott_s829@163.com>"]
edition = "2018"
description = "Macros for async-graphql"

View File

@ -27,9 +27,10 @@ async fn main() -> std::io::Result<()> {
HttpServer::new(move || {
App::new()
.data(
Schema::new(starwars::QueryRoot, EmptyMutation, EmptySubscription)
Schema::build(starwars::QueryRoot, EmptyMutation, EmptySubscription)
.data(starwars::StarWars::new())
.extension(|| async_graphql::extensions::ApolloTracing::default()),
.extension(|| async_graphql::extensions::ApolloTracing::default())
.finish(),
)
.service(web::resource("/").guard(guard::Post()).to(index))
.service(web::resource("/").guard(guard::Get()).to(gql_playgound))

View File

@ -28,8 +28,9 @@ async fn gql_graphiql(_request: Request<StarWarsSchema>) -> Response {
#[async_std::main]
async fn main() -> std::io::Result<()> {
let mut app = tide::with_state(
Schema::new(starwars::QueryRoot, EmptyMutation, EmptySubscription)
.data(starwars::StarWars::new()),
Schema::build(starwars::QueryRoot, EmptyMutation, EmptySubscription)
.data(starwars::StarWars::new())
.finish(),
);
app.at("/").post(index);
app.at("/").get(gql_playground);

View File

@ -35,9 +35,9 @@ impl GQLRequest {
schema: &Schema<Query, Mutation, Subscription>,
) -> GQLResponse
where
Query: ObjectType + Send + Sync,
Mutation: ObjectType + Send + Sync,
Subscription: SubscriptionType + Send + Sync,
Query: ObjectType + Send + Sync + 'static,
Mutation: ObjectType + Send + Sync + 'static,
Subscription: SubscriptionType + Send + Sync + 'static,
{
match self.prepare(schema) {
Ok(query) => GQLResponse(query.execute().await),
@ -51,9 +51,9 @@ impl GQLRequest {
schema: &'a Schema<Query, Mutation, Subscription>,
) -> Result<PreparedQuery<'a, Query, Mutation>>
where
Query: ObjectType + Send + Sync,
Mutation: ObjectType + Send + Sync,
Subscription: SubscriptionType + Send + Sync,
Query: ObjectType + Send + Sync + 'static,
Mutation: ObjectType + Send + Sync + 'static,
Subscription: SubscriptionType + Send + Sync + 'static,
{
let vars = match self.variables.take() {
Some(value) => match Variables::parse_from_json(value) {

View File

@ -104,8 +104,11 @@ pub use graphql_parser::query::Value;
pub use query::{PreparedQuery, QueryBuilder, QueryResult};
pub use registry::CacheControl;
pub use scalars::ID;
pub use schema::Schema;
pub use subscription::SubscribeBuilder;
pub use schema::{publish, Schema};
pub use subscription::{
SubscriptionStream, SubscriptionStub, SubscriptionStubs, SubscriptionTransport,
WebSocketTransport,
};
pub use types::{
Connection, DataSource, EmptyEdgeFields, EmptyMutation, EmptySubscription, QueryOperation,
Upload,
@ -129,7 +132,7 @@ pub use context::ContextBase;
#[doc(hidden)]
pub use resolver::{collect_fields, do_resolve};
#[doc(hidden)]
pub use subscription::{Subscribe, SubscriptionType};
pub use subscription::SubscriptionType;
#[doc(hidden)]
pub use types::{EnumItem, EnumType};
@ -481,7 +484,7 @@ pub use async_graphql_derive::InputObject;
///
/// #[async_std::main]
/// async fn main() {
/// let schema = Schema::new(QueryRoot, EmptyMutation, EmptySubscription).data("hello".to_string());
/// let schema = Schema::build(QueryRoot, EmptyMutation, EmptySubscription).data("hello".to_string()).finish();
/// let res = schema.query(r#"
/// {
/// typeA {

View File

@ -58,16 +58,16 @@ impl<'a, Query, Mutation, Subscription> QueryBuilder<'a, Query, Mutation, Subscr
cache_control,
complexity,
depth,
} = check_rules(&self.schema.registry, &document)?;
} = check_rules(&self.schema.0.registry, &document)?;
self.extensions.iter().for_each(|e| e.validation_end());
if let Some(limit_complexity) = self.schema.complexity {
if let Some(limit_complexity) = self.schema.0.complexity {
if complexity > limit_complexity {
return Err(QueryError::TooComplex.into());
}
}
if let Some(limit_depth) = self.schema.depth {
if let Some(limit_depth) = self.schema.0.depth {
if depth > limit_depth {
return Err(QueryError::TooDeep.into());
}
@ -83,14 +83,14 @@ impl<'a, Query, Mutation, Subscription> QueryBuilder<'a, Query, Mutation, Subscr
Definition::Operation(operation_definition) => match operation_definition {
OperationDefinition::SelectionSet(s) => {
selection_set = Some(s);
root = Some(Root::Query(&self.schema.query));
root = Some(Root::Query(&self.schema.0.query));
}
OperationDefinition::Query(query)
if query.name.is_none() || query.name.as_deref() == self.operation_name =>
{
selection_set = Some(query.selection_set);
variable_definitions = Some(query.variable_definitions);
root = Some(Root::Query(&self.schema.query));
root = Some(Root::Query(&self.schema.0.query));
}
OperationDefinition::Mutation(mutation)
if mutation.name.is_none()
@ -98,7 +98,7 @@ impl<'a, Query, Mutation, Subscription> QueryBuilder<'a, Query, Mutation, Subscr
{
selection_set = Some(mutation.selection_set);
variable_definitions = Some(mutation.variable_definitions);
root = Some(Root::Mutation(&self.schema.mutation));
root = Some(Root::Mutation(&self.schema.0.mutation));
}
OperationDefinition::Subscription(subscription)
if subscription.name.is_none()
@ -116,7 +116,7 @@ impl<'a, Query, Mutation, Subscription> QueryBuilder<'a, Query, Mutation, Subscr
Ok(PreparedQuery {
extensions: self.extensions,
registry: &self.schema.registry,
registry: &self.schema.0.registry,
variables: self.variables.unwrap_or_default(),
data: self.data,
fragments,

View File

@ -3,13 +3,33 @@ use crate::extensions::{BoxExtension, Extension};
use crate::model::__DirectiveLocation;
use crate::query::QueryBuilder;
use crate::registry::{Directive, InputValue, Registry};
use crate::subscription::{create_connection, SubscriptionStub, SubscriptionTransport};
use crate::types::QueryRoot;
use crate::{ObjectType, SubscribeBuilder, SubscriptionType, Type};
use std::any::Any;
use crate::validation::check_rules;
use crate::{
ContextSelectionSet, ObjectType, QueryError, QueryParseError, Result, SubscriptionStream,
SubscriptionType, Type, Variables,
};
use bytes::Bytes;
use futures::channel::mpsc;
use futures::lock::Mutex;
use futures::SinkExt;
use graphql_parser::parse_query;
use graphql_parser::query::{
Definition, Field, FragmentDefinition, OperationDefinition, Selection,
};
use once_cell::sync::Lazy;
use slab::Slab;
use std::any::{Any, TypeId};
use std::collections::HashMap;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
/// GraphQL schema
pub struct Schema<Query, Mutation, Subscription> {
type MsgSender = mpsc::Sender<Arc<dyn Any + Sync + Send>>;
pub(crate) static SUBSCRIPTION_SENDERS: Lazy<Mutex<Slab<MsgSender>>> = Lazy::new(Default::default);
pub(crate) struct SchemaInner<Query, Mutation, Subscription> {
pub(crate) query: QueryRoot<Query>,
pub(crate) mutation: Mutation,
pub(crate) subscription: Subscription,
@ -20,15 +40,80 @@ pub struct Schema<Query, Mutation, Subscription> {
pub(crate) extensions: Vec<Box<dyn Fn() -> BoxExtension + Send + Sync>>,
}
/// Schema builder
pub struct SchemaBuilder<Query, Mutation, Subscription>(SchemaInner<Query, Mutation, Subscription>);
impl<Query: ObjectType, Mutation: ObjectType, Subscription: SubscriptionType>
Schema<Query, Mutation, Subscription>
SchemaBuilder<Query, Mutation, Subscription>
{
/// Create a schema
/// Disable introspection query
pub fn disable_introspection(mut self) -> Self {
self.0.query.disable_introspection = true;
self
}
/// Set limit complexity, Default no limit.
pub fn limit_complexity(mut self, complexity: usize) -> Self {
self.0.complexity = Some(complexity);
self
}
/// Set limit complexity, Default no limit.
pub fn limit_depth(mut self, depth: usize) -> Self {
self.0.depth = Some(depth);
self
}
/// Add an extension
pub fn extension<F: Fn() -> E + Send + Sync + 'static, E: Extension>(
mut self,
extension_factory: F,
) -> Self {
self.0
.extensions
.push(Box::new(move || Box::new(extension_factory())));
self
}
/// Add a global data that can be accessed in the `Context`.
pub fn data<D: Any + Send + Sync>(mut self, data: D) -> Self {
self.0.data.insert(data);
self
}
/// Build schema.
pub fn finish(self) -> Schema<Query, Mutation, Subscription> {
Schema(Arc::new(self.0))
}
}
/// GraphQL schema
pub struct Schema<Query, Mutation, Subscription>(
pub(crate) Arc<SchemaInner<Query, Mutation, Subscription>>,
);
impl<Query, Mutation, Subscription> Clone for Schema<Query, Mutation, Subscription> {
fn clone(&self) -> Self {
Schema(self.0.clone())
}
}
impl<Query, Mutation, Subscription> Schema<Query, Mutation, Subscription>
where
Query: ObjectType + Send + Sync + 'static,
Mutation: ObjectType + Send + Sync + 'static,
Subscription: SubscriptionType + Send + Sync + 'static,
{
/// Create a schema builder
///
/// The root object for the query and Mutation needs to be specified.
/// If there is no mutation, you can use `EmptyMutation`.
/// If there is no subscription, you can use `EmptySubscription`.
pub fn new(query: Query, mutation: Mutation, subscription: Subscription) -> Self {
pub fn build(
query: Query,
mutation: Mutation,
subscription: Subscription,
) -> SchemaBuilder<Query, Mutation, Subscription> {
let mut registry = Registry {
types: Default::default(),
directives: Default::default(),
@ -102,7 +187,7 @@ impl<Query: ObjectType, Mutation: ObjectType, Subscription: SubscriptionType>
Subscription::create_type_info(&mut registry);
}
Self {
SchemaBuilder(SchemaInner {
query: QueryRoot {
inner: query,
disable_introspection: false,
@ -114,64 +199,161 @@ impl<Query: ObjectType, Mutation: ObjectType, Subscription: SubscriptionType>
complexity: None,
depth: None,
extensions: Default::default(),
}
})
}
/// Disable introspection query
pub fn disable_introspection(mut self) -> Self {
self.query.disable_introspection = true;
self
}
/// Set limit complexity, Default no limit.
pub fn limit_complexity(mut self, complexity: usize) -> Self {
self.complexity = Some(complexity);
self
}
/// Set limit complexity, Default no limit.
pub fn limit_depth(mut self, depth: usize) -> Self {
self.depth = Some(depth);
self
}
/// Add an extension
pub fn extension<F: Fn() -> E + Send + Sync + 'static, E: Extension>(
mut self,
extension_factory: F,
) -> Self {
self.extensions
.push(Box::new(move || Box::new(extension_factory())));
self
}
/// Add a global data that can be accessed in the `Context`.
pub fn data<D: Any + Send + Sync>(mut self, data: D) -> Self {
self.data.insert(data);
self
/// Create a schema
pub fn new(
query: Query,
mutation: Mutation,
subscription: Subscription,
) -> Schema<Query, Mutation, Subscription> {
Self::build(query, mutation, subscription).finish()
}
/// Start a query and return `QueryBuilder`.
pub fn query<'a>(&'a self, source: &'a str) -> QueryBuilder<'a, Query, Mutation, Subscription> {
QueryBuilder {
extensions: self.extensions.iter().map(|factory| factory()).collect(),
extensions: self.0.extensions.iter().map(|factory| factory()).collect(),
schema: self,
source,
operation_name: None,
variables: None,
data: &self.data,
data: &self.0.data,
}
}
/// Start a subscribe and return `SubscribeBuilder`.
pub fn subscribe<'a>(&'a self, source: &'a str) -> SubscribeBuilder<'a, Subscription> {
SubscribeBuilder {
extensions: Default::default(),
subscription: &self.subscription,
registry: &self.registry,
source,
operation_name: None,
variables: None,
/// Create subscription stub, typically called inside the `SubscriptionTransport::handle_request` method/
pub fn create_subscription_stub(
&self,
source: &str,
operation_name: Option<&str>,
variables: Variables,
) -> Result<SubscriptionStub<Query, Mutation, Subscription>>
where
Self: Sized,
{
let document = parse_query(source).map_err(|err| QueryParseError(err.to_string()))?;
check_rules(&self.0.registry, &document)?;
let mut fragments = HashMap::new();
let mut subscription = None;
for definition in document.definitions {
match definition {
Definition::Operation(OperationDefinition::Subscription(s)) => {
if s.name.as_deref() == operation_name {
subscription = Some(s);
break;
}
}
Definition::Fragment(fragment) => {
fragments.insert(fragment.name.clone(), fragment);
}
_ => {}
}
}
let subscription = subscription.ok_or(if let Some(name) = operation_name {
QueryError::UnknownOperationNamed {
name: name.to_string(),
}
} else {
QueryError::MissingOperation
})?;
let mut types = HashMap::new();
let resolve_id = AtomicUsize::default();
let ctx = ContextSelectionSet {
path_node: None,
extensions: &[],
item: &subscription.selection_set,
resolve_id: &resolve_id,
variables: &variables,
variable_definitions: Some(&subscription.variable_definitions),
registry: &self.0.registry,
data: &Default::default(),
fragments: &fragments,
};
create_subscription_types::<Subscription>(&ctx, &fragments, &mut types)?;
Ok(SubscriptionStub {
schema: self.clone(),
types,
variables,
variable_definitions: subscription.variable_definitions,
fragments,
})
}
/// Create subscription connection, returns `Sink` and `Stream`.
pub async fn subscription_connection<T: SubscriptionTransport>(
&self,
transport: T,
) -> (
mpsc::Sender<Bytes>,
SubscriptionStream<Query, Mutation, Subscription, T>,
) {
create_connection(self, transport).await
}
}
fn create_subscription_types<T: SubscriptionType>(
ctx: &ContextSelectionSet<'_>,
fragments: &HashMap<String, FragmentDefinition>,
types: &mut HashMap<TypeId, Field>,
) -> Result<()> {
for selection in &ctx.items {
match selection {
Selection::Field(field) => {
if ctx.is_skip(&field.directives)? {
continue;
}
T::create_type(field, types)?;
}
Selection::FragmentSpread(fragment_spread) => {
if ctx.is_skip(&fragment_spread.directives)? {
continue;
}
if let Some(fragment) = fragments.get(&fragment_spread.fragment_name) {
create_subscription_types::<T>(
&ctx.with_selection_set(&fragment.selection_set),
fragments,
types,
)?;
} else {
return Err(QueryError::UnknownFragment {
name: fragment_spread.fragment_name.clone(),
}
.into());
}
}
Selection::InlineFragment(inline_fragment) => {
if ctx.is_skip(&inline_fragment.directives)? {
continue;
}
create_subscription_types::<T>(
&ctx.with_selection_set(&inline_fragment.selection_set),
fragments,
types,
)?;
}
}
}
Ok(())
}
/// Publish a message that will be pushed to all subscribed clients.
pub async fn publish<T: Any + Send + Sync + Sized>(msg: T) {
let mut senders = SUBSCRIPTION_SENDERS.lock().await;
let msg = Arc::new(msg);
let mut remove = Vec::new();
for (id, sender) in senders.iter_mut() {
if sender.send(msg.clone()).await.is_err() {
remove.push(id);
}
}
for id in remove {
senders.remove(id);
}
}

View File

@ -1,224 +0,0 @@
use crate::extensions::BoxExtension;
use crate::registry::Registry;
use crate::validation::check_rules;
use crate::{
ContextBase, ContextSelectionSet, QueryError, QueryParseError, Result, Schema, Type, Variables,
};
use graphql_parser::parse_query;
use graphql_parser::query::{
Definition, Field, FragmentDefinition, OperationDefinition, Selection, SelectionSet,
VariableDefinition,
};
use std::any::{Any, TypeId};
use std::collections::HashMap;
use std::sync::atomic::AtomicUsize;
/// Subscribe stub
///
/// When a new push message is generated, a JSON object that needs to be pushed can be obtained by
/// `Subscribe::resolve`, and if None is returned, the Subscribe is not subscribed to a message of this type.
pub struct Subscribe {
types: HashMap<TypeId, Field>,
variables: Variables,
variable_definitions: Vec<VariableDefinition>,
fragments: HashMap<String, FragmentDefinition>,
}
impl Subscribe {
#[allow(missing_docs)]
pub async fn resolve<Query, Mutation, Subscription>(
&self,
schema: &Schema<Query, Mutation, Subscription>,
msg: &(dyn Any + Send + Sync),
) -> Result<Option<serde_json::Value>>
where
Subscription: SubscriptionType + Sync + Send + 'static,
{
let resolve_id = AtomicUsize::default();
let ctx = ContextBase::<()> {
path_node: None,
extensions: &[],
item: (),
resolve_id: &resolve_id,
variables: &self.variables,
variable_definitions: Some(&self.variable_definitions),
registry: &schema.registry,
data: &schema.data,
fragments: &self.fragments,
};
schema.subscription.resolve(&ctx, &self.types, msg).await
}
}
/// Represents a GraphQL subscription object
#[allow(missing_docs)]
#[async_trait::async_trait]
pub trait SubscriptionType: Type {
/// This function returns true of type `EmptySubscription` only
#[doc(hidden)]
fn is_empty() -> bool {
false
}
fn create_type(field: &Field, types: &mut HashMap<TypeId, Field>) -> Result<()>;
fn create_subscribe(
&self,
extensions: &[BoxExtension],
registry: &Registry,
selection_set: SelectionSet,
variables: Variables,
variable_definitions: Vec<VariableDefinition>,
fragments: HashMap<String, FragmentDefinition>,
) -> Result<Subscribe>
where
Self: Sized,
{
let mut types = HashMap::new();
let resolve_id = AtomicUsize::default();
let ctx = ContextSelectionSet {
path_node: None,
extensions,
item: &selection_set,
resolve_id: &resolve_id,
variables: &variables,
variable_definitions: Some(&variable_definitions),
registry,
data: &Default::default(),
fragments: &fragments,
};
create_types::<Self>(&ctx, &fragments, &mut types)?;
Ok(Subscribe {
types,
variables,
variable_definitions,
fragments,
})
}
/// Resolve a subscription message, If no message of this type is subscribed, None is returned.
async fn resolve(
&self,
ctx: &ContextBase<'_, ()>,
types: &HashMap<TypeId, Field>,
msg: &(dyn Any + Send + Sync),
) -> Result<Option<serde_json::Value>>;
}
fn create_types<T: SubscriptionType>(
ctx: &ContextSelectionSet<'_>,
fragments: &HashMap<String, FragmentDefinition>,
types: &mut HashMap<TypeId, Field>,
) -> Result<()> {
for selection in &ctx.items {
match selection {
Selection::Field(field) => {
if ctx.is_skip(&field.directives)? {
continue;
}
T::create_type(field, types)?;
}
Selection::FragmentSpread(fragment_spread) => {
if ctx.is_skip(&fragment_spread.directives)? {
continue;
}
if let Some(fragment) = fragments.get(&fragment_spread.fragment_name) {
create_types::<T>(
&ctx.with_selection_set(&fragment.selection_set),
fragments,
types,
)?;
} else {
return Err(QueryError::UnknownFragment {
name: fragment_spread.fragment_name.clone(),
}
.into());
}
}
Selection::InlineFragment(inline_fragment) => {
if ctx.is_skip(&inline_fragment.directives)? {
continue;
}
create_types::<T>(
&ctx.with_selection_set(&inline_fragment.selection_set),
fragments,
types,
)?;
}
}
}
Ok(())
}
/// Subscribe builder
pub struct SubscribeBuilder<'a, Subscription> {
pub(crate) subscription: &'a Subscription,
pub(crate) extensions: &'a [BoxExtension],
pub(crate) registry: &'a Registry,
pub(crate) source: &'a str,
pub(crate) operation_name: Option<&'a str>,
pub(crate) variables: Option<Variables>,
}
impl<'a, Subscription> SubscribeBuilder<'a, Subscription>
where
Subscription: SubscriptionType,
{
/// Specify the operation name.
pub fn operator_name(self, name: &'a str) -> Self {
SubscribeBuilder {
operation_name: Some(name),
..self
}
}
/// Specify the variables.
pub fn variables(self, vars: Variables) -> Self {
SubscribeBuilder {
variables: Some(vars),
..self
}
}
/// Perform a subscription operation and return `Subscribe`.
pub fn execute(self) -> Result<Subscribe> {
let document = parse_query(self.source).map_err(|err| QueryParseError(err.to_string()))?;
check_rules(self.registry, &document)?;
let mut fragments = HashMap::new();
let mut subscription = None;
for definition in document.definitions {
match definition {
Definition::Operation(OperationDefinition::Subscription(s)) => {
if s.name.as_deref() == self.operation_name {
subscription = Some(s);
break;
}
}
Definition::Fragment(fragment) => {
fragments.insert(fragment.name.clone(), fragment);
}
_ => {}
}
}
let subscription = subscription.ok_or(if let Some(name) = self.operation_name {
QueryError::UnknownOperationNamed {
name: name.to_string(),
}
} else {
QueryError::MissingOperation
})?;
self.subscription.create_subscribe(
self.extensions,
self.registry,
subscription.selection_set,
self.variables.unwrap_or_default(),
subscription.variable_definitions,
fragments,
)
}
}

View File

@ -0,0 +1,180 @@
use crate::schema::SUBSCRIPTION_SENDERS;
use crate::subscription::SubscriptionStub;
use crate::{ObjectType, Result, Schema, SubscriptionType};
use bytes::Bytes;
use futures::channel::mpsc;
use futures::task::{Context, Poll};
use futures::{Future, FutureExt, Stream};
use slab::Slab;
use std::any::Any;
use std::collections::VecDeque;
use std::pin::Pin;
use std::sync::Arc;
/// Subscription stubs, use to hold all subscription information for the `SubscriptionConnection`
pub struct SubscriptionStubs<Query, Mutation, Subscription>(
Slab<SubscriptionStub<Query, Mutation, Subscription>>,
);
impl<Query, Mutation, Subscription> Default for SubscriptionStubs<Query, Mutation, Subscription> {
fn default() -> Self {
Self(Slab::new())
}
}
#[allow(missing_docs)]
impl<Query, Mutation, Subscription> SubscriptionStubs<Query, Mutation, Subscription> {
pub fn add(&mut self, stub: SubscriptionStub<Query, Mutation, Subscription>) -> usize {
self.0.insert(stub)
}
pub fn remove(&mut self, id: usize) {
self.0.remove(id);
}
}
/// Subscription transport
///
/// You can customize your transport by implementing this trait.
pub trait SubscriptionTransport: Send + Sync + Unpin + 'static {
/// Parse the request data here.
/// If you have a new request, create a `SubscriptionStub` with the `Schema::create_subscription_stub`, and then call `SubscriptionStubs::add`.
/// You can return a `Byte`, which will be sent to the client. If it returns an error, the connection will be broken.
fn handle_request<Query, Mutation, Subscription>(
&mut self,
schema: &Schema<Query, Mutation, Subscription>,
stubs: &mut SubscriptionStubs<Query, Mutation, Subscription>,
data: Bytes,
) -> Result<Option<Bytes>>
where
Query: ObjectType + Sync + Send + 'static,
Mutation: ObjectType + Sync + Send + 'static,
Subscription: SubscriptionType + Sync + Send + 'static;
/// When a response message is generated, you can convert the message to the format you want here.
fn handle_response(&mut self, id: usize, result: Result<serde_json::Value>) -> Option<Bytes>;
}
pub async fn create_connection<Query, Mutation, Subscription, T: SubscriptionTransport>(
schema: &Schema<Query, Mutation, Subscription>,
transport: T,
) -> (
mpsc::Sender<Bytes>,
SubscriptionStream<Query, Mutation, Subscription, T>,
)
where
Query: ObjectType + Sync + Send + 'static,
Mutation: ObjectType + Sync + Send + 'static,
Subscription: SubscriptionType + Sync + Send + 'static,
{
let (tx_bytes, rx_bytes) = mpsc::channel(8);
let (tx_msg, rx_msg) = mpsc::channel(8);
let mut senders = SUBSCRIPTION_SENDERS.lock().await;
senders.insert(tx_msg);
(
tx_bytes.clone(),
SubscriptionStream {
schema: schema.clone(),
transport,
stubs: Default::default(),
rx_bytes,
rx_msg,
send_queue: VecDeque::new(),
resolve_queue: VecDeque::default(),
resolve_fut: None,
},
)
}
#[allow(missing_docs)]
pub struct SubscriptionStream<Query, Mutation, Subscription, T: SubscriptionTransport> {
schema: Schema<Query, Mutation, Subscription>,
transport: T,
stubs: SubscriptionStubs<Query, Mutation, Subscription>,
rx_bytes: mpsc::Receiver<Bytes>,
rx_msg: mpsc::Receiver<Arc<dyn Any + Sync + Send>>,
send_queue: VecDeque<Bytes>,
resolve_queue: VecDeque<Arc<dyn Any + Sync + Send>>,
resolve_fut: Option<Pin<Box<dyn Future<Output = ()>>>>,
}
impl<Query, Mutation, Subscription, T> Stream
for SubscriptionStream<Query, Mutation, Subscription, T>
where
Query: ObjectType + Send + Sync + 'static,
Mutation: ObjectType + Send + Sync + 'static,
Subscription: SubscriptionType + Send + Sync + 'static,
T: SubscriptionTransport,
{
type Item = Bytes;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
// send bytes
if let Some(bytes) = self.send_queue.pop_front() {
println!("{}", String::from_utf8(bytes.to_vec()).unwrap());
return Poll::Ready(Some(bytes));
}
// receive bytes
match Pin::new(&mut self.rx_bytes).poll_next(cx) {
Poll::Ready(Some(data)) => {
let this = &mut *self;
match this
.transport
.handle_request(&this.schema, &mut this.stubs, data)
{
Ok(Some(bytes)) => {
this.send_queue.push_back(bytes);
continue;
}
Ok(None) => {}
Err(_) => return Poll::Ready(None),
}
}
Poll::Ready(None) => return Poll::Ready(None),
Poll::Pending => {}
}
if let Some(resolve_fut) = &mut self.resolve_fut {
match resolve_fut.poll_unpin(cx) {
Poll::Ready(_) => {
self.resolve_fut = None;
}
Poll::Pending => return Poll::Pending,
}
} else if let Some(msg) = self.resolve_queue.pop_front() {
// FIXME: I think this code is safe, but I don't know how to implement it in safe code.
let this = &mut *self;
let stubs = &this.stubs as *const SubscriptionStubs<Query, Mutation, Subscription>;
let transport = &mut this.transport as *mut T;
let send_queue = &mut this.send_queue as *mut VecDeque<Bytes>;
let fut = async move {
unsafe {
for (id, stub) in (*stubs).0.iter() {
if let Some(res) = stub.resolve(msg.as_ref()).await.transpose() {
if let Some(bytes) = (*transport).handle_response(id, res) {
(*send_queue).push_back(bytes);
}
}
}
}
};
self.resolve_fut = Some(Box::pin(fut));
continue;
}
// receive msg
match Pin::new(&mut self.rx_msg).poll_next(cx) {
Poll::Ready(Some(msg)) => {
self.resolve_queue.push_back(msg);
}
Poll::Ready(None) => return Poll::Ready(None),
Poll::Pending => {
// all pending
return Poll::Pending;
}
}
}
}
}

11
src/subscription/mod.rs Normal file
View File

@ -0,0 +1,11 @@
mod connection;
mod subscribe_stub;
mod subscription_type;
mod ws_transport;
pub use connection::{
create_connection, SubscriptionStream, SubscriptionStubs, SubscriptionTransport,
};
pub use subscribe_stub::SubscriptionStub;
pub use subscription_type::SubscriptionType;
pub use ws_transport::WebSocketTransport;

View File

@ -0,0 +1,48 @@
use crate::{ContextBase, ObjectType, Result, Schema, SubscriptionType, Variables};
use graphql_parser::query::{Field, FragmentDefinition, VariableDefinition};
use std::any::{Any, TypeId};
use std::collections::HashMap;
use std::sync::atomic::AtomicUsize;
/// Subscription stub
///
/// When a new push message is generated, a JSON object that needs to be pushed can be obtained by
/// `Subscribe::resolve`, and if None is returned, the Subscribe is not subscribed to a message of this type.
pub struct SubscriptionStub<Query, Mutation, Subscription> {
pub(crate) schema: Schema<Query, Mutation, Subscription>,
pub(crate) types: HashMap<TypeId, Field>,
pub(crate) variables: Variables,
pub(crate) variable_definitions: Vec<VariableDefinition>,
pub(crate) fragments: HashMap<String, FragmentDefinition>,
}
impl<Query, Mutation, Subscription> SubscriptionStub<Query, Mutation, Subscription>
where
Query: ObjectType + Send + Sync + 'static,
Mutation: ObjectType + Send + Sync + 'static,
Subscription: SubscriptionType + Send + Sync + 'static,
{
#[doc(hidden)]
pub async fn resolve(
&self,
msg: &(dyn Any + Send + Sync),
) -> Result<Option<serde_json::Value>> {
let resolve_id = AtomicUsize::default();
let ctx = ContextBase::<()> {
path_node: None,
extensions: &[],
item: (),
resolve_id: &resolve_id,
variables: &self.variables,
variable_definitions: Some(&self.variable_definitions),
registry: &self.schema.0.registry,
data: &self.schema.0.data,
fragments: &self.fragments,
};
self.schema
.0
.subscription
.resolve(&ctx, &self.types, msg)
.await
}
}

View File

@ -0,0 +1,25 @@
use crate::{ContextBase, Result, Type};
use graphql_parser::query::Field;
use std::any::{Any, TypeId};
use std::collections::HashMap;
/// Represents a GraphQL subscription object
#[async_trait::async_trait]
pub trait SubscriptionType: Type {
/// This function returns true of type `EmptySubscription` only
#[doc(hidden)]
fn is_empty() -> bool {
false
}
#[doc(hidden)]
fn create_type(field: &Field, types: &mut HashMap<TypeId, Field>) -> Result<()>;
/// Resolve a subscription message, If no message of this type is subscribed, None is returned.
async fn resolve(
&self,
ctx: &ContextBase<'_, ()>,
types: &HashMap<TypeId, Field>,
msg: &(dyn Any + Send + Sync),
) -> Result<Option<serde_json::Value>>;
}

View File

@ -0,0 +1,126 @@
use crate::http::{GQLError, GQLRequest, GQLResponse};
use crate::{
ObjectType, QueryResult, Result, Schema, SubscriptionStubs, SubscriptionTransport,
SubscriptionType, Variables,
};
use bytes::Bytes;
use std::collections::HashMap;
#[derive(Serialize, Deserialize)]
struct OperationMessage {
#[serde(rename = "type")]
ty: String,
id: Option<String>,
payload: Option<serde_json::Value>,
}
/// WebSocket transport
#[derive(Default)]
pub struct WebSocketTransport {
id_to_sid: HashMap<String, usize>,
sid_to_id: HashMap<usize, String>,
}
impl SubscriptionTransport for WebSocketTransport {
fn handle_request<Query, Mutation, Subscription>(
&mut self,
schema: &Schema<Query, Mutation, Subscription>,
stubs: &mut SubscriptionStubs<Query, Mutation, Subscription>,
data: Bytes,
) -> Result<Option<Bytes>>
where
Query: ObjectType + Sync + Send + 'static,
Mutation: ObjectType + Sync + Send + 'static,
Subscription: SubscriptionType + Sync + Send + 'static,
{
match serde_json::from_slice::<OperationMessage>(&data) {
Ok(msg) => match msg.ty.as_str() {
"connection_init" => Ok(Some(
serde_json::to_vec(&OperationMessage {
ty: "connection_ack".to_string(),
id: None,
payload: None,
})
.unwrap()
.into(),
)),
"start" => {
if let (Some(id), Some(payload)) = (msg.id, msg.payload) {
if let Ok(request) = serde_json::from_value::<GQLRequest>(payload) {
let variables = if let Some(value) = request.variables {
match Variables::parse_from_json(value) {
Ok(variables) => variables,
Err(_) => Default::default(),
}
} else {
Default::default()
};
match schema.create_subscription_stub(
&request.query,
request.operation_name.as_deref(),
variables,
) {
Ok(stub) => {
let stub_id = stubs.add(stub);
self.id_to_sid.insert(id.clone(), stub_id);
self.sid_to_id.insert(stub_id, id);
Ok(None)
}
Err(err) => Ok(Some(
serde_json::to_vec(&OperationMessage {
ty: "error".to_string(),
id: Some(id),
payload: Some(
serde_json::to_value(GQLError(&err)).unwrap(),
),
})
.unwrap()
.into(),
)),
}
} else {
Ok(None)
}
} else {
Ok(None)
}
}
"stop" => {
if let Some(id) = msg.id {
if let Some(id) = self.id_to_sid.remove(&id) {
self.sid_to_id.remove(&id);
stubs.remove(id);
}
}
Ok(None)
}
"connection_terminate" => Err(anyhow::anyhow!("connection_terminate")),
_ => Err(anyhow::anyhow!("unknown op")),
},
Err(err) => Err(err.into()),
}
}
fn handle_response(&mut self, id: usize, result: Result<serde_json::Value>) -> Option<Bytes> {
if let Some(id) = self.sid_to_id.get(&id) {
Some(
serde_json::to_vec(&OperationMessage {
ty: "data".to_string(),
id: Some(id.clone()),
payload: Some(
serde_json::to_value(GQLResponse(result.map(|data| QueryResult {
data,
extensions: None,
})))
.unwrap(),
),
})
.unwrap()
.into(),
)
} else {
None
}
}
}