clemsbot/src/app.rs

324 lines
11 KiB
Rust

pub mod config;
pub mod rhai_tools;
pub mod twitch;
pub mod user_config;
pub mod web;
use rhai::Engine;
use irc::client::prelude::{Client as IrcClient, Config as IrcConfig, Capability};
use anyhow::Result;
use twitch_api2::TwitchClient;
use crate::app::{
config::Config,
rhai_tools::{ExecutorState, ExecutorOutput},
user_config::UserConfig,
twitch::Twitch,
};
use std::{
ops::{Deref, DerefMut},
sync::Arc,
};
use futures::{StreamExt, SinkExt};
use twitch_api2::twitch_oauth2::{ClientSecret, UserToken, TwitchToken, ClientId, RefreshToken, AccessToken};
use twitch_api2::helix::Scope;
use serde::{Deserialize, Serialize};
use tokio::sync::RwLock;
use twitch_api2::types::UserId;
use tokio::sync::mpsc::UnboundedSender;
use tokio::runtime::Handle;
use std::collections::HashMap;
use twitch_api2::pubsub::Topic;
use tokio_tungstenite::tungstenite::Message as WsMessage;
/// Shared application state, built once by `State::new` and handed out behind an `Arc`.
pub struct State {
/// User-supplied configuration; the Twitch channel id and client credentials are read from it.
pub user_config: UserConfig,
/// Mutable configuration behind an async lock; it holds the bot and user tokens.
pub config: RwLock<Config>,
/// Login name of the configured channel, looked up from the channel id in `State::new`.
pub channel_name: String,
/// Twitch API client together with the bot and user tokens.
pub twitch: Arc<Twitch>,
/// Per-key boolean flags — presumably reward id -> "is paused"; confirm against the code that
/// reads and writes this map.
pub rewards_paused: RwLock<HashMap<String, bool>>,
/// IRC client used to talk to Twitch chat.
pub irc: IrcClient,
/// Sending half of the outgoing chat queue; a task spawned in `State::new` forwards every
/// queued message to the channel as a PRIVMSG.
pub irc_queue: UnboundedSender<String>,
/// Rhai engine with `ExecutorState` and `ExecutorOutput` registered.
pub rhai: Engine,
/// Compiled script ASTs keyed by the script's source text (filled by `run_rhai`).
pub script_cache: parking_lot::RwLock<HashMap<String, rhai::AST>>,
/// Tokio runtime handle supplied by the caller of `State::new`.
pub runtime: Handle,
}
impl State {
    /// Builds the shared application state and starts its background tasks.
    ///
    /// In order, this:
    /// 1. verifies (or interactively obtains) the bot and user tokens,
    /// 2. resolves the channel login name from the configured channel id,
    /// 3. connects to Twitch IRC and requests the tags/commands capabilities,
    /// 4. sets up the Rhai engine with the executor types,
    /// 5. spawns the web task, the pubsub listener, the IRC reader and the IRC send queue.
    ///
    /// # Errors
    ///
    /// Fails if a token cannot be verified or obtained, the channel id does not resolve to a
    /// user or does not fit the pubsub topic's `u32`, the IRC connection cannot be set up, or
    /// the pubsub listen command cannot be built. Failures inside the spawned tasks are only
    /// logged to stderr.
    pub async fn new(runtime: Handle, user_config: UserConfig, mut config: Config) -> Result<Arc<Self>> {
        let http_client = reqwest::Client::new();

        println!("Verifying bot token");
        verify_token(&mut config.bot_token, &user_config, &http_client, vec![
            // IRC
            Scope::ChatRead,
            Scope::ChatEdit,
        ]).await?;
        println!("Bot token ready");

        println!("Verifying user token");
        verify_token(&mut config.user_token, &user_config, &http_client, vec![
            // Channel points redemptions
            Scope::ChannelReadRedemptions,
            Scope::ChannelManageRedemptions,
            // Mod stuff
            Scope::ChannelModerate,
            Scope::ModerationRead,
            Scope::ModeratorManageAutoMod,
        ]).await?;
        println!("User token ready");

        // `verify_token` leaves both slots populated on success; report an error rather than
        // panicking should that ever stop being true.
        let bot_token = config.bot_token
            .as_ref()
            .map(|full| full.token.clone())
            .ok_or_else(|| anyhow::anyhow!("bot token missing after verification"))?;
        let user_token = config.user_token
            .as_ref()
            .map(|full| full.token.clone())
            .ok_or_else(|| anyhow::anyhow!("user token missing after verification"))?;

        let twitch_client = TwitchClient::with_client(http_client);
        let twitch = Arc::new(Twitch {
            client: twitch_client,
            bot_token,
            user_token,
        });

        let user_id = UserId::new(user_config.twitch.channel_id.to_string());
        let channel_name = twitch.client.helix.get_user_from_id(user_id, &twitch.bot_token)
            .await?
            .ok_or_else(|| anyhow::anyhow!("no channel for id {}", user_config.twitch.channel_id))?
            .login
            .to_string();

        let irc_config = IrcConfig {
            server: Some("irc.chat.twitch.tv".into()),
            port: Some(6697),
            username: Some(twitch.bot_token.login.clone()),
            nickname: Some(twitch.bot_token.login.clone()),
            password: Some(format!("oauth:{}", twitch.bot_token.access_token.secret())),
            ..Default::default()
        };
        let mut irc = IrcClient::from_config(irc_config).await?;
        irc.send_cap_req(&[
            Capability::Custom("twitch.tv/tags"),
            Capability::Custom("twitch.tv/commands"),
        ])?;
        irc.identify()?;
        // The stream has to be taken before `irc` is moved into the state below.
        let mut irc_stream = irc.stream()?;

        let mut rhai = Engine::new();
        // NOTE(review): 0 presumably lifts rhai's expression depth limits — confirm against the
        // pinned rhai version.
        rhai.set_max_expr_depths(0, 0);
        rhai.register_type::<ExecutorState>()
            .register_get("initiator", ExecutorState::initiator)
            .register_get("initiator_id", ExecutorState::initiator_id)
            .register_get("args", ExecutorState::args)
            .register_fn("get_username", ExecutorState::get_username::<&str>)
            .register_fn("get_user_id", ExecutorState::get_user_id::<&str>)
            .register_fn("get_channel_info", ExecutorState::get_channel_info::<&str>);
        rhai.register_type::<ExecutorOutput>()
            .register_fn("send", ExecutorOutput::send::<&str>);

        let (queue_tx, mut queue_rx) = tokio::sync::mpsc::unbounded_channel();

        let state = Arc::new(Self {
            user_config,
            config: RwLock::new(config),
            channel_name,
            twitch,
            rewards_paused: Default::default(),
            irc,
            irc_queue: queue_tx,
            rhai,
            script_cache: Default::default(),
            runtime,
        });

        // start web task
        tokio::task::spawn(crate::app::web::start_web(Arc::clone(&state)));

        // start pubsub
        // The topic takes a `u32`; range-check the id instead of truncating it with `as`.
        let raw_channel_id = state.user_config.twitch.channel_id;
        let pubsub_channel_id = <u32 as std::convert::TryFrom<_>>::try_from(raw_channel_id)
            .map_err(|_| anyhow::anyhow!("channel id {} does not fit in a u32", raw_channel_id))?;
        let redemption_topic = twitch_api2::pubsub::channel_points::ChannelPointsChannelV1 {
            channel_id: pubsub_channel_id,
        }.into_topic();
        let auth_token: Option<String> = state.config
            .read()
            .await
            .user_token
            .as_ref()
            .map(|token| token.access_token.secret().to_string());
        let listen_command = twitch_api2::pubsub::listen_command(
            &[redemption_topic],
            auth_token.as_deref(),
            "1",
        )?;
        let task_state = Arc::clone(&state);
        tokio::task::spawn(async move {
            // FIXME: handle reconnects
            let res: Result<()> = try {
                let (ws, _) = tokio_tungstenite::connect_async("wss://pubsub-edge.twitch.tv").await?;
                let (mut write, mut read) = ws.split();
                write.send(WsMessage::Text(listen_command)).await?;
                let mut ping = tokio::time::interval(std::time::Duration::from_secs(2 * 60));
                loop {
                    tokio::select! {
                        _ = ping.tick() => {
                            write.send(WsMessage::Ping(vec![1, 2, 3, 4])).await?;
                            write.send(WsMessage::Text(r#"{"type":"PING"}"#.into())).await?;
                        },
                        message = read.next() => {
                            match message {
                                Some(Ok(message)) => {
                                    if let Err(e) = crate::app::twitch::handle_pubsub(Arc::clone(&task_state), message).await {
                                        eprintln!("error in pubsub: {:?}", e);
                                    }
                                },
                                // A read error is treated as fatal for this connection.
                                Some(Err(e)) => {
                                    eprintln!("error reading from pubsub websocket: {:?}", e);
                                    break;
                                },
                                // An ended stream would resolve to `None` immediately on every
                                // poll; stop instead of spinning.
                                None => {
                                    eprintln!("pubsub websocket closed");
                                    break;
                                },
                            }
                        },
                    }
                }
            };
            if let Err(e) = res {
                eprintln!("error connecting to websocket: {:?}", e);
            }
        });

        // start irc task
        let task_state = Arc::clone(&state);
        tokio::task::spawn(async move {
            // FIXME: handle reconnects
            loop {
                match irc_stream.next().await {
                    Some(Ok(event)) => {
                        let task_state = Arc::clone(&task_state);
                        if let Err(e) = crate::app::twitch::handle_irc_event(task_state, event).await {
                            eprintln!("irc error: {:?}", e);
                        }
                    },
                    // The task is detached, so returning the error would lose it; log it here.
                    Some(Err(e)) => {
                        eprintln!("error reading from irc: {:?}", e);
                        break;
                    },
                    None => break,
                }
            }
        });

        // start irc message queue
        let task_state = Arc::clone(&state);
        tokio::task::spawn(async move {
            let channel = format!("#{}", task_state.channel_name);
            while let Some(message) = queue_rx.recv().await {
                if let Err(e) = task_state.irc.send_privmsg(&channel, message) {
                    eprintln!("error sending message: {:?}", e);
                }
            }
        });

        Ok(state)
    }
}
/// A `UserToken` stored together with a client id and client secret.
///
/// Serialized as the tuple (refresh token, access token, login, user id, client id,
/// client secret, scopes); see the `Serialize`/`Deserialize` impls below.
#[derive(Clone)]
pub struct FullUserToken {
/// The wrapped Twitch user token; also reachable through `Deref`/`DerefMut`.
pub token: UserToken,
/// Client id kept alongside the token so it can be persisted and restored.
pub client_id: ClientId,
/// Client secret kept alongside the token so it can be persisted and restored.
pub client_secret: ClientSecret,
}
impl<'de> Deserialize<'de> for FullUserToken {
    /// Reads the tuple form written by the `Serialize` impl and rebuilds the token with
    /// `UserToken::from_existing_unchecked`.
    fn deserialize<D>(de: D) -> std::result::Result<FullUserToken, D::Error>
    where
        D: serde::de::Deserializer<'de>,
    {
        // Element order mirrors the serialized tuple. The two `_` element types (login and
        // user id) are inferred from the `from_existing_unchecked` call below.
        let wire: (RefreshToken, AccessToken, _, _, ClientId, ClientSecret, Vec<Scope>) =
            Deserialize::deserialize(de)?;
        let (refresh_token, access_token, login, user_id, client_id, client_secret, granted_scopes) = wire;

        let token = UserToken::from_existing_unchecked(
            access_token,
            refresh_token,
            client_id.clone(),
            client_secret.clone(),
            login,
            user_id,
            Some(granted_scopes),
            None,
        );

        Ok(Self { token, client_id, client_secret })
    }
}
impl Serialize for FullUserToken {
    /// Writes the token as the tuple (refresh token, access token, login, user id, client id,
    /// client secret, scopes) — the same order the `Deserialize` impl reads back.
    fn serialize<S>(&self, ser: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: serde::ser::Serializer,
    {
        (
            &self.token.refresh_token,
            &self.token.access_token,
            &self.token.login,
            &self.token.user_id,
            &self.client_id,
            &self.client_secret,
            self.token.scopes(),
        )
            .serialize(ser)
    }
}
impl Deref for FullUserToken {
    type Target = UserToken;

    /// Borrows the wrapped `UserToken`, so its methods can be called directly on a
    /// `FullUserToken`.
    fn deref(&self) -> &UserToken {
        let Self { token, .. } = self;
        token
    }
}
impl DerefMut for FullUserToken {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.token
}
}
/// Runs the `run` function of a Rhai script on tokio's blocking thread pool.
///
/// The script source is compiled on first use and its AST is cached in `state.script_cache`,
/// keyed by the source text. `run` is called with a fresh `ExecutorOutput` bound as `this` and
/// `fn_state` as its only argument; afterwards every message left in the output's `to_send` is
/// pushed onto `state.irc_queue`.
///
/// This is fire-and-forget: compile and runtime errors are printed to stderr and never
/// returned to the caller.
pub fn run_rhai<S: Into<String>>(state: Arc<State>, script: S, fn_state: ExecutorState) {
    let script = script.into();
    tokio::task::spawn_blocking(move || {
        let res: anyhow::Result<()> = try {
            let mut scope = rhai::Scope::new();

            // The read guard is a temporary and is released before compiling or writing.
            let cached = state.script_cache.read().get(&script).cloned();
            let ast = match cached {
                Some(ast) => ast,
                None => {
                    let ast = state.rhai.compile(&script)?;
                    state.script_cache.write().insert(script.clone(), ast.clone());
                    ast
                },
            };

            let mut output = rhai::Dynamic::from(ExecutorOutput::default());
            state.rhai.call_fn_dynamic(
                &mut scope,
                &ast,
                true,
                "run",
                Some(&mut output),
                [rhai::Dynamic::from(fn_state)],
            )?;

            // A script can assign a value of another type to `this`; report that as a script
            // error instead of panicking in `cast`.
            let output = output
                .try_cast::<ExecutorOutput>()
                .ok_or_else(|| anyhow::anyhow!("script replaced `this` with a value that is not an ExecutorOutput"))?;

            for message in output.to_send {
                // Deliberately best-effort: a send error is dropped.
                state.irc_queue.send(message).ok();
            }
        };
        if let Err(e) = res {
            eprintln!("error in rhai script: {:?}", e);
        }
    });
}
async fn verify_token(full_token: &mut Option<FullUserToken>, user_config: &UserConfig, http_client: &reqwest::Client, scopes: Vec<Scope>) -> anyhow::Result<()> {
match full_token {
Some(t) => if t.validate_token(http_client).await.is_err() {
println!("Refreshing token");
t.refresh_token(http_client).await?;
},
None => {
let mut builder = UserToken::builder(
user_config.twitch.client_id.clone(),
user_config.twitch.client_secret.clone(),
url::Url::parse("http://localhost/").unwrap(),
).set_scopes(scopes);
let (url, csrf) = builder.generate_url();
println!("go to {}", url);
println!("once done, paste code:");
let mut code = String::new();
std::io::stdin().read_line(&mut code)?;
let token: UserToken = builder.get_user_token(http_client, csrf.as_str(), code.trim()).await?;
*full_token = Some(FullUserToken {
token,
client_id: user_config.twitch.client_id.clone(),
client_secret: user_config.twitch.client_secret.clone(),
});
}
};
Ok(())
}