feat: add influxdb reporting

This commit is contained in:
Anna 2022-07-14 13:40:43 -04:00
parent c1501222fe
commit 83af7c4062
5 changed files with 102 additions and 3 deletions

2
server/Cargo.lock generated
View File

@ -336,6 +336,7 @@ dependencies = [
"prefixed-api-key",
"rand 0.8.5",
"regex",
"reqwest",
"rmp-serde",
"rustyline",
"serde",
@ -346,6 +347,7 @@ dependencies = [
"tokio",
"tokio-tungstenite",
"toml",
"url",
"uuid",
]

View File

@ -18,6 +18,7 @@ parking_lot = "0.12"
prefixed-api-key = { git = "https://git.annaclemens.io/ascclemens/prefixed-api-key.git" }
rand = "0.8"
regex = "1"
reqwest = { version = "0.11", default-features = false }
rmp-serde = "1"
rustyline = { version = "9", default-features = false }
serde = { version = "1", features = ["derive"] }
@ -28,6 +29,7 @@ sha3 = "0.10"
sqlx = { version = "0.6", features = ["runtime-tokio-rustls", "sqlite", "chrono"] }
tokio-tungstenite = "0.17"
toml = "0.5"
url = { version = "2", features = ["serde"] }
uuid = { version = "1", features = ["serde", "v4"] }
[dependencies.tokio]

72
server/src/influx.rs Normal file
View File

@ -0,0 +1,72 @@
use std::sync::Arc;
use std::sync::atomic::Ordering;
use std::time::Duration;
use chrono::Utc;
use log::{debug, error};
use reqwest::Client;
use tokio::sync::RwLock;
use crate::{Config, State};
pub fn spawn(config: &Config, state: Arc<RwLock<State>>) {
let influx = match &config.influx {
Some(i) => i,
None => return,
};
let mut url = match influx.url.join("/api/v2/write") {
Ok(url) => url,
Err(e) => {
error!("Failed to parse influxdb url: {}", e);
return;
}
};
url.query_pairs_mut()
.append_pair("org", &influx.org)
.append_pair("bucket", &influx.bucket);
let influx_token = influx.token.clone();
tokio::task::spawn(async move {
let mut last_messages = 0;
let client = Client::new();
loop {
let messages = state.read().await.messages_sent.load(Ordering::SeqCst);
let diff = messages - last_messages;
last_messages = messages;
let clients = state.read().await.clients.len();
let timestamp = Utc::now().timestamp_nanos();
let line_format = format!(
"logged_in value={logged_in}u {timestamp}\nmessages_this_instance value={messages_this_instance}u {timestamp}\nmessages_new value={messages_new}u {timestamp}\n",
logged_in = clients,
messages_this_instance = messages,
messages_new = diff,
timestamp = timestamp,
);
debug!("line_format: {}", line_format);
let res = client.post(url.clone())
.header("Authorization", format!("Token {}", influx_token))
.body(line_format)
.send()
.await
.and_then(|resp| resp.error_for_status());
if let Err(e) = res {
error!("failed to send to influxdb: {}", e);
} else {
debug!("sent to influxdb");
}
tokio::time::sleep(Duration::from_secs(60)).await;
}
});
}

View File

@ -49,6 +49,7 @@ pub mod handlers;
pub mod util;
pub mod updater;
pub mod logging;
pub mod influx;
pub type WsStream = WebSocketStream<TcpStream>;
@ -188,17 +189,28 @@ async fn main() -> Result<()> {
{
let state = Arc::clone(&state);
tokio::task::spawn(async move {
let mut last_messages = 0;
loop {
let messages = state.read().await.messages_sent.load(Ordering::SeqCst);
let diff = messages - last_messages;
last_messages = messages;
let clients = state.read().await.clients.len();
info!(
"Clients: {}, messages sent: {}",
state.read().await.clients.len(),
state.read().await.messages_sent.load(Ordering::SeqCst),
"Clients: {}, messages sent: {} (+{})",
clients,
messages,
diff,
);
tokio::time::sleep(Duration::from_secs(60)).await;
}
});
}
influx::spawn(&config, Arc::clone(&state));
updater::spawn(Arc::clone(&state));
loop {

View File

@ -1,9 +1,12 @@
use serde::{Deserialize, Serialize};
use url::Url;
#[derive(Debug, Deserialize, Serialize)]
pub struct Config {
pub server: Server,
pub database: Database,
#[serde(default)]
pub influx: Option<Influx>,
}
#[derive(Debug, Deserialize, Serialize)]
@ -15,3 +18,11 @@ pub struct Server {
pub struct Database {
pub path: String,
}
#[derive(Debug, Deserialize, Serialize)]
pub struct Influx {
pub url: Url,
pub org: String,
pub bucket: String,
pub token: String,
}