Add tracing to dataloader methods when the tracing feature is enabled.

This commit is contained in:
Doug Roeper 2022-07-28 08:11:58 -04:00
parent b33f8658cc
commit 9649c43bd7
No known key found for this signature in database
GPG Key ID: 87038190C453E8B3
2 changed files with 20 additions and 6 deletions

View File

@ -82,7 +82,7 @@ tracing-futures = { version = "0.2.5", optional = true, features = [
"std-future", "std-future",
"futures-03", "futures-03",
] } ] }
tracinglib = { version = "0.1.25", optional = true, package = "tracing" } tracinglib = { version = "0.1.35", optional = true, package = "tracing" }
url = { version = "2.2.1", optional = true } url = { version = "2.2.1", optional = true }
uuid = { version = "1.0.0", optional = true, features = ["v4", "serde"] } uuid = { version = "1.0.0", optional = true, features = ["v4", "serde"] }
uuid08 = { version = "0.8", package = "uuid", optional = true, features = [ uuid08 = { version = "0.8", package = "uuid", optional = true, features = [

View File

@ -75,6 +75,10 @@ use fnv::FnvHashMap;
use futures_channel::oneshot; use futures_channel::oneshot;
use futures_timer::Delay; use futures_timer::Delay;
use futures_util::future::BoxFuture; use futures_util::future::BoxFuture;
#[cfg(feature = "tracing")]
use tracinglib as tracing;
#[cfg(feature = "tracing")]
use tracing::{instrument, info_span, Instrument};
#[allow(clippy::type_complexity)] #[allow(clippy::type_complexity)]
struct ResSender<K: Send + Sync + Hash + Eq + Clone + 'static, T: Loader<K>> { struct ResSender<K: Send + Sync + Hash + Eq + Clone + 'static, T: Loader<K>> {
@ -128,6 +132,7 @@ struct DataLoaderInner<T> {
} }
impl<T> DataLoaderInner<T> { impl<T> DataLoaderInner<T> {
#[cfg_attr(feature = "tracing", instrument(skip_all))]
async fn do_load<K>(&self, disable_cache: bool, (keys, senders): KeysAndSender<K, T>) async fn do_load<K>(&self, disable_cache: bool, (keys, senders): KeysAndSender<K, T>)
where where
K: Send + Sync + Hash + Eq + Clone + 'static, K: Send + Sync + Hash + Eq + Clone + 'static,
@ -275,6 +280,7 @@ impl<T, C: CacheFactory> DataLoader<T, C> {
} }
/// Use this `DataLoader` load a data. /// Use this `DataLoader` load a data.
#[cfg_attr(feature = "tracing", instrument(skip_all))]
pub async fn load_one<K>(&self, key: K) -> Result<Option<T::Value>, T::Error> pub async fn load_one<K>(&self, key: K) -> Result<Option<T::Value>, T::Error>
where where
K: Send + Sync + Hash + Eq + Clone + 'static, K: Send + Sync + Hash + Eq + Clone + 'static,
@ -285,6 +291,7 @@ impl<T, C: CacheFactory> DataLoader<T, C> {
} }
/// Use this `DataLoader` to load some data. /// Use this `DataLoader` to load some data.
#[cfg_attr(feature = "tracing", instrument(skip_all))]
pub async fn load_many<K, I>(&self, keys: I) -> Result<HashMap<K, T::Value>, T::Error> pub async fn load_many<K, I>(&self, keys: I) -> Result<HashMap<K, T::Value>, T::Error>
where where
K: Send + Sync + Hash + Eq + Clone + 'static, K: Send + Sync + Hash + Eq + Clone + 'static,
@ -357,16 +364,17 @@ impl<T, C: CacheFactory> DataLoader<T, C> {
Action::ImmediateLoad(keys) => { Action::ImmediateLoad(keys) => {
let inner = self.inner.clone(); let inner = self.inner.clone();
let disable_cache = self.disable_cache.load(Ordering::SeqCst); let disable_cache = self.disable_cache.load(Ordering::SeqCst);
(self.spawner)(Box::pin( let task = async move { inner.do_load(disable_cache, keys).await };
async move { inner.do_load(disable_cache, keys).await }, #[cfg(feature = "tracing")]
)); let task = task.instrument(info_span!("immediate_load"));
(self.spawner)(Box::pin(task));
} }
Action::StartFetch => { Action::StartFetch => {
let inner = self.inner.clone(); let inner = self.inner.clone();
let disable_cache = self.disable_cache.load(Ordering::SeqCst); let disable_cache = self.disable_cache.load(Ordering::SeqCst);
let delay = self.delay; let delay = self.delay;
(self.spawner)(Box::pin(async move { let task = async move {
Delay::new(delay).await; Delay::new(delay).await;
let keys = { let keys = {
@ -382,7 +390,10 @@ impl<T, C: CacheFactory> DataLoader<T, C> {
if !keys.0.is_empty() { if !keys.0.is_empty() {
inner.do_load(disable_cache, keys).await inner.do_load(disable_cache, keys).await
} }
})) };
#[cfg(feature = "tracing")]
let task = task.instrument(info_span!("start_fetch"));
(self.spawner)(Box::pin(task))
} }
Action::Delay => {} Action::Delay => {}
} }
@ -394,6 +405,7 @@ impl<T, C: CacheFactory> DataLoader<T, C> {
/// ///
/// **NOTE: If the cache type is [NoCache], this function will not take /// **NOTE: If the cache type is [NoCache], this function will not take
/// effect. ** /// effect. **
#[cfg_attr(feature = "tracing", instrument(skip_all))]
pub async fn feed_many<K, I>(&self, values: I) pub async fn feed_many<K, I>(&self, values: I)
where where
K: Send + Sync + Hash + Eq + Clone + 'static, K: Send + Sync + Hash + Eq + Clone + 'static,
@ -418,6 +430,7 @@ impl<T, C: CacheFactory> DataLoader<T, C> {
/// ///
/// **NOTE: If the cache type is [NoCache], this function will not take /// **NOTE: If the cache type is [NoCache], this function will not take
/// effect. ** /// effect. **
#[cfg_attr(feature = "tracing", instrument(skip_all))]
pub async fn feed_one<K>(&self, key: K, value: T::Value) pub async fn feed_one<K>(&self, key: K, value: T::Value)
where where
K: Send + Sync + Hash + Eq + Clone + 'static, K: Send + Sync + Hash + Eq + Clone + 'static,
@ -430,6 +443,7 @@ impl<T, C: CacheFactory> DataLoader<T, C> {
/// ///
/// **NOTE: If the cache type is [NoCache], this function will not take /// **NOTE: If the cache type is [NoCache], this function will not take
/// effect. ** /// effect. **
#[cfg_attr(feature = "tracing", instrument(skip_all))]
pub fn clear<K>(&self) pub fn clear<K>(&self)
where where
K: Send + Sync + Hash + Eq + Clone + 'static, K: Send + Sync + Hash + Eq + Clone + 'static,