// clemsbot/src/app.rs
// 474 lines, 16 KiB, Rust

pub mod config;
pub mod duration_tools;
pub mod rhai_tools;
pub mod task;
pub mod twitch;
pub mod twitch_irc;
pub mod user_config;
pub mod web;
use anyhow::Result;
use cached::{TimedCache, Cached};
use chrono::Duration;
use futures::{StreamExt, SinkExt};
use irc::client::prelude::Client as IrcClient;
use rhai::Engine;
use serde::{Deserialize, Serialize};
use tokio::{
runtime::Handle,
sync::{
Mutex, RwLock,
mpsc::UnboundedSender,
},
};
use tokio_tungstenite::tungstenite::Message as WsMessage;
use twitch_api2::{
TwitchClient,
helix::{
Scope,
users::User,
},
pubsub::Topic,
types::{UserId, UserName},
twitch_oauth2::{ClientSecret, UserToken, TwitchToken, ClientId, RefreshToken, AccessToken},
};
use crate::app::{
config::Config,
rhai_tools::{ExecutorState, ExecutorOutput},
user_config::UserConfig,
twitch::Twitch,
};
use std::{
collections::HashMap,
ops::{Deref, DerefMut},
path::Path,
sync::{
Arc,
atomic::{AtomicBool, Ordering},
},
time::Instant,
};
/// Shared application state, built by `State::new` and handed to the
/// background tasks behind an `Arc`.
pub struct State {
/// User-supplied configuration passed to `State::new`; the visible code reads
/// its `twitch.channel_id`, `twitch.client_id` and `twitch.client_secret`.
pub user_config: UserConfig,
/// Mutable bot data (tokens, event scripts, ...). Shared with the `Twitch`
/// wrapper and written to `data.json` every five minutes by a task spawned in
/// `State::new`.
pub config: Arc<RwLock<Config>>,
/// Login name of the channel, resolved from `user_config.twitch.channel_id`
/// at startup.
pub channel_name: String,
/// Twitch API client wrapper; also the source of the bot token used for
/// Helix requests.
pub twitch: Arc<Twitch>,
/// Last stream status recorded by `store_live_status`; starts as `false`.
pub live: AtomicBool,
/// NOTE(review): not used in the visible code; presumably maps a reward
/// id/name to its paused flag. Confirm against the callers.
pub rewards_paused: RwLock<HashMap<String, bool>>,
/// Users keyed by login, created with a lifespan of 3600 (seconds in the
/// `cached` crate).
pub user_login_cache: Mutex<TimedCache<UserName, User>>,
/// Users keyed by id, created with a lifespan of 3600 (seconds in the
/// `cached` crate).
pub user_id_cache: Mutex<TimedCache<UserId, User>>,
/// Current IRC client; replaced by the IRC listener task after a reconnect.
pub irc: RwLock<IrcClient>,
/// Sender side of the outbound chat queue. A task spawned in `State::new`
/// sends every queued string as a PRIVMSG to `#<channel_name>`.
pub irc_queue: UnboundedSender<String>,
/// NOTE(review): not used in the visible code; presumably the last use of a
/// named command per user. Confirm against the callers.
pub user_cooldowns: RwLock<HashMap<(UserId, String), Instant>>,
/// NOTE(review): not used in the visible code; presumably the last use of a
/// named command by anyone. Confirm against the callers.
pub global_cooldowns: RwLock<HashMap<String, Instant>>,
/// Rhai engine with the `ExecutorState` and `ExecutorOutput` types registered.
pub rhai: Engine,
/// Compiled script ASTs keyed by the full script source text (see `run_rhai`).
pub script_cache: parking_lot::RwLock<HashMap<String, rhai::AST>>,
/// Tokio runtime handle passed to `State::new`.
pub runtime: Handle,
/// Pair of optional strings, initialised to `(None, None)`.
/// NOTE(review): presumably (game name, category), mirroring the
/// `speedgame_name` / `speedgame_category` script getters. Confirm.
pub speedgame: RwLock<(Option<String>, Option<String>)>,
}
impl State {
/// Builds the shared application state and starts every background task.
///
/// In order, this:
/// 1. verifies (or interactively obtains) the bot and user tokens,
/// 2. resolves the channel login from the configured channel id,
/// 3. spawns the periodic config save task,
/// 4. connects to IRC and sets up the Rhai engine,
/// 5. spawns the web, PubSub, IRC listener and IRC send-queue tasks,
/// 6. starts the stream status and token refresh tasks.
///
/// # Errors
///
/// Returns an error if a token cannot be verified or obtained, the channel
/// lookup fails or finds no user, the IRC connection or its stream cannot be
/// set up, or the PubSub listen command cannot be built.
// NOTE(review): the `allow` is presumably needed because the PubSub `try`
// block ends in a `loop` that never breaks, which makes the block's implicit
// `Ok` wrapping unreachable.
#[allow(unreachable_code)]
pub async fn new(runtime: Handle, user_config: UserConfig, mut config: Config) -> Result<Arc<Self>> {
    let http_client = reqwest::Client::new();

    println!("verifying bot token");
    verify_token(&mut config.bot_token, &user_config, &http_client, vec![
        // IRC
        Scope::ChatRead,
        Scope::ChatEdit,
        // Mod stuff
        Scope::ChannelModerate,
        Scope::ModerationRead,
        Scope::ModeratorManageAutoMod,
    ]).await?;
    println!("bot token ready");

    println!("verifying user token");
    verify_token(&mut config.user_token, &user_config, &http_client, vec![
        // Channel points redemptions
        Scope::ChannelReadRedemptions,
        Scope::ChannelManageRedemptions,
        // Mod stuff
        Scope::ChannelModerate,
        Scope::ModerationRead,
        Scope::ModeratorManageAutoMod,
    ]).await?;
    println!("user token ready");

    let config = Arc::new(RwLock::new(config));

    let twitch_client = TwitchClient::with_client(http_client);
    let twitch_config = Arc::clone(&config);
    let twitch = Arc::new(Twitch::new(twitch_client, twitch_config));

    println!("getting channel name");
    let user_id = UserId::new(user_config.twitch.channel_id.to_string());
    let channel_name = twitch.client.helix.get_user_from_id(user_id, &twitch.bot_token().await?)
        .await?
        .ok_or_else(|| anyhow::anyhow!("no channel for id {}", user_config.twitch.channel_id))?
        .login
        .to_string();
    println!("configured to run for channel: {}", channel_name);

    println!("starting periodic config save task");
    let task_config = Arc::clone(&config);
    tokio::task::spawn(async move {
        // Saves once immediately, then every five minutes, for the life of the runtime.
        loop {
            println!("saving config");
            if let Err(e) = crate::save_config(Path::new("data.json"), &*task_config.read().await).await {
                eprintln!("could not save config: {:?}", e);
            }
            tokio::time::sleep(Duration::minutes(5).to_std().unwrap()).await;
        }
    });

    println!("connecting to irc");
    let mut irc = self::twitch_irc::connect(&twitch).await?;
    let mut irc_stream = irc.stream()?;

    let mut rhai = Engine::new();
    // NOTE(review): 0 presumably means "no limit" for both depths in rhai; confirm for the pinned version.
    rhai.set_max_expr_depths(0, 0);
    rhai.register_type::<ExecutorState>()
        .register_get("args", ExecutorState::args)
        .register_get("initiator", ExecutorState::initiator)
        .register_get("initiator_id", ExecutorState::initiator_id)
        .register_get("broadcaster", ExecutorState::broadcaster)
        .register_get("moderator", ExecutorState::moderator)
        .register_get("vip", ExecutorState::vip)
        .register_get("subscriber", ExecutorState::subscriber)
        .register_get("speedgame_name", ExecutorState::speedgame_name)
        .register_get("speedgame_category", ExecutorState::speedgame_category)
        .register_fn("get_username", ExecutorState::get_username::<&str>)
        .register_fn("get_user_id", ExecutorState::get_user_id::<&str>)
        .register_fn("get_channel_info", ExecutorState::get_channel_info::<&str>);
    rhai.register_type::<ExecutorOutput>()
        .register_fn("send", ExecutorOutput::send::<&str>);

    let (queue_tx, mut queue_rx) = tokio::sync::mpsc::unbounded_channel();

    let state = Arc::new(Self {
        user_config,
        config,
        channel_name,
        twitch,
        live: AtomicBool::new(false),
        rewards_paused: Default::default(),
        user_login_cache: Mutex::new(TimedCache::with_lifespan(3600)),
        user_id_cache: Mutex::new(TimedCache::with_lifespan(3600)),
        irc: RwLock::new(irc),
        irc_queue: queue_tx,
        user_cooldowns: Default::default(),
        global_cooldowns: Default::default(),
        rhai,
        script_cache: Default::default(),
        runtime,
        speedgame: RwLock::new((None, None)),
    });

    // start web task
    tokio::task::spawn(crate::app::web::start_web(Arc::clone(&state)));

    // start pubsub
    // NOTE(review): `as u32` truncates silently if `channel_id` is a wider integer type;
    // its declaration is not visible here, so the cast is left unchanged.
    let redemption_topic = twitch_api2::pubsub::channel_points::ChannelPointsChannelV1 {
        channel_id: state.user_config.twitch.channel_id as u32,
    }.into_topic();
    let video_playback_topic = twitch_api2::pubsub::video_playback::VideoPlaybackById {
        channel_id: state.user_config.twitch.channel_id as u32,
    }.into_topic();
    let auth_token: Option<String> = state.config
        .read()
        .await
        .user_token
        .as_ref()
        .map(|token| token.access_token.secret().to_string());
    let listen_command = twitch_api2::pubsub::listen_command(
        &[redemption_topic, video_playback_topic],
        auth_token.as_deref(),
        "1",
    )?;
    let task_state = Arc::clone(&state);
    tokio::task::spawn(async move {
        println!("starting pubsub listener");
        let res: Result<()> = try {
            let (ws, _) = tokio_tungstenite::connect_async("wss://pubsub-edge.twitch.tv").await?;
            let (mut write, mut read) = ws.split();
            write.send(WsMessage::Text(listen_command)).await?;
            let mut ping = tokio::time::interval(chrono::Duration::minutes(2).to_std().unwrap());
            loop {
                tokio::select! {
                    _ = ping.tick() => {
                        // Keep-alive: a websocket-level ping plus a PING message as JSON text.
                        write.send(WsMessage::Ping(vec![1, 2, 3, 4])).await?;
                        write.send(WsMessage::Text(r#"{"type":"PING"}"#.into())).await?;
                    },
                    message = read.next() => {
                        // `None` means the websocket stream has ended. Polling it again
                        // would complete immediately every time and turn this loop into a
                        // busy spin, so the listener stops here instead.
                        let message = message.ok_or_else(|| anyhow::anyhow!("pubsub websocket closed"))?;
                        // A read error ends the listener too, like the write errors above.
                        let message = message?;
                        if let Err(e) = crate::app::twitch::handle_pubsub(Arc::clone(&task_state), message).await {
                            eprintln!("error in pubsub: {:?}", e);
                        }
                    },
                }
            }
        };
        // NOTE(review): there is no reconnect logic; once this is reached no further
        // PubSub events are processed until the bot is restarted.
        if let Err(e) = res {
            eprintln!("error connecting to websocket: {:?}", e);
        }
    });

    // start irc task
    let task_state = Arc::clone(&state);
    tokio::task::spawn(async move {
        println!("listening for irc events");
        loop {
            // Drain the current stream until it ends or reports an error.
            loop {
                match irc_stream.next().await {
                    Some(Ok(event)) => {
                        let task_state = Arc::clone(&task_state);
                        if let Err(e) = crate::app::twitch::handle_irc_event(task_state, event).await {
                            eprintln!("irc error: {:?}", e);
                        }
                    }
                    Some(Err(e)) => {
                        eprintln!("irc stream error: {:?}", e);
                        break;
                    }
                    None => break,
                }
            }
            // reconnect
            eprintln!("reconnecting to irc... ");
            match self::twitch_irc::connect(&*task_state.twitch).await {
                Ok(mut client) => {
                    eprintln!("connected to irc");
                    if let Ok(stream) = client.stream() {
                        irc_stream = stream;
                    } else {
                        eprintln!("connected but no stream?");
                    }
                    *task_state.irc.write().await = client;
                }
                Err(e) => {
                    eprintln!("error reconnecting to irc: {:?}", e);
                    // Back off briefly so a persistent failure does not become a
                    // tight reconnect loop.
                    tokio::time::sleep(std::time::Duration::from_secs(5)).await;
                }
            }
        }
    });

    // start irc message queue
    let task_state = Arc::clone(&state);
    tokio::task::spawn(async move {
        let channel = format!("#{}", task_state.channel_name);
        while let Some(message) = queue_rx.recv().await {
            if let Err(e) = task_state.irc.read().await.send_privmsg(&channel, message) {
                eprintln!("error sending message: {:?}", e);
            }
        }
    });

    // start stream status task
    let task_state = Arc::clone(&state);
    self::task::stream_status::start_task(task_state).await;

    // start token refresh task
    let task_state = Arc::clone(&state);
    self::task::tokens::start_task(task_state).await;

    println!("initialised");
    Ok(state)
}
/// Looks up a Twitch user by id, consulting the id-keyed cache first.
///
/// On a cache miss the user is requested from Helix with the bot token and,
/// when found, stored in both user caches. `Ok(None)` means Helix returned no
/// user for `id`; that outcome is not cached.
///
/// # Errors
///
/// Fails if the bot token cannot be obtained or the Helix request fails.
pub async fn get_user_from_id<S: Into<UserId>>(&self, id: S) -> anyhow::Result<Option<User>> {
    let id = id.into();

    // The lock guard is a temporary of this statement, so it is released
    // before any network request is made.
    let cached = self.user_id_cache.lock().await.cache_get(&id).cloned();
    if cached.is_some() {
        return Ok(cached);
    }

    let fetched = self.twitch.client.helix
        .get_user_from_id(id, &self.twitch.bot_token().await?)
        .await?;
    self.add_user_to_cache(&fetched).await;
    Ok(fetched)
}
/// Looks up a Twitch user by login name, consulting the login-keyed cache first.
///
/// On a cache miss the user is requested from Helix with the bot token and,
/// when found, stored in both user caches. `Ok(None)` means Helix returned no
/// user for `login`; that outcome is not cached.
///
/// # Errors
///
/// Fails if the bot token cannot be obtained or the Helix request fails.
pub async fn get_user_from_login<S: Into<UserName>>(&self, login: S) -> anyhow::Result<Option<User>> {
    let login = login.into();

    // The lock guard is a temporary of this statement, so it is released
    // before any network request is made.
    let cached = self.user_login_cache.lock().await.cache_get(&login).cloned();
    if cached.is_some() {
        return Ok(cached);
    }

    let fetched = self.twitch.client.helix
        .get_user_from_login(login, &self.twitch.bot_token().await?)
        .await?;
    self.add_user_to_cache(&fetched).await;
    Ok(fetched)
}
/// Stores `user` in both the id-keyed and the login-keyed cache.
///
/// Does nothing when `user` is `None`.
async fn add_user_to_cache(&self, user: &Option<User>) {
    let found = match user {
        Some(found) => found,
        None => return,
    };

    // Each cache is locked, updated and released in turn; the two locks are
    // never held at the same time.
    {
        let mut by_id = self.user_id_cache.lock().await;
        by_id.cache_set(found.id.clone(), found.clone());
    }
    {
        let mut by_login = self.user_login_cache.lock().await;
        by_login.cache_set(found.login.clone(), found.clone());
    }
}
/// Records the stream's live status and, when it changed, runs the configured
/// `stream_status` event script with the new value as its only argument.
async fn store_live_status(self: &Arc<State>, live: bool) {
    // `swap` hands back the previous value, which tells us whether this call
    // actually changed anything.
    let was_live = self.live.swap(live, Ordering::SeqCst);
    if was_live != live {
        let config = self.config.read().await;
        if let Some(script) = &config.events.stream_status {
            self::run_rhai(Arc::clone(self), script, vec![rhai::Dynamic::from(live)]);
        }
    }
}
}
/// A Twitch `UserToken` kept together with the client id and client secret it
/// was issued for.
///
/// Serialized by the manual `Serialize`/`Deserialize` impls as the tuple
/// `(refresh_token, access_token, login, user_id, client_id, client_secret, scopes)`.
/// Dereferences to the inner `UserToken`.
#[derive(Clone)]
pub struct FullUserToken {
/// The wrapped user token.
pub token: UserToken,
/// Client id stored (and serialized) alongside the token.
pub client_id: ClientId,
/// Client secret stored (and serialized) alongside the token.
pub client_secret: ClientSecret,
}
impl<'de> Deserialize<'de> for FullUserToken {
    /// Reads the tuple `(refresh_token, access_token, login, user_id,
    /// client_id, client_secret, scopes)` and rebuilds the token from it
    /// without validating it; no expiry is set.
    fn deserialize<D>(de: D) -> std::result::Result<FullUserToken, D::Error>
    where
        D: serde::de::Deserializer<'de>,
    {
        // The login and user id types are inferred from
        // `UserToken::from_existing_unchecked` below.
        let stored: (RefreshToken, AccessToken, _, _, ClientId, ClientSecret, Vec<Scope>) =
            Deserialize::deserialize(de)?;
        let (refresh, access, login, user_id, client_id, client_secret, scopes) = stored;

        let token = UserToken::from_existing_unchecked(
            access,
            refresh,
            client_id.clone(),
            client_secret.clone(),
            login,
            user_id,
            Some(scopes),
            None,
        );

        Ok(Self { token, client_id, client_secret })
    }
}
impl Serialize for FullUserToken {
fn serialize<S>(&self, ser: S) -> std::result::Result<S::Ok, S::Error>
where S: serde::ser::Serializer,
{
let to_ser = (&self.token.refresh_token, &self.token.access_token, &self.token.login, &self.token.user_id, &self.client_id, &self.client_secret, self.token.scopes());
to_ser.serialize(ser)
}
}
impl Deref for FullUserToken {
    type Target = UserToken;

    /// Borrows the wrapped `UserToken`, so its fields and methods can be used
    /// directly on a `FullUserToken`.
    fn deref(&self) -> &Self::Target {
        let Self { token, .. } = self;
        token
    }
}
impl DerefMut for FullUserToken {
    /// Mutably borrows the wrapped `UserToken` (needed, for example, to
    /// refresh it in place).
    fn deref_mut(&mut self) -> &mut Self::Target {
        let Self { token, .. } = self;
        token
    }
}
/// Runs a Rhai script on Tokio's blocking thread pool and queues its chat output.
///
/// The script source is compiled on first use and the resulting AST is cached
/// in `state.script_cache`, keyed by the full script text. The script's `run`
/// function is then called with `fn_args`, with `this` bound to a fresh
/// `ExecutorOutput`; every message left in its `to_send` is pushed onto the
/// IRC send queue.
///
/// This function returns immediately. Compilation and runtime errors are not
/// returned to the caller; they are printed to stderr from the spawned task.
pub fn run_rhai<S: Into<String>>(state: Arc<State>, script: S, fn_args: Vec<rhai::Dynamic>) {
    let script = script.into();
    tokio::task::spawn_blocking(move || {
        let res: anyhow::Result<()> = try {
            let mut scope = rhai::Scope::new();

            // The read guard is a temporary of this statement, so the write lock
            // below is never requested while it is still held.
            let cached = state.script_cache.read().get(&script).cloned();
            let ast = match cached {
                Some(ast) => ast,
                None => {
                    let ast = state.rhai.compile(&script)?;
                    state.script_cache.write().insert(script.clone(), ast.clone());
                    ast
                }
            };

            let mut output = rhai::Dynamic::from(ExecutorOutput::default());
            // NOTE(review): the two booleans are presumably `eval_ast` and
            // `rewind_scope`; confirm against the pinned rhai version.
            state.rhai.call_fn_raw(
                &mut scope,
                &ast,
                true,
                false,
                "run",
                Some(&mut output),
                fn_args,
            )?;

            // A script may assign a value of another type to `this`. Report that
            // as a script error instead of panicking inside the blocking task.
            let output = output
                .try_cast::<ExecutorOutput>()
                .ok_or_else(|| anyhow::anyhow!("script replaced `this` with a value that is not an ExecutorOutput"))?;
            for message in output.to_send {
                // Sending only fails once the queue's receiver is gone; nothing
                // useful can be done about that here.
                state.irc_queue.send(message).ok();
            }
        };
        if let Err(e) = res {
            eprintln!("error in rhai script: {:?}", e);
        }
    });
}
/// Makes sure `full_token` holds a usable token, obtaining one if necessary.
///
/// * With an existing token that validates, the token is rebuilt from the
///   validation response (client id, login, user id, scopes and expiry).
/// * With an existing token that fails validation, a refresh is attempted.
/// * With no token, an authorisation URL for `scopes` is printed and the
///   resulting code is read from stdin and exchanged for a token.
///
/// On success `*full_token` is `Some`. On error it is left as it was on entry.
///
/// NOTE(review): `scopes` is only used when a brand-new token is requested; an
/// existing token is not checked against it.
///
/// # Errors
///
/// Fails if the refresh fails, stdin cannot be read, or the code exchange fails.
async fn verify_token(full_token: &mut Option<FullUserToken>, user_config: &UserConfig, http_client: &reqwest::Client, scopes: Vec<Scope>) -> anyhow::Result<()> {
    let verified = match full_token.take() {
        Some(mut existing) => {
            match existing.validate_token(http_client).await {
                Ok(validated) => {
                    let old = existing.token;
                    let token = UserToken::from_existing_unchecked(
                        old.access_token,
                        old.refresh_token,
                        validated.client_id,
                        existing.client_secret.clone(),
                        validated.login.unwrap_or(old.login),
                        validated.user_id.unwrap_or(old.user_id),
                        validated.scopes,
                        Some(validated.expires_in),
                    );
                    FullUserToken {
                        client_id: existing.client_id,
                        client_secret: existing.client_secret,
                        token,
                    }
                },
                Err(validation_error) => {
                    eprintln!("token validation failed: {:?}", validation_error);
                    println!("Refreshing token");
                    // Keep a copy so that a failed refresh does not leave the
                    // caller's slot emptied by the `take()` above.
                    let backup = existing.clone();
                    if let Err(refresh_error) = existing.refresh_token(http_client).await {
                        *full_token = Some(backup);
                        return Err(refresh_error.into());
                    }
                    existing
                },
            }
        },
        None => {
            let mut builder = UserToken::builder(
                user_config.twitch.client_id.clone(),
                user_config.twitch.client_secret.clone(),
                url::Url::parse("http://localhost/").expect("constant redirect URL is valid"),
            ).set_scopes(scopes);
            let (url, csrf) = builder.generate_url();
            println!("go to {}", url);
            println!("once done, paste code:");
            // This is a blocking read inside an async fn, so it blocks the
            // calling thread until a line is entered.
            let mut code = String::new();
            std::io::stdin().read_line(&mut code)?;
            let token: UserToken = builder.get_user_token(http_client, csrf.as_str(), code.trim()).await?;
            FullUserToken {
                token,
                client_id: user_config.twitch.client_id.clone(),
                client_secret: user_config.twitch.client_secret.clone(),
            }
        }
    };
    *full_token = Some(verified);
    Ok(())
}