//! Application core: declares the `app` submodules and defines the shared [`State`].
//!
//! What this file contains:
//!
//! - `State`: the shared bot state. It holds the user config, the saved config behind a lock,
//!   the channel name, the Twitch client wrapper, a `live` flag, two timed user caches (keyed by
//!   login and by id, both created with `with_lifespan(3600)`), the IRC client, the sending half
//!   of the outgoing IRC message queue, cooldown maps, the rhai engine, a cache of compiled
//!   scripts, and a tokio runtime handle.
//! - `State::new`: verifies the bot token and the user token (`verify_token`), looks up the
//!   channel login for `user_config.twitch.channel_id`, spawns a task that saves the config to
//!   `data.json` and then sleeps five minutes in a loop, connects to IRC, registers the
//!   `ExecutorState` getters/functions and `ExecutorOutput::send` with rhai, builds the
//!   `Arc`-wrapped state, and then spawns: the web task, the pubsub websocket listener (sends the
//!   listen command, pings every two minutes, forwards messages to `twitch::handle_pubsub`), the
//!   IRC event loop (forwards events to `twitch::handle_irc_event` and reconnects once the
//!   stream stops yielding events), and the IRC send queue (forwards queued messages with
//!   `send_privmsg` to `#<channel_name>`). Finally it starts the stream-status and
//!   token-refresh tasks and returns the state.
//! - `State::get_user_from_id` / `State::get_user_from_login`: return the cached user when one
//!   is present; otherwise query helix using the bot token and, when a user is returned, store
//!   it in both caches (by id and by login).
//! - `State::store_live_status`: swaps the `live` flag; when the value actually changed and
//!   `config.events.stream_status` is set, runs that script through `run_rhai` with the new
//!   flag as the single argument.
//! - `FullUserToken`: a `UserToken` stored together with its client id and client secret.
//!   It serializes as the tuple (refresh token, access token, login, user id, client id,
//!   client secret, scopes) and deserializes from the same tuple via
//!   `UserToken::from_existing_unchecked`. `Deref`/`DerefMut` expose the inner `UserToken`.
//! - `run_rhai`: on a `spawn_blocking` thread, reuses the cached AST for the script text or
//!   compiles and caches it, calls the script's `run` function with `fn_args` and a default
//!   `ExecutorOutput` (presumably bound as `this` - confirm against rhai's `call_fn_raw`), then
//!   pushes every entry of `output.to_send` onto `irc_queue`. Errors are printed to stderr.
//! - `verify_token`: takes the stored token out of the option. If one exists it is validated;
//!   on success a new `UserToken` is rebuilt from the validated data, on failure the token is
//!   refreshed (a refresh error is returned to the caller). If none exists, an authorization
//!   URL is printed, a code is read from stdin, and it is exchanged for a new token. The result
//!   is written back into the option.
//!
//! NOTE(review): this copy of the file looks damaged by extraction. It is collapsed onto a few
//! very long lines (so each `//` comment swallows the code after it), and generic arguments
//! appear to be missing (e.g. `Arc>`, `Result>`, `register_type::()`, `Option`). The code
//! below is left untouched because the missing type arguments cannot be recovered reliably
//! from this file alone - restore from the original before building.
//!
//! NOTE(review): the `try { ... }` blocks presumably require the nightly `try_blocks` feature
//! to be enabled at the crate root - confirm.
//!
//! NOTE(review): in the pubsub task, the `read.next()` arm only acts on `Some(Ok(_))`; `None`
//! and `Some(Err(_))` are ignored and the loop continues, so it presumably spins once the
//! websocket closes - confirm and consider breaking out / reconnecting.
//!
//! NOTE(review): the IRC task retries `twitch_irc::connect` immediately after a failed
//! reconnect, with no delay between attempts.
//!
//! NOTE(review): `channel_id as u32` is used for both pubsub topics; depending on the type of
//! `channel_id` this cast may truncate - confirm.
//!
//! NOTE(review): `verify_token` reads the code with blocking `std::io::stdin().read_line`
//! inside an async fn, drops the validation error (`Err(_)`) before refreshing, and only uses
//! `scopes` when no token is stored (an existing token's scopes are not compared against it).
pub mod config; pub mod duration_tools; pub mod rhai_tools; pub mod task; pub mod twitch; pub mod twitch_irc; pub mod user_config; pub mod web; use anyhow::Result; use cached::{TimedCache, Cached}; use chrono::Duration; use futures::{StreamExt, SinkExt}; use irc::client::prelude::Client as IrcClient; use rhai::Engine; use serde::{Deserialize, Serialize}; use tokio::{ runtime::Handle, sync::{ Mutex, RwLock, mpsc::UnboundedSender, }, }; use tokio_tungstenite::tungstenite::Message as WsMessage; use twitch_api2::{ TwitchClient, helix::{ Scope, users::User, }, pubsub::Topic, types::{UserId, UserName}, twitch_oauth2::{ClientSecret, UserToken, TwitchToken, ClientId, RefreshToken, AccessToken}, }; use crate::app::{ config::Config, rhai_tools::{ExecutorState, ExecutorOutput}, user_config::UserConfig, twitch::Twitch, }; use std::{ collections::HashMap, ops::{Deref, DerefMut}, path::Path, sync::{ Arc, atomic::{AtomicBool, Ordering}, }, time::Instant, }; pub struct State { pub user_config: UserConfig, pub config: Arc>, pub channel_name: String, pub twitch: Arc, pub live: AtomicBool, pub rewards_paused: RwLock>, pub user_login_cache: Mutex>, pub user_id_cache: Mutex>, pub irc: RwLock, pub irc_queue: UnboundedSender, pub user_cooldowns: RwLock>, pub global_cooldowns: RwLock>, pub rhai: Engine, pub script_cache: parking_lot::RwLock>, pub runtime: Handle, pub speedgame: RwLock<(Option, Option)>, } impl State { #[allow(unreachable_code)] pub async fn new(runtime: Handle, user_config: UserConfig, mut config: Config) -> Result> { let http_client = reqwest::Client::new(); println!("verifying bot token"); verify_token(&mut config.bot_token, &user_config, &http_client, vec![ // IRC Scope::ChatRead, Scope::ChatEdit, // Mod stuff Scope::ChannelModerate, Scope::ModerationRead, Scope::ModeratorManageAutoMod, ]).await?; println!("bot token ready"); println!("verifying user token"); verify_token(&mut config.user_token, &user_config, &http_client, vec![ // Channel points redemptions 
Scope::ChannelReadRedemptions, Scope::ChannelManageRedemptions, // Mod stuff Scope::ChannelModerate, Scope::ModerationRead, Scope::ModeratorManageAutoMod, ]).await?; println!("user token ready"); let config = Arc::new(RwLock::new(config)); let twitch_client = TwitchClient::with_client(http_client); let twitch_config = Arc::clone(&config); let twitch = Arc::new(Twitch::new(twitch_client, twitch_config)); println!("getting channel name"); let user_id = UserId::new(user_config.twitch.channel_id.to_string()); let channel_name = twitch.client.helix.get_user_from_id(user_id, &twitch.bot_token().await?) .await? .ok_or_else(|| anyhow::anyhow!("no channel for id {}", user_config.twitch.channel_id))? .login .to_string(); println!("configured to run for channel: {}", channel_name); println!("starting periodic config save task"); let task_config = Arc::clone(&config); tokio::task::spawn(async move { loop { println!("saving config"); if let Err(e) = crate::save_config(Path::new("data.json"), &*task_config.read().await).await { eprintln!("could not save config: {:?}", e); } tokio::time::sleep(Duration::minutes(5).to_std().unwrap()).await; } }); println!("connecting to irc"); let mut irc = self::twitch_irc::connect(&twitch).await?; let mut irc_stream = irc.stream()?; let mut rhai = Engine::new(); rhai.set_max_expr_depths(0, 0); rhai.register_type::() .register_get("args", ExecutorState::args) .register_get("initiator", ExecutorState::initiator) .register_get("initiator_id", ExecutorState::initiator_id) .register_get("broadcaster", ExecutorState::broadcaster) .register_get("moderator", ExecutorState::moderator) .register_get("vip", ExecutorState::vip) .register_get("subscriber", ExecutorState::subscriber) .register_get("speedgame_name", ExecutorState::speedgame_name) .register_get("speedgame_category", ExecutorState::speedgame_category) .register_fn("get_username", ExecutorState::get_username::<&str>) .register_fn("get_user_id", ExecutorState::get_user_id::<&str>) 
.register_fn("get_channel_info", ExecutorState::get_channel_info::<&str>); rhai.register_type::() .register_fn("send", ExecutorOutput::send::<&str>); let (queue_tx, mut queue_rx) = tokio::sync::mpsc::unbounded_channel(); let state = Arc::new(Self { user_config, config, channel_name, twitch, live: AtomicBool::new(false), rewards_paused: Default::default(), user_login_cache: Mutex::new(TimedCache::with_lifespan(3600)), user_id_cache: Mutex::new(TimedCache::with_lifespan(3600)), irc: RwLock::new(irc), irc_queue: queue_tx, user_cooldowns: Default::default(), global_cooldowns: Default::default(), rhai, script_cache: Default::default(), runtime, speedgame: RwLock::new((None, None)), }); // start web task tokio::task::spawn(crate::app::web::start_web(Arc::clone(&state))); // start pubsub let redemption_topic = twitch_api2::pubsub::channel_points::ChannelPointsChannelV1 { channel_id: state.user_config.twitch.channel_id as u32, }.into_topic(); let video_playback_topic = twitch_api2::pubsub::video_playback::VideoPlaybackById { channel_id: state.user_config.twitch.channel_id as u32, }.into_topic(); let auth_token: Option = state.config .read() .await .user_token .as_ref() .map(|token| token.access_token.secret().to_string()); let listen_command = twitch_api2::pubsub::listen_command( &[redemption_topic, video_playback_topic], auth_token.as_deref(), "1", )?; let task_state = Arc::clone(&state); tokio::task::spawn(async move { println!("starting pubsub listener"); let res: Result<()> = try { let (ws, _) = tokio_tungstenite::connect_async("wss://pubsub-edge.twitch.tv").await?; let (mut write, mut read) = ws.split(); write.send(WsMessage::Text(listen_command)).await?; let mut ping = tokio::time::interval(chrono::Duration::minutes(2).to_std().unwrap()); loop { tokio::select! 
{ _ = ping.tick() => { write.send(WsMessage::Ping(vec![1, 2, 3, 4])).await?; write.send(WsMessage::Text(r#"{"type":"PING"}"#.into())).await?; }, message = read.next() => { if let Some(Ok(message)) = message { if let Err(e) = crate::app::twitch::handle_pubsub(Arc::clone(&task_state), message).await { eprintln!("error in pubsub: {:?}", e); } } }, } } }; if let Err(e) = res { eprintln!("error connecting to websocket: {:?}", e); } }); // start irc task let task_state = Arc::clone(&state); tokio::task::spawn(async move { println!("listening for irc events"); loop { while let Some(event) = irc_stream.next().await.transpose().unwrap_or_default() { let task_state = Arc::clone(&task_state); if let Err(e) = crate::app::twitch::handle_irc_event(task_state, event).await { eprintln!("irc error: {:?}", e); } } // reconnect eprintln!("reconnecting to irc... "); match self::twitch_irc::connect(&*task_state.twitch).await { Ok(mut client) => { eprintln!("connected to irc"); if let Ok(stream) = client.stream() { irc_stream = stream; } else { eprintln!("connected but no stream?"); } *task_state.irc.write().await = client; } Err(e) => { eprintln!("error reconnecting to irc: {:?}", e); } } } }); // start irc message queue let task_state = Arc::clone(&state); tokio::task::spawn(async move { let channel = format!("#{}", task_state.channel_name); while let Some(message) = queue_rx.recv().await { if let Err(e) = task_state.irc.read().await.send_privmsg(&channel, message) { eprintln!("error sending message: {:?}", e); } } }); // start stream status task let task_state = Arc::clone(&state); self::task::stream_status::start_task(task_state).await; // start token refresh task let task_state = Arc::clone(&state); self::task::tokens::start_task(task_state).await; println!("initialised"); Ok(state) } pub async fn get_user_from_id>(&self, id: S) -> anyhow::Result> { let id = id.into(); if let Some(user) = self.user_id_cache.lock().await.cache_get(&id) { return Ok(Some(user.clone())); } let user = 
self.twitch.client.helix.get_user_from_id(id, &self.twitch.bot_token().await?).await?; self.add_user_to_cache(&user).await; Ok(user) } pub async fn get_user_from_login>(&self, login: S) -> anyhow::Result> { let login = login.into(); if let Some(user) = self.user_login_cache.lock().await.cache_get(&login) { return Ok(Some(user.clone())); } let user = self.twitch.client.helix.get_user_from_login(login, &self.twitch.bot_token().await?).await?; self.add_user_to_cache(&user).await; Ok(user) } async fn add_user_to_cache(&self, user: &Option) { if let Some(user) = user { self.user_id_cache.lock().await.cache_set(user.id.clone(), user.clone()); self.user_login_cache.lock().await.cache_set(user.login.clone(), user.clone()); } } async fn store_live_status(self: &Arc, live: bool) { let prev = self.live.swap(live, Ordering::SeqCst); if prev == live { return; } if let Some(rhai) = &self.config.read().await.events.stream_status { self::run_rhai(Arc::clone(self), rhai, vec![rhai::Dynamic::from(live)]); } } } #[derive(Clone)] pub struct FullUserToken { pub token: UserToken, pub client_id: ClientId, pub client_secret: ClientSecret, } impl<'de> Deserialize<'de> for FullUserToken { fn deserialize(de: D) -> std::result::Result where D: serde::de::Deserializer<'de>, { let (refresh, access, login, user_id, client_id, client_secret, scopes): (RefreshToken, AccessToken, _, _, ClientId, ClientSecret, Vec) = Deserialize::deserialize(de)?; let token = UserToken::from_existing_unchecked(access, refresh, client_id.clone(), client_secret.clone(), login, user_id, Some(scopes), None); Ok(FullUserToken { token, client_id, client_secret, }) } } impl Serialize for FullUserToken { fn serialize(&self, ser: S) -> std::result::Result where S: serde::ser::Serializer, { let to_ser = (&self.token.refresh_token, &self.token.access_token, &self.token.login, &self.token.user_id, &self.client_id, &self.client_secret, self.token.scopes()); to_ser.serialize(ser) } } impl Deref for FullUserToken { type Target = 
UserToken; fn deref(&self) -> &Self::Target { &self.token } } impl DerefMut for FullUserToken { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.token } } pub fn run_rhai>(state: Arc, script: S, fn_args: Vec) { let script = script.into(); tokio::task::spawn_blocking(move || { let res: anyhow::Result<()> = try { let mut scope = rhai::Scope::new(); let ast = state.script_cache.read().get(&script).map(ToOwned::to_owned); let ast = match ast { Some(ast) => ast, None => { let ast = state.rhai.compile(&script)?; state.script_cache.write().insert(script.clone(), ast.clone()); ast } }; let mut output = rhai::Dynamic::from(ExecutorOutput::default()); state.rhai.call_fn_raw( &mut scope, &ast, true, false, "run", Some(&mut output), fn_args, )?; let output: ExecutorOutput = output.cast(); for message in output.to_send { state.irc_queue.send(message).ok(); } }; if let Err(e) = res { eprintln!("error in rhai script: {:?}", e); } }); } async fn verify_token(full_token: &mut Option, user_config: &UserConfig, http_client: &reqwest::Client, scopes: Vec) -> anyhow::Result<()> { let fut = match full_token.take() { Some(mut t) => { match t.validate_token(http_client).await { Ok(validated) => { let old = t.token; let token = UserToken::from_existing_unchecked( old.access_token, old.refresh_token, validated.client_id, t.client_secret.clone(), validated.login.unwrap_or(old.login), validated.user_id.unwrap_or(old.user_id), validated.scopes, Some(validated.expires_in), ); FullUserToken { client_id: t.client_id, client_secret: t.client_secret, token, } }, Err(_) => { println!("Refreshing token"); t.refresh_token(http_client).await?; t }, } }, None => { let mut builder = UserToken::builder( user_config.twitch.client_id.clone(), user_config.twitch.client_secret.clone(), url::Url::parse("http://localhost/").unwrap(), ).set_scopes(scopes); let (url, csrf) = builder.generate_url(); println!("go to {}", url); println!("once done, paste code:"); let mut code = String::new(); 
std::io::stdin().read_line(&mut code)?; let token: UserToken = builder.get_user_token(http_client, csrf.as_str(), code.trim()).await?; FullUserToken { token, client_id: user_config.twitch.client_id.clone(), client_secret: user_config.twitch.client_secret.clone(), } } }; *full_token = Some(fut); Ok(()) }