diff --git a/server/Cargo.lock b/server/Cargo.lock index 8d580d9..c79f12c 100755 --- a/server/Cargo.lock +++ b/server/Cargo.lock @@ -336,6 +336,7 @@ dependencies = [ "prefixed-api-key", "rand 0.8.5", "regex", + "reqwest", "rmp-serde", "rustyline", "serde", @@ -346,6 +347,7 @@ dependencies = [ "tokio", "tokio-tungstenite", "toml", + "url", "uuid", ] diff --git a/server/Cargo.toml b/server/Cargo.toml index bd6252e..f707ca7 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -18,6 +18,7 @@ parking_lot = "0.12" prefixed-api-key = { git = "https://git.annaclemens.io/ascclemens/prefixed-api-key.git" } rand = "0.8" regex = "1" +reqwest = { version = "0.11", default-features = false } rmp-serde = "1" rustyline = { version = "9", default-features = false } serde = { version = "1", features = ["derive"] } @@ -28,6 +29,7 @@ sha3 = "0.10" sqlx = { version = "0.6", features = ["runtime-tokio-rustls", "sqlite", "chrono"] } tokio-tungstenite = "0.17" toml = "0.5" +url = { version = "2", features = ["serde"] } uuid = { version = "1", features = ["serde", "v4"] } [dependencies.tokio] diff --git a/server/src/influx.rs b/server/src/influx.rs new file mode 100644 index 0000000..872491e --- /dev/null +++ b/server/src/influx.rs @@ -0,0 +1,72 @@ +use std::sync::Arc; +use std::sync::atomic::Ordering; +use std::time::Duration; + +use chrono::Utc; +use log::{debug, error}; +use reqwest::Client; +use tokio::sync::RwLock; + +use crate::{Config, State}; + +pub fn spawn(config: &Config, state: Arc>) { + let influx = match &config.influx { + Some(i) => i, + None => return, + }; + + let mut url = match influx.url.join("/api/v2/write") { + Ok(url) => url, + Err(e) => { + error!("Failed to parse influxdb url: {}", e); + return; + } + }; + + url.query_pairs_mut() + .append_pair("org", &influx.org) + .append_pair("bucket", &influx.bucket); + + let influx_token = influx.token.clone(); + + tokio::task::spawn(async move { + let mut last_messages = 0; + + let client = Client::new(); + + loop { + let messages = state.read().await.messages_sent.load(Ordering::SeqCst); + let diff = messages - last_messages; + last_messages = messages; + + let clients = state.read().await.clients.len(); + + let timestamp = Utc::now().timestamp_nanos(); + + let line_format = format!( + "logged_in value={logged_in}u {timestamp}\nmessages_this_instance value={messages_this_instance}u {timestamp}\nmessages_new value={messages_new}u {timestamp}\n", + logged_in = clients, + messages_this_instance = messages, + messages_new = diff, + timestamp = timestamp, + ); + + debug!("line_format: {}", line_format); + + let res = client.post(url.clone()) + .header("Authorization", format!("Token {}", influx_token)) + .body(line_format) + .send() + .await + .and_then(|resp| resp.error_for_status()); + + if let Err(e) = res { + error!("failed to send to influxdb: {}", e); + } else { + debug!("sent to influxdb"); + } + + tokio::time::sleep(Duration::from_secs(60)).await; + } + }); +} diff --git a/server/src/main.rs b/server/src/main.rs index bfb9a3f..06f0729 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -49,6 +49,7 @@ pub mod handlers; pub mod util; pub mod updater; pub mod logging; +pub mod influx; pub type WsStream = WebSocketStream; @@ -188,17 +189,28 @@ async fn main() -> Result<()> { { let state = Arc::clone(&state); tokio::task::spawn(async move { + let mut last_messages = 0; + loop { + let messages = state.read().await.messages_sent.load(Ordering::SeqCst); + let diff = messages - last_messages; + last_messages = messages; + + let clients = state.read().await.clients.len(); + info!( - "Clients: {}, messages sent: {}", - state.read().await.clients.len(), - state.read().await.messages_sent.load(Ordering::SeqCst), + "Clients: {}, messages sent: {} (+{})", + clients, + messages, + diff, ); tokio::time::sleep(Duration::from_secs(60)).await; } }); } + influx::spawn(&config, Arc::clone(&state)); + updater::spawn(Arc::clone(&state)); loop { diff --git a/server/src/types/config.rs b/server/src/types/config.rs index 8277995..4703d97 100644 --- a/server/src/types/config.rs +++ b/server/src/types/config.rs @@ -1,9 +1,12 @@ use serde::{Deserialize, Serialize}; +use url::Url; #[derive(Debug, Deserialize, Serialize)] pub struct Config { pub server: Server, pub database: Database, + #[serde(default)] + pub influx: Option, } #[derive(Debug, Deserialize, Serialize)] @@ -15,3 +18,11 @@ pub struct Server { pub struct Database { pub path: String, } + +#[derive(Debug, Deserialize, Serialize)] +pub struct Influx { + pub url: Url, + pub org: String, + pub bucket: String, + pub token: String, +}