The subscriptions field supports a return value of type FieldResult
This commit is contained in:
parent
a4781523fb
commit
aac2710c99
|
@ -34,10 +34,8 @@ impl<'a> OutputType<'a> {
|
|||
} else {
|
||||
OutputType::Value(input)
|
||||
}
|
||||
} else if let Type::Reference(_) = input {
|
||||
OutputType::Value(input)
|
||||
} else {
|
||||
return Err(Error::new_spanned(input, "Invalid type"));
|
||||
OutputType::Value(input)
|
||||
};
|
||||
Ok(ty)
|
||||
}
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
use crate::args;
|
||||
use crate::output_type::OutputType;
|
||||
use crate::utils::{build_value_repr, check_reserved_name, get_crate_name};
|
||||
use inflector::Inflector;
|
||||
use proc_macro::TokenStream;
|
||||
|
@ -63,6 +64,13 @@ pub fn generate(object_args: &args::Object, item_impl: &mut ItemImpl) -> Result<
|
|||
));
|
||||
}
|
||||
|
||||
let ty = match &method.sig.output {
|
||||
ReturnType::Type(_, ty) => OutputType::parse(ty)?,
|
||||
ReturnType::Default => {
|
||||
return Err(Error::new_spanned(&method.sig.output, "Missing type"))
|
||||
}
|
||||
};
|
||||
|
||||
let mut arg_ctx = false;
|
||||
let mut args = Vec::new();
|
||||
|
||||
|
@ -168,20 +176,11 @@ pub fn generate(object_args: &args::Object, item_impl: &mut ItemImpl) -> Result<
|
|||
});
|
||||
}
|
||||
|
||||
let stream_ty = match &method.sig.output {
|
||||
ReturnType::Default => {
|
||||
return Err(Error::new_spanned(
|
||||
&method.sig.output,
|
||||
"Must be return a stream type",
|
||||
))
|
||||
}
|
||||
ReturnType::Type(_, ty) => {
|
||||
if let Type::ImplTrait(TypeImplTrait { bounds, .. }) = ty.as_ref() {
|
||||
quote! { #bounds }
|
||||
} else {
|
||||
quote! { #ty }
|
||||
}
|
||||
}
|
||||
let res_ty = ty.value_type();
|
||||
let stream_ty = if let Type::ImplTrait(TypeImplTrait { bounds, .. }) = &res_ty {
|
||||
quote! { #bounds }
|
||||
} else {
|
||||
quote! { #res_ty }
|
||||
};
|
||||
|
||||
schema_fields.push(quote! {
|
||||
|
@ -208,6 +207,18 @@ pub fn generate(object_args: &args::Object, item_impl: &mut ItemImpl) -> Result<
|
|||
quote! {}
|
||||
};
|
||||
|
||||
let create_field_stream = match &ty {
|
||||
OutputType::Value(_) => quote! {
|
||||
self.#ident(#ctx_param #(#use_params),*).await
|
||||
},
|
||||
OutputType::Result(_, _) => {
|
||||
quote! {
|
||||
self.#ident(#ctx_param #(#use_params),*).await.
|
||||
map_err(|err| err.into_error_with_path(ctx.position, ctx.path_node.as_ref().unwrap().to_json()))?
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
create_stream.push(quote! {
|
||||
if ctx.name.as_str() == #field_name {
|
||||
let field_name = ctx.result_name().to_string();
|
||||
|
@ -216,7 +227,7 @@ pub fn generate(object_args: &args::Object, item_impl: &mut ItemImpl) -> Result<
|
|||
let schema = schema.clone();
|
||||
let pos = ctx.position;
|
||||
let environment = environment.clone();
|
||||
let stream = #crate_name::futures::stream::StreamExt::then(self.#ident(#ctx_param #(#use_params),*).await.fuse(), move |msg| {
|
||||
let stream = #crate_name::futures::stream::StreamExt::then(#create_field_stream.fuse(), move |msg| {
|
||||
let environment = environment.clone();
|
||||
let field_selection_set = field_selection_set.clone();
|
||||
let schema = schema.clone();
|
||||
|
|
|
@ -196,6 +196,68 @@ pub async fn test_subscription_with_ctx_data() {
|
|||
}
|
||||
}
|
||||
|
||||
#[async_std::test]
|
||||
pub async fn test_subscription_with_token() {
|
||||
struct QueryRoot;
|
||||
|
||||
#[Object]
|
||||
impl QueryRoot {}
|
||||
|
||||
struct SubscriptionRoot;
|
||||
|
||||
struct Token(String);
|
||||
|
||||
#[Subscription]
|
||||
impl SubscriptionRoot {
|
||||
#[field]
|
||||
async fn values(&self, ctx: &Context<'_>) -> FieldResult<impl Stream<Item = i32>> {
|
||||
if ctx.data::<Token>().0 != "123456" {
|
||||
return Err("forbidden".into());
|
||||
}
|
||||
Ok(futures::stream::once(async move { 100 }))
|
||||
}
|
||||
}
|
||||
|
||||
let schema = Schema::new(QueryRoot, EmptyMutation, SubscriptionRoot);
|
||||
|
||||
{
|
||||
let mut stream = schema
|
||||
.create_subscription_stream(
|
||||
"subscription { values }",
|
||||
None,
|
||||
Default::default(),
|
||||
Some(Arc::new({
|
||||
let mut data = Data::default();
|
||||
data.insert(Token("123456".to_string()));
|
||||
data
|
||||
})),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
Some(serde_json::json!({ "values": 100 })),
|
||||
stream.next().await
|
||||
);
|
||||
assert!(stream.next().await.is_none());
|
||||
}
|
||||
|
||||
{
|
||||
assert!(schema
|
||||
.create_subscription_stream(
|
||||
"subscription { values }",
|
||||
None,
|
||||
Default::default(),
|
||||
Some(Arc::new({
|
||||
let mut data = Data::default();
|
||||
data.insert(Token("654321".to_string()));
|
||||
data
|
||||
})),
|
||||
)
|
||||
.await
|
||||
.is_err());
|
||||
}
|
||||
}
|
||||
|
||||
#[async_std::test]
|
||||
pub async fn test_subscription_ws_transport() {
|
||||
struct QueryRoot;
|
||||
|
@ -266,3 +328,78 @@ pub async fn test_subscription_ws_transport() {
|
|||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[async_std::test]
|
||||
pub async fn test_subscription_ws_transport_with_token() {
|
||||
struct Token(String);
|
||||
|
||||
struct QueryRoot;
|
||||
|
||||
#[Object]
|
||||
impl QueryRoot {}
|
||||
|
||||
struct SubscriptionRoot;
|
||||
|
||||
#[Subscription]
|
||||
impl SubscriptionRoot {
|
||||
#[field]
|
||||
async fn values(&self, ctx: &Context<'_>) -> FieldResult<impl Stream<Item = i32>> {
|
||||
if ctx.data::<Token>().0 != "123456" {
|
||||
return Err("forbidden".into());
|
||||
}
|
||||
Ok(futures::stream::iter(0..10))
|
||||
}
|
||||
}
|
||||
|
||||
let schema = Schema::new(QueryRoot, EmptyMutation, SubscriptionRoot);
|
||||
let (mut sink, mut stream) = schema.subscription_connection(
|
||||
WebSocketTransport::default(),
|
||||
Some(Arc::new({
|
||||
let mut data = Data::default();
|
||||
data.insert(Token("123456".to_string()));
|
||||
data
|
||||
})),
|
||||
);
|
||||
|
||||
sink.send(
|
||||
serde_json::to_vec(&serde_json::json!({
|
||||
"type": "connection_init",
|
||||
}))
|
||||
.unwrap()
|
||||
.into(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(
|
||||
Some(serde_json::json!({
|
||||
"type": "connection_ack",
|
||||
})),
|
||||
serde_json::from_slice(&stream.next().await.unwrap()).unwrap()
|
||||
);
|
||||
|
||||
sink.send(
|
||||
serde_json::to_vec(&serde_json::json!({
|
||||
"type": "start",
|
||||
"id": "1",
|
||||
"payload": {
|
||||
"query": "subscription { values }"
|
||||
},
|
||||
}))
|
||||
.unwrap()
|
||||
.into(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
for i in 0..10 {
|
||||
assert_eq!(
|
||||
Some(serde_json::json!({
|
||||
"type": "data",
|
||||
"id": "1",
|
||||
"payload": { "data": { "values": i } },
|
||||
})),
|
||||
serde_json::from_slice(&stream.next().await.unwrap()).unwrap()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue
Block a user