From de7319009214df60fabc52a5a43913bd169b11be Mon Sep 17 00:00:00 2001 From: Anna Date: Tue, 31 Aug 2021 15:45:22 -0400 Subject: [PATCH] feat: add stream status events Add IRC reconnection. Add token refreshing. --- Cargo.toml | 2 +- src/app.rs | 146 +++++++++++++++++++++---------- src/app/config.rs | 7 ++ src/app/rhai_tools.rs | 2 +- src/app/task/mod.rs | 1 + src/app/task/stream_status.rs | 40 +++++++++ src/app/twitch.rs | 69 +++++++++++++-- src/app/twitch_irc.rs | 22 +++++ src/app/web.rs | 1 + src/app/web/route/events.rs | 67 ++++++++++++++ src/app/web/route/livesplit.rs | 2 +- src/app/web/route/mod.rs | 2 + src/app/web/route/redemptions.rs | 6 +- src/app/web/template/events.rs | 8 ++ src/app/web/template/mod.rs | 1 + templates/events.html | 43 +++++++++ 16 files changed, 362 insertions(+), 57 deletions(-) create mode 100644 src/app/task/mod.rs create mode 100644 src/app/task/stream_status.rs create mode 100644 src/app/twitch_irc.rs create mode 100644 src/app/web/route/events.rs create mode 100644 src/app/web/template/events.rs create mode 100644 templates/events.html diff --git a/Cargo.toml b/Cargo.toml index abd380d..77a1339 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,7 +22,7 @@ serde_json = "1" serde_with = { version = "1", features = ["chrono"] } toml = "0.5" tokio-tungstenite = { version = "0.15", features = ["rustls-tls"] } -twitch_api2 = { version = "0.6.0-rc.2", features = ["twitch_oauth2", "client", "reqwest_client", "helix", "pubsub"] } +twitch_api2 = { version = "0.6.0-rc.2", features = ["twitch_oauth2", "client", "reqwest_client", "helix", "pubsub", "unsupported"] } url = "2" uuid = { version = "0.8", features = ["v4", "serde"] } warp = "0.3" diff --git a/src/app.rs b/src/app.rs index d585146..846e19d 100644 --- a/src/app.rs +++ b/src/app.rs @@ -1,14 +1,16 @@ pub mod config; pub mod duration_tools; pub mod rhai_tools; +pub mod task; pub mod twitch; +pub mod twitch_irc; pub mod user_config; pub mod web; use anyhow::Result; use cached::{TimedCache, Cached}; use futures::{StreamExt, SinkExt}; -use irc::client::prelude::{Client as IrcClient, Config as IrcConfig, Capability}; +use irc::client::prelude::Client as IrcClient; use rhai::Engine; use serde::{Deserialize, Serialize}; use tokio::{ @@ -38,22 +40,27 @@ use crate::app::{ use std::{ collections::HashMap, ops::{Deref, DerefMut}, - sync::Arc, + sync::{ + Arc, + atomic::AtomicBool, + }, time::Instant, }; +use std::sync::atomic::Ordering; pub struct State { pub user_config: UserConfig, - pub config: RwLock, + pub config: Arc>, pub channel_name: String, pub twitch: Arc, + pub live: AtomicBool, pub rewards_paused: RwLock>, pub user_login_cache: Mutex>, pub user_id_cache: Mutex>, - pub irc: IrcClient, + pub irc: RwLock, pub irc_queue: UnboundedSender, pub user_cooldowns: RwLock>, pub global_cooldowns: RwLock>, @@ -91,34 +98,20 @@ impl State { ]).await?; println!("User token ready"); + let config = Arc::new(RwLock::new(config)); + let twitch_client = TwitchClient::with_client(http_client); - let twitch = Arc::new(Twitch { - client: twitch_client, - bot_token: config.bot_token.clone().unwrap().token, - user_token: config.user_token.clone().unwrap().token, - }); + let twitch_config = Arc::clone(&config); + let twitch = Arc::new(Twitch::new(twitch_client, twitch_config)); let user_id = UserId::new(user_config.twitch.channel_id.to_string()); - let channel_name = twitch.client.helix.get_user_from_id(user_id, &twitch.bot_token) + let channel_name = twitch.client.helix.get_user_from_id(user_id, &twitch.bot_token().await?) .await? .ok_or_else(|| anyhow::anyhow!("no channel for id {}", user_config.twitch.channel_id))? .login .to_string(); - let irc_config = IrcConfig { - server: Some("irc.chat.twitch.tv".into()), - port: Some(6697), - username: Some(twitch.bot_token.login.clone()), - nickname: Some(twitch.bot_token.login.clone()), - password: Some(format!("oauth:{}", twitch.bot_token.access_token.secret())), - ..Default::default() - }; - let mut irc = IrcClient::from_config(irc_config).await?; - irc.send_cap_req(&[ - Capability::Custom("twitch.tv/tags"), - Capability::Custom("twitch.tv/commands"), - ])?; - irc.identify()?; + let mut irc = self::twitch_irc::connect(&twitch).await?; let mut irc_stream = irc.stream()?; let mut rhai = Engine::new(); @@ -143,16 +136,17 @@ impl State { let state = Arc::new(Self { user_config, - config: RwLock::new(config), + config, channel_name, twitch, + live: AtomicBool::new(false), rewards_paused: Default::default(), user_login_cache: Mutex::new(TimedCache::with_lifespan(3600)), user_id_cache: Mutex::new(TimedCache::with_lifespan(3600)), - irc, + irc: RwLock::new(irc), irc_queue: queue_tx, user_cooldowns: Default::default(), global_cooldowns: Default::default(), @@ -171,6 +165,10 @@ impl State { let redemption_topic = twitch_api2::pubsub::channel_points::ChannelPointsChannelV1 { channel_id: state.user_config.twitch.channel_id as u32, }.into_topic(); + let video_playback_topic = twitch_api2::pubsub::video_playback::VideoPlaybackById { + channel_id: state.user_config.twitch.channel_id as u32, + }.into_topic(); + let auth_token: Option = state.config .read() .await @@ -178,7 +176,7 @@ impl State { .as_ref() .map(|token| token.access_token.secret().to_string()); let listen_command = twitch_api2::pubsub::listen_command( - &[redemption_topic], + &[redemption_topic, video_playback_topic], auth_token.as_deref(), "1", )?; @@ -216,14 +214,33 @@ impl State { let task_state = Arc::clone(&state); tokio::task::spawn(async move { // FIXME: handle reconnects - while let Some(event) = irc_stream.next().await.transpose()? { - let task_state = Arc::clone(&task_state); - if let Err(e) = crate::app::twitch::handle_irc_event(task_state, event).await { - eprintln!("irc error: {:?}", e); + loop { + while let Some(event) = irc_stream.next().await.transpose().unwrap_or_default() { + let task_state = Arc::clone(&task_state); + if let Err(e) = crate::app::twitch::handle_irc_event(task_state, event).await { + eprintln!("irc error: {:?}", e); + } } - } - Result::<(), anyhow::Error>::Ok(()) + // reconnect + eprintln!("reconnecting to irc... "); + match self::twitch_irc::connect(&*task_state.twitch).await { + Ok(mut client) => { + eprintln!("connected to irc"); + + if let Ok(stream) = client.stream() { + eprintln!("connected but no stream?"); + irc_stream = stream; + } + + *task_state.irc.write().await = client; + } + Err(e) => { + eprintln!("error reconnecting to irc: {:?}", e); + } + } + + } }); // start irc message queue @@ -232,12 +249,16 @@ impl State { let channel = format!("#{}", task_state.channel_name); while let Some(message) = queue_rx.recv().await { - if let Err(e) = task_state.irc.send_privmsg(&channel, message) { + if let Err(e) = task_state.irc.read().await.send_privmsg(&channel, message) { eprintln!("error sending message: {:?}", e); } } }); + // start stream status task + let task_state = Arc::clone(&state); + self::task::stream_status::start_task(task_state).await; + Ok(state) } @@ -247,7 +268,7 @@ impl State { return Ok(Some(user.clone())); } - let user = self.twitch.client.helix.get_user_from_id(id, &self.twitch.bot_token).await?; + let user = self.twitch.client.helix.get_user_from_id(id, &self.twitch.bot_token().await?).await?; self.add_user_to_cache(&user).await; Ok(user) @@ -259,7 +280,7 @@ impl State { return Ok(Some(user.clone())); } - let user = self.twitch.client.helix.get_user_from_login(login, &self.twitch.bot_token).await?; + let user = self.twitch.client.helix.get_user_from_login(login, &self.twitch.bot_token().await?).await?; self.add_user_to_cache(&user).await; Ok(user) @@ -271,6 +292,17 @@ impl State { self.user_login_cache.lock().await.cache_set(user.login.clone(), user.clone()); } } + + async fn store_live_status(self: &Arc, live: bool) { + let prev = self.live.swap(live, Ordering::SeqCst); + if prev == live { + return; + } + + if let Some(rhai) = &self.config.read().await.events.stream_status { + self::run_rhai(Arc::clone(self), rhai, vec![rhai::Dynamic::from(live)]); + } + } } #[derive(Clone)] @@ -316,7 +348,7 @@ impl DerefMut for FullUserToken { } } -pub fn run_rhai>(state: Arc, script: S, fn_state: ExecutorState) { +pub fn run_rhai>(state: Arc, script: S, fn_args: Vec) { let script = script.into(); tokio::task::spawn_blocking(move || { let res: anyhow::Result<()> = try { @@ -339,7 +371,7 @@ pub fn run_rhai>(state: Arc, script: S, fn_state: Executo true, "run", Some(&mut output), - [rhai::Dynamic::from(fn_state)], + fn_args, )?; let output: ExecutorOutput = output.cast(); @@ -355,10 +387,34 @@ pub fn run_rhai>(state: Arc, script: S, fn_state: Executo } async fn verify_token(full_token: &mut Option, user_config: &UserConfig, http_client: &reqwest::Client, scopes: Vec) -> anyhow::Result<()> { - match full_token { - Some(t) => if t.validate_token(http_client).await.is_err() { - println!("Refreshing token"); - t.refresh_token(http_client).await?; + let fut = match full_token.take() { + Some(mut t) => { + match t.validate_token(http_client).await { + Ok(validated) => { + let old = t.token; + let token = UserToken::from_existing_unchecked( + old.access_token, + old.refresh_token, + validated.client_id, + t.client_secret.clone(), + validated.login.unwrap_or(old.login), + validated.user_id.unwrap_or(old.user_id), + validated.scopes, + Some(validated.expires_in), + ); + + FullUserToken { + client_id: t.client_id, + client_secret: t.client_secret, + token, + } + }, + Err(_) => { + println!("Refreshing token"); + t.refresh_token(http_client).await?; + t + }, + } }, None => { let mut builder = UserToken::builder( @@ -372,13 +428,15 @@ async fn verify_token(full_token: &mut Option, user_config: &User let mut code = String::new(); std::io::stdin().read_line(&mut code)?; let token: UserToken = builder.get_user_token(http_client, csrf.as_str(), code.trim()).await?; - *full_token = Some(FullUserToken { + FullUserToken { token, client_id: user_config.twitch.client_id.clone(), client_secret: user_config.twitch.client_secret.clone(), - }); + } } }; + *full_token = Some(fut); + Ok(()) } diff --git a/src/app/config.rs b/src/app/config.rs index 8c914a9..27d7b83 100644 --- a/src/app/config.rs +++ b/src/app/config.rs @@ -12,6 +12,8 @@ pub struct Config { pub user_token: Option, pub commands: Vec, pub redemptions: Vec, + #[serde(default)] + pub events: Events, } #[derive(Deserialize, Serialize, Clone)] @@ -47,3 +49,8 @@ pub struct Redemption { pub twitch_id: twitch_api2::types::RewardId, pub rhai: String, } + +#[derive(Deserialize, Serialize, Default, Clone)] +pub struct Events { + pub stream_status: Option, +} diff --git a/src/app/rhai_tools.rs b/src/app/rhai_tools.rs index d558661..aa772b4 100644 --- a/src/app/rhai_tools.rs +++ b/src/app/rhai_tools.rs @@ -103,7 +103,7 @@ impl ExecutorState { let req = GetChannelInformationRequest::builder() .broadcaster_id(id.into()) .build(); - self.state.twitch.client.helix.req_get(req, &self.state.twitch.bot_token) + self.state.twitch.client.helix.req_get(req, &self.state.twitch.bot_token().await.ok()?) .await .ok()? .data diff --git a/src/app/task/mod.rs b/src/app/task/mod.rs new file mode 100644 index 0000000..8d62216 --- /dev/null +++ b/src/app/task/mod.rs @@ -0,0 +1 @@ +pub mod stream_status; diff --git a/src/app/task/stream_status.rs b/src/app/task/stream_status.rs new file mode 100644 index 0000000..7a36684 --- /dev/null +++ b/src/app/task/stream_status.rs @@ -0,0 +1,40 @@ +use chrono::Duration; +use twitch_api2::{ + types::UserId, + helix::streams::{ + GetStreamsRequest, + get_streams::Stream, + }, +}; +use crate::app::State; +use std::sync::Arc; + +pub async fn start_task(state: Arc) { + tokio::task::spawn(async move { + loop { + let req = GetStreamsRequest::builder() + .user_id(vec![ + UserId::new(state.user_config.twitch.channel_id.to_string()), + ]) + .build(); + let token = match state.twitch.bot_token().await { + Ok(token) => token, + Err(e) => { + eprintln!("could not get bot token: {:?}", e); + continue; + }, + }; + match state.twitch.client.helix.req_get(req, &token).await { + Ok(resp) => { + let streams: Vec = resp.data; + state.store_live_status(!streams.is_empty()).await; + } + Err(e) => { + eprintln!("could not get stream status: {:?}", e); + } + } + + tokio::time::sleep(Duration::minutes(5).to_std().unwrap()).await; + } + }); +} diff --git a/src/app/twitch.rs b/src/app/twitch.rs index 75c3697..488bd45 100644 --- a/src/app/twitch.rs +++ b/src/app/twitch.rs @@ -7,6 +7,7 @@ use irc::{ client::prelude::Message, proto::{Command, Response}, }; +use tokio::sync::RwLock; use tokio_tungstenite::tungstenite::Message as WsMessage; use crate::app::{ State, @@ -17,8 +18,36 @@ use std::sync::Arc; pub struct Twitch { pub client: TwitchClient<'static, reqwest::Client>, - pub bot_token: UserToken, - pub user_token: UserToken, + config: Arc>, +} + +impl Twitch { + pub fn new(client: TwitchClient<'static, reqwest::Client>, config: Arc>) -> Self { + Self { + client, + config, + } + } + + pub async fn bot_token(&self) -> anyhow::Result { + let mut bot_token = self.config.read().await.bot_token.as_ref().unwrap().clone(); + if bot_token.token.expires_in() <= std::time::Duration::from_secs(60 * 15) { + bot_token.refresh_token(&reqwest::Client::new()).await?; + self.config.write().await.bot_token.replace(bot_token.clone()); + } + + Ok(bot_token.token) + } + + pub async fn user_token(&self) -> anyhow::Result { + let mut user_token = self.config.read().await.user_token.as_ref().unwrap().clone(); + if user_token.token.expires_in() <= std::time::Duration::from_secs(60 * 15) { + user_token.refresh_token(&reqwest::Client::new()).await?; + self.config.write().await.bot_token.replace(user_token.clone()); + } + + Ok(user_token.token) + } } pub async fn handle_irc_event(state: Arc, event: Message) -> anyhow::Result<()> { @@ -26,7 +55,7 @@ pub async fn handle_irc_event(state: Arc, event: Message) -> anyhow::Resu match &event.command { Command::Response(resp, _) if *resp == Response::RPL_WELCOME => { - state.irc.send_join(&channel_name)?; + state.irc.read().await.send_join(&channel_name)?; } // FIXME: do correct checking here Command::PRIVMSG(channel, message) if *channel == channel_name => { @@ -120,7 +149,8 @@ async fn on_privmsg(state: Arc, event: &Message, message: &str) -> anyhow speedgame_name, speedgame_category, }; - crate::app::run_rhai(Arc::clone(&state), t, fn_state); + let args = vec![rhai::Dynamic::from(fn_state)]; + crate::app::run_rhai(Arc::clone(&state), t, args); } } @@ -133,6 +163,9 @@ use twitch_api2::pubsub::{ channel_points::ChannelPointsChannelV1Reply, }; use std::time::Instant; +use twitch_api2::pubsub::video_playback::VideoPlaybackReply; +use crate::app::config::Config; +use twitch_api2::twitch_oauth2::TwitchToken; pub async fn handle_pubsub(state: Arc, event: WsMessage) -> anyhow::Result<()> { let json = match event { @@ -142,11 +175,17 @@ pub async fn handle_pubsub(state: Arc, event: WsMessage) -> anyhow::Resul let response = twitch_api2::pubsub::Response::parse(&json)?; - let reply = match response { - PubSubResponse::Message { data: TopicData::ChannelPointsChannelV1 { reply, .. } } => reply, - _ => return Ok(()), - }; + match response { + PubSubResponse::Message {data} => match data { + TopicData::ChannelPointsChannelV1 { reply, .. } => handle_channel_points_reply(state, reply).await, + TopicData::VideoPlaybackById { reply, .. } => handle_stream_status(state, reply).await, + _ => Ok(()), + }, + _ => Ok(()), + } +} +async fn handle_channel_points_reply(state: Arc, reply: Box) -> anyhow::Result<()> { let redemption = match *reply { ChannelPointsChannelV1Reply::RewardRedeemed { redemption, .. } => redemption, _ => return Ok(()), @@ -174,7 +213,19 @@ pub async fn handle_pubsub(state: Arc, event: WsMessage) -> anyhow::Resul speedgame_name, speedgame_category, }; - crate::app::run_rhai(Arc::clone(&state), &action.rhai, fn_state); + let args = vec![rhai::Dynamic::from(fn_state)]; + crate::app::run_rhai(Arc::clone(&state), &action.rhai, args); + Ok(()) +} + +async fn handle_stream_status(state: Arc, reply: Box) -> anyhow::Result<()> { + let live = match *reply { + VideoPlaybackReply::StreamUp { .. } => true, + VideoPlaybackReply::StreamDown { .. } => false, + _ => return Ok(()), + }; + + state.store_live_status(live).await; Ok(()) } diff --git a/src/app/twitch_irc.rs b/src/app/twitch_irc.rs new file mode 100644 index 0000000..12e8191 --- /dev/null +++ b/src/app/twitch_irc.rs @@ -0,0 +1,22 @@ +use anyhow::Result; +use irc::client::prelude::{Capability, Client as IrcClient, Config as IrcConfig}; +use crate::app::twitch::Twitch; + +pub async fn connect(twitch: &Twitch) -> Result { + let token = twitch.bot_token().await?; + let irc_config = IrcConfig { + server: Some("irc.chat.twitch.tv".into()), + port: Some(6697), + username: Some(token.login.clone()), + nickname: Some(token.login.clone()), + password: Some(format!("oauth:{}", token.access_token.secret())), + ..Default::default() + }; + let irc = IrcClient::from_config(irc_config).await?; + irc.send_cap_req(&[ + Capability::Custom("twitch.tv/tags"), + Capability::Custom("twitch.tv/commands"), + ])?; + irc.identify()?; + Ok(irc) +} diff --git a/src/app/web.rs b/src/app/web.rs index aa2b3d5..9c2b7f5 100644 --- a/src/app/web.rs +++ b/src/app/web.rs @@ -33,6 +33,7 @@ pub async fn start_web(state: Arc) { commands_routes(Arc::clone(&state)) .or(redemptions_routes(Arc::clone(&state))) .or(livesplit_routes(Arc::clone(&state))) + .or(events_routes(Arc::clone(&state))) ); let unauthed = access_token_routes(); diff --git a/src/app/web/route/events.rs b/src/app/web/route/events.rs new file mode 100644 index 0000000..93d094e --- /dev/null +++ b/src/app/web/route/events.rs @@ -0,0 +1,67 @@ +use warp::{Filter, Reply, filters::BoxedFilter, http::Uri}; +use crate::app::{ + State, + web::{ + CustomRejection, + template::events::EventsTemplate, + }, +}; +use std::{ + collections::HashMap, + convert::Infallible, + sync::Arc, +}; + +pub fn events_routes(state: Arc) -> BoxedFilter<(impl Reply, )> { + warp::get() + .and( + events_get(Arc::clone(&state)) + ) + .or(warp::post().and( + events_edit_post(Arc::clone(&state)) + )) + .boxed() +} + +fn events_get(state: Arc) -> BoxedFilter<(impl Reply, )> { + warp::path("events") + .and(warp::path::end()) + .and_then(move || { + let state = Arc::clone(&state); + async move { + Result::::Ok(EventsTemplate { + events: state.config.read().await.events.clone(), + }) + } + }) + .boxed() +} + +fn events_edit_post(state: Arc) -> BoxedFilter<(impl Reply, )> { + warp::path("events") + .and(warp::path::end()) + .and(warp::body::content_length_limit(1024 * 5)) + .and(warp::body::form()) + .and_then(move |mut form: HashMap| { + let state = Arc::clone(&state); + async move { + let event_name = match form.remove("event") { + Some(name) => name, + None => return Err(warp::reject::custom(CustomRejection::InvalidForm)), + }; + + let script = match form.remove("script") { + Some(script) => script, + None => return Err(warp::reject::custom(CustomRejection::InvalidForm)), + }; + + match &*event_name { + "stream_status" => state.config.write().await.events.stream_status = Some(script), + _ => return Err(warp::reject::custom(CustomRejection::InvalidForm)), + } + + Ok(warp::redirect(Uri::from_static("/events"))) + } + }) + .boxed() +} diff --git a/src/app/web/route/livesplit.rs b/src/app/web/route/livesplit.rs index f6f120f..4850ea9 100644 --- a/src/app/web/route/livesplit.rs +++ b/src/app/web/route/livesplit.rs @@ -53,7 +53,7 @@ async fn set_reward_paused(state: Arc, id: String, paused: bool) -> anyho .is_paused(paused) .build(); - state.twitch.client.helix.req_patch(request, body, &state.twitch.user_token).await?; + state.twitch.client.helix.req_patch(request, body, &state.twitch.user_token().await?).await?; Ok(()) } diff --git a/src/app/web/route/mod.rs b/src/app/web/route/mod.rs index 0c95539..92fed6d 100644 --- a/src/app/web/route/mod.rs +++ b/src/app/web/route/mod.rs @@ -1,10 +1,12 @@ pub mod access_token; pub mod commands; +pub mod events; pub mod livesplit; pub mod redemptions; pub use self::access_token::*; pub use self::commands::*; +pub use self::events::*; pub use self::livesplit::*; pub use self::redemptions::*; diff --git a/src/app/web/route/redemptions.rs b/src/app/web/route/redemptions.rs index 158e08a..1b3b1a6 100644 --- a/src/app/web/route/redemptions.rs +++ b/src/app/web/route/redemptions.rs @@ -126,7 +126,11 @@ fn redemptions_list_get(state: Arc) -> BoxedFilter<(impl Reply, )> { let req = GetCustomRewardRequest::builder() .broadcaster_id(state.user_config.twitch.channel_id.to_string()) .build(); - let rewards: Vec = match state.twitch.client.helix.req_get(req, &state.twitch.user_token).await { + let token = match state.twitch.user_token().await { + Ok(token) => token, + Err(_) => return Err(warp::reject::custom(CustomRejection::TwitchError)), + }; + let rewards: Vec = match state.twitch.client.helix.req_get(req, &token).await { Ok(resp) => resp.data, Err(_) => return Err(warp::reject::custom(CustomRejection::TwitchError)), }; diff --git a/src/app/web/template/events.rs b/src/app/web/template/events.rs new file mode 100644 index 0000000..c952e52 --- /dev/null +++ b/src/app/web/template/events.rs @@ -0,0 +1,8 @@ +use askama::Template; +use crate::app::config::Events; + +#[derive(Template)] +#[template(path = "events.html")] +pub struct EventsTemplate { + pub events: Events, +} diff --git a/src/app/web/template/mod.rs b/src/app/web/template/mod.rs index bf349dc..d0214f5 100644 --- a/src/app/web/template/mod.rs +++ b/src/app/web/template/mod.rs @@ -1,3 +1,4 @@ pub mod commands; +pub mod events; pub mod index; pub mod redemptions; diff --git a/templates/events.html b/templates/events.html new file mode 100644 index 0000000..654e649 --- /dev/null +++ b/templates/events.html @@ -0,0 +1,43 @@ +{% extends "_base.html" %} + +{% block title %}Events{% endblock %} + +{% block head %} + +{% endblock %} + +{% block body %} + + +
+ Stream status change +
+ + + +
+
+{% endblock %}