474 lines
16 KiB
Rust
474 lines
16 KiB
Rust
pub mod config;
|
|
pub mod duration_tools;
|
|
pub mod rhai_tools;
|
|
pub mod task;
|
|
pub mod twitch;
|
|
pub mod twitch_irc;
|
|
pub mod user_config;
|
|
pub mod web;
|
|
|
|
use anyhow::Result;
|
|
use cached::{TimedCache, Cached};
|
|
use chrono::Duration;
|
|
use futures::{StreamExt, SinkExt};
|
|
use irc::client::prelude::Client as IrcClient;
|
|
use rhai::Engine;
|
|
use serde::{Deserialize, Serialize};
|
|
use tokio::{
|
|
runtime::Handle,
|
|
sync::{
|
|
Mutex, RwLock,
|
|
mpsc::UnboundedSender,
|
|
},
|
|
};
|
|
use tokio_tungstenite::tungstenite::Message as WsMessage;
|
|
use twitch_api2::{
|
|
TwitchClient,
|
|
helix::{
|
|
Scope,
|
|
users::User,
|
|
},
|
|
pubsub::Topic,
|
|
types::{UserId, UserName},
|
|
twitch_oauth2::{ClientSecret, UserToken, TwitchToken, ClientId, RefreshToken, AccessToken},
|
|
};
|
|
use crate::app::{
|
|
config::Config,
|
|
rhai_tools::{ExecutorState, ExecutorOutput},
|
|
user_config::UserConfig,
|
|
twitch::Twitch,
|
|
};
|
|
use std::{
|
|
collections::HashMap,
|
|
ops::{Deref, DerefMut},
|
|
path::Path,
|
|
sync::{
|
|
Arc,
|
|
atomic::{AtomicBool, Ordering},
|
|
},
|
|
time::Instant,
|
|
};
|
|
|
|
pub struct State {
|
|
pub user_config: UserConfig,
|
|
pub config: Arc<RwLock<Config>>,
|
|
|
|
pub channel_name: String,
|
|
|
|
pub twitch: Arc<Twitch>,
|
|
pub live: AtomicBool,
|
|
pub rewards_paused: RwLock<HashMap<String, bool>>,
|
|
pub user_login_cache: Mutex<TimedCache<UserName, User>>,
|
|
pub user_id_cache: Mutex<TimedCache<UserId, User>>,
|
|
|
|
pub irc: RwLock<IrcClient>,
|
|
pub irc_queue: UnboundedSender<String>,
|
|
pub user_cooldowns: RwLock<HashMap<(UserId, String), Instant>>,
|
|
pub global_cooldowns: RwLock<HashMap<String, Instant>>,
|
|
|
|
pub rhai: Engine,
|
|
pub script_cache: parking_lot::RwLock<HashMap<String, rhai::AST>>,
|
|
pub runtime: Handle,
|
|
|
|
pub speedgame: RwLock<(Option<String>, Option<String>)>,
|
|
}
|
|
|
|
impl State {
|
|
#[allow(unreachable_code)]
|
|
pub async fn new(runtime: Handle, user_config: UserConfig, mut config: Config) -> Result<Arc<Self>> {
|
|
let http_client = reqwest::Client::new();
|
|
|
|
println!("verifying bot token");
|
|
verify_token(&mut config.bot_token, &user_config, &http_client, vec![
|
|
// IRC
|
|
Scope::ChatRead,
|
|
Scope::ChatEdit,
|
|
|
|
// Mod stuff
|
|
Scope::ChannelModerate,
|
|
Scope::ModerationRead,
|
|
Scope::ModeratorManageAutoMod,
|
|
]).await?;
|
|
println!("bot token ready");
|
|
|
|
println!("verifying user token");
|
|
verify_token(&mut config.user_token, &user_config, &http_client, vec![
|
|
// Channel points redemptions
|
|
Scope::ChannelReadRedemptions,
|
|
Scope::ChannelManageRedemptions,
|
|
|
|
// Mod stuff
|
|
Scope::ChannelModerate,
|
|
Scope::ModerationRead,
|
|
Scope::ModeratorManageAutoMod,
|
|
]).await?;
|
|
println!("user token ready");
|
|
|
|
let config = Arc::new(RwLock::new(config));
|
|
|
|
let twitch_client = TwitchClient::with_client(http_client);
|
|
let twitch_config = Arc::clone(&config);
|
|
let twitch = Arc::new(Twitch::new(twitch_client, twitch_config));
|
|
|
|
println!("getting channel name");
|
|
let user_id = UserId::new(user_config.twitch.channel_id.to_string());
|
|
let channel_name = twitch.client.helix.get_user_from_id(user_id, &twitch.bot_token().await?)
|
|
.await?
|
|
.ok_or_else(|| anyhow::anyhow!("no channel for id {}", user_config.twitch.channel_id))?
|
|
.login
|
|
.to_string();
|
|
println!("configured to run for channel: {}", channel_name);
|
|
|
|
println!("starting periodic config save task");
|
|
let task_config = Arc::clone(&config);
|
|
tokio::task::spawn(async move {
|
|
loop {
|
|
println!("saving config");
|
|
if let Err(e) = crate::save_config(Path::new("data.json"), &*task_config.read().await).await {
|
|
eprintln!("could not save config: {:?}", e);
|
|
}
|
|
|
|
tokio::time::sleep(Duration::minutes(5).to_std().unwrap()).await;
|
|
}
|
|
});
|
|
|
|
println!("connecting to irc");
|
|
let mut irc = self::twitch_irc::connect(&twitch).await?;
|
|
let mut irc_stream = irc.stream()?;
|
|
|
|
let mut rhai = Engine::new();
|
|
rhai.set_max_expr_depths(0, 0);
|
|
rhai.register_type::<ExecutorState>()
|
|
.register_get("args", ExecutorState::args)
|
|
.register_get("initiator", ExecutorState::initiator)
|
|
.register_get("initiator_id", ExecutorState::initiator_id)
|
|
.register_get("broadcaster", ExecutorState::broadcaster)
|
|
.register_get("moderator", ExecutorState::moderator)
|
|
.register_get("vip", ExecutorState::vip)
|
|
.register_get("subscriber", ExecutorState::subscriber)
|
|
.register_get("speedgame_name", ExecutorState::speedgame_name)
|
|
.register_get("speedgame_category", ExecutorState::speedgame_category)
|
|
.register_fn("get_username", ExecutorState::get_username::<&str>)
|
|
.register_fn("get_user_id", ExecutorState::get_user_id::<&str>)
|
|
.register_fn("get_channel_info", ExecutorState::get_channel_info::<&str>);
|
|
rhai.register_type::<ExecutorOutput>()
|
|
.register_fn("send", ExecutorOutput::send::<&str>);
|
|
|
|
let (queue_tx, mut queue_rx) = tokio::sync::mpsc::unbounded_channel();
|
|
|
|
let state = Arc::new(Self {
|
|
user_config,
|
|
config,
|
|
|
|
channel_name,
|
|
|
|
twitch,
|
|
live: AtomicBool::new(false),
|
|
rewards_paused: Default::default(),
|
|
user_login_cache: Mutex::new(TimedCache::with_lifespan(3600)),
|
|
user_id_cache: Mutex::new(TimedCache::with_lifespan(3600)),
|
|
|
|
irc: RwLock::new(irc),
|
|
irc_queue: queue_tx,
|
|
user_cooldowns: Default::default(),
|
|
global_cooldowns: Default::default(),
|
|
|
|
rhai,
|
|
script_cache: Default::default(),
|
|
runtime,
|
|
|
|
speedgame: RwLock::new((None, None)),
|
|
});
|
|
|
|
// start web task
|
|
tokio::task::spawn(crate::app::web::start_web(Arc::clone(&state)));
|
|
|
|
// start pubsub
|
|
let redemption_topic = twitch_api2::pubsub::channel_points::ChannelPointsChannelV1 {
|
|
channel_id: state.user_config.twitch.channel_id as u32,
|
|
}.into_topic();
|
|
let video_playback_topic = twitch_api2::pubsub::video_playback::VideoPlaybackById {
|
|
channel_id: state.user_config.twitch.channel_id as u32,
|
|
}.into_topic();
|
|
|
|
let auth_token: Option<String> = state.config
|
|
.read()
|
|
.await
|
|
.user_token
|
|
.as_ref()
|
|
.map(|token| token.access_token.secret().to_string());
|
|
let listen_command = twitch_api2::pubsub::listen_command(
|
|
&[redemption_topic, video_playback_topic],
|
|
auth_token.as_deref(),
|
|
"1",
|
|
)?;
|
|
let task_state = Arc::clone(&state);
|
|
tokio::task::spawn(async move {
|
|
println!("starting pubsub listener");
|
|
let res: Result<()> = try {
|
|
let (ws, _) = tokio_tungstenite::connect_async("wss://pubsub-edge.twitch.tv").await?;
|
|
let (mut write, mut read) = ws.split();
|
|
write.send(WsMessage::Text(listen_command)).await?;
|
|
let mut ping = tokio::time::interval(chrono::Duration::minutes(2).to_std().unwrap());
|
|
|
|
loop {
|
|
tokio::select! {
|
|
_ = ping.tick() => {
|
|
write.send(WsMessage::Ping(vec![1, 2, 3, 4])).await?;
|
|
write.send(WsMessage::Text(r#"{"type":"PING"}"#.into())).await?;
|
|
},
|
|
message = read.next() => {
|
|
if let Some(Ok(message)) = message {
|
|
if let Err(e) = crate::app::twitch::handle_pubsub(Arc::clone(&task_state), message).await {
|
|
eprintln!("error in pubsub: {:?}", e);
|
|
}
|
|
}
|
|
},
|
|
}
|
|
}
|
|
};
|
|
|
|
if let Err(e) = res {
|
|
eprintln!("error connecting to websocket: {:?}", e);
|
|
}
|
|
});
|
|
|
|
// start irc task
|
|
let task_state = Arc::clone(&state);
|
|
tokio::task::spawn(async move {
|
|
println!("listening for irc events");
|
|
loop {
|
|
while let Some(event) = irc_stream.next().await.transpose().unwrap_or_default() {
|
|
let task_state = Arc::clone(&task_state);
|
|
if let Err(e) = crate::app::twitch::handle_irc_event(task_state, event).await {
|
|
eprintln!("irc error: {:?}", e);
|
|
}
|
|
}
|
|
|
|
// reconnect
|
|
eprintln!("reconnecting to irc... ");
|
|
match self::twitch_irc::connect(&*task_state.twitch).await {
|
|
Ok(mut client) => {
|
|
eprintln!("connected to irc");
|
|
|
|
if let Ok(stream) = client.stream() {
|
|
irc_stream = stream;
|
|
} else {
|
|
eprintln!("connected but no stream?");
|
|
}
|
|
|
|
*task_state.irc.write().await = client;
|
|
}
|
|
Err(e) => {
|
|
eprintln!("error reconnecting to irc: {:?}", e);
|
|
}
|
|
}
|
|
|
|
}
|
|
});
|
|
|
|
// start irc message queue
|
|
let task_state = Arc::clone(&state);
|
|
tokio::task::spawn(async move {
|
|
let channel = format!("#{}", task_state.channel_name);
|
|
|
|
while let Some(message) = queue_rx.recv().await {
|
|
if let Err(e) = task_state.irc.read().await.send_privmsg(&channel, message) {
|
|
eprintln!("error sending message: {:?}", e);
|
|
}
|
|
}
|
|
});
|
|
|
|
// start stream status task
|
|
let task_state = Arc::clone(&state);
|
|
self::task::stream_status::start_task(task_state).await;
|
|
|
|
// start token refresh task
|
|
let task_state = Arc::clone(&state);
|
|
self::task::tokens::start_task(task_state).await;
|
|
|
|
println!("initialised");
|
|
|
|
Ok(state)
|
|
}
|
|
|
|
pub async fn get_user_from_id<S: Into<UserId>>(&self, id: S) -> anyhow::Result<Option<User>> {
|
|
let id = id.into();
|
|
if let Some(user) = self.user_id_cache.lock().await.cache_get(&id) {
|
|
return Ok(Some(user.clone()));
|
|
}
|
|
|
|
let user = self.twitch.client.helix.get_user_from_id(id, &self.twitch.bot_token().await?).await?;
|
|
self.add_user_to_cache(&user).await;
|
|
|
|
Ok(user)
|
|
}
|
|
|
|
pub async fn get_user_from_login<S: Into<UserName>>(&self, login: S) -> anyhow::Result<Option<User>> {
|
|
let login = login.into();
|
|
if let Some(user) = self.user_login_cache.lock().await.cache_get(&login) {
|
|
return Ok(Some(user.clone()));
|
|
}
|
|
|
|
let user = self.twitch.client.helix.get_user_from_login(login, &self.twitch.bot_token().await?).await?;
|
|
self.add_user_to_cache(&user).await;
|
|
|
|
Ok(user)
|
|
}
|
|
|
|
async fn add_user_to_cache(&self, user: &Option<User>) {
|
|
if let Some(user) = user {
|
|
self.user_id_cache.lock().await.cache_set(user.id.clone(), user.clone());
|
|
self.user_login_cache.lock().await.cache_set(user.login.clone(), user.clone());
|
|
}
|
|
}
|
|
|
|
async fn store_live_status(self: &Arc<State>, live: bool) {
|
|
let prev = self.live.swap(live, Ordering::SeqCst);
|
|
if prev == live {
|
|
return;
|
|
}
|
|
|
|
if let Some(rhai) = &self.config.read().await.events.stream_status {
|
|
self::run_rhai(Arc::clone(self), rhai, vec![rhai::Dynamic::from(live)]);
|
|
}
|
|
}
|
|
}
|
|
|
|
#[derive(Clone)]
|
|
pub struct FullUserToken {
|
|
pub token: UserToken,
|
|
pub client_id: ClientId,
|
|
pub client_secret: ClientSecret,
|
|
}
|
|
|
|
impl<'de> Deserialize<'de> for FullUserToken {
|
|
fn deserialize<D>(de: D) -> std::result::Result<FullUserToken, D::Error>
|
|
where D: serde::de::Deserializer<'de>, {
|
|
let (refresh, access, login, user_id, client_id, client_secret, scopes): (RefreshToken, AccessToken, _, _, ClientId, ClientSecret, Vec<Scope>) = Deserialize::deserialize(de)?;
|
|
let token = UserToken::from_existing_unchecked(access, refresh, client_id.clone(), client_secret.clone(), login, user_id, Some(scopes), None);
|
|
Ok(FullUserToken {
|
|
token,
|
|
client_id,
|
|
client_secret,
|
|
})
|
|
}
|
|
}
|
|
|
|
impl Serialize for FullUserToken {
|
|
fn serialize<S>(&self, ser: S) -> std::result::Result<S::Ok, S::Error>
|
|
where S: serde::ser::Serializer,
|
|
{
|
|
let to_ser = (&self.token.refresh_token, &self.token.access_token, &self.token.login, &self.token.user_id, &self.client_id, &self.client_secret, self.token.scopes());
|
|
to_ser.serialize(ser)
|
|
}
|
|
}
|
|
|
|
impl Deref for FullUserToken {
|
|
type Target = UserToken;
|
|
|
|
fn deref(&self) -> &Self::Target {
|
|
&self.token
|
|
}
|
|
}
|
|
|
|
impl DerefMut for FullUserToken {
|
|
fn deref_mut(&mut self) -> &mut Self::Target {
|
|
&mut self.token
|
|
}
|
|
}
|
|
|
|
pub fn run_rhai<S: Into<String>>(state: Arc<State>, script: S, fn_args: Vec<rhai::Dynamic>) {
|
|
let script = script.into();
|
|
tokio::task::spawn_blocking(move || {
|
|
let res: anyhow::Result<()> = try {
|
|
let mut scope = rhai::Scope::new();
|
|
let ast = state.script_cache.read().get(&script).map(ToOwned::to_owned);
|
|
let ast = match ast {
|
|
Some(ast) => ast,
|
|
None => {
|
|
let ast = state.rhai.compile(&script)?;
|
|
state.script_cache.write().insert(script.clone(), ast.clone());
|
|
ast
|
|
}
|
|
};
|
|
|
|
let mut output = rhai::Dynamic::from(ExecutorOutput::default());
|
|
|
|
state.rhai.call_fn_raw(
|
|
&mut scope,
|
|
&ast,
|
|
true,
|
|
false,
|
|
"run",
|
|
Some(&mut output),
|
|
fn_args,
|
|
)?;
|
|
|
|
let output: ExecutorOutput = output.cast();
|
|
for message in output.to_send {
|
|
state.irc_queue.send(message).ok();
|
|
}
|
|
};
|
|
|
|
if let Err(e) = res {
|
|
eprintln!("error in rhai script: {:?}", e);
|
|
}
|
|
});
|
|
}
|
|
|
|
async fn verify_token(full_token: &mut Option<FullUserToken>, user_config: &UserConfig, http_client: &reqwest::Client, scopes: Vec<Scope>) -> anyhow::Result<()> {
|
|
let fut = match full_token.take() {
|
|
Some(mut t) => {
|
|
match t.validate_token(http_client).await {
|
|
Ok(validated) => {
|
|
let old = t.token;
|
|
let token = UserToken::from_existing_unchecked(
|
|
old.access_token,
|
|
old.refresh_token,
|
|
validated.client_id,
|
|
t.client_secret.clone(),
|
|
validated.login.unwrap_or(old.login),
|
|
validated.user_id.unwrap_or(old.user_id),
|
|
validated.scopes,
|
|
Some(validated.expires_in),
|
|
);
|
|
|
|
FullUserToken {
|
|
client_id: t.client_id,
|
|
client_secret: t.client_secret,
|
|
token,
|
|
}
|
|
},
|
|
Err(_) => {
|
|
println!("Refreshing token");
|
|
t.refresh_token(http_client).await?;
|
|
t
|
|
},
|
|
}
|
|
},
|
|
None => {
|
|
let mut builder = UserToken::builder(
|
|
user_config.twitch.client_id.clone(),
|
|
user_config.twitch.client_secret.clone(),
|
|
url::Url::parse("http://localhost/").unwrap(),
|
|
).set_scopes(scopes);
|
|
let (url, csrf) = builder.generate_url();
|
|
println!("go to {}", url);
|
|
println!("once done, paste code:");
|
|
let mut code = String::new();
|
|
std::io::stdin().read_line(&mut code)?;
|
|
let token: UserToken = builder.get_user_token(http_client, csrf.as_str(), code.trim()).await?;
|
|
FullUserToken {
|
|
token,
|
|
client_id: user_config.twitch.client_id.clone(),
|
|
client_secret: user_config.twitch.client_secret.clone(),
|
|
}
|
|
}
|
|
};
|
|
|
|
*full_token = Some(fut);
|
|
|
|
Ok(())
|
|
}
|