feat: add stream status events

Add IRC reconnection. Add token refreshing.
This commit is contained in:
Anna 2021-08-31 15:45:22 -04:00
parent 57d54f3bb5
commit de73190092
16 changed files with 362 additions and 57 deletions

View File

@ -22,7 +22,7 @@ serde_json = "1"
serde_with = { version = "1", features = ["chrono"] }
toml = "0.5"
tokio-tungstenite = { version = "0.15", features = ["rustls-tls"] }
twitch_api2 = { version = "0.6.0-rc.2", features = ["twitch_oauth2", "client", "reqwest_client", "helix", "pubsub"] }
twitch_api2 = { version = "0.6.0-rc.2", features = ["twitch_oauth2", "client", "reqwest_client", "helix", "pubsub", "unsupported"] }
url = "2"
uuid = { version = "0.8", features = ["v4", "serde"] }
warp = "0.3"

View File

@ -1,14 +1,16 @@
pub mod config;
pub mod duration_tools;
pub mod rhai_tools;
pub mod task;
pub mod twitch;
pub mod twitch_irc;
pub mod user_config;
pub mod web;
use anyhow::Result;
use cached::{TimedCache, Cached};
use futures::{StreamExt, SinkExt};
use irc::client::prelude::{Client as IrcClient, Config as IrcConfig, Capability};
use irc::client::prelude::Client as IrcClient;
use rhai::Engine;
use serde::{Deserialize, Serialize};
use tokio::{
@ -38,22 +40,27 @@ use crate::app::{
use std::{
collections::HashMap,
ops::{Deref, DerefMut},
sync::Arc,
sync::{
Arc,
atomic::AtomicBool,
},
time::Instant,
};
use std::sync::atomic::Ordering;
pub struct State {
pub user_config: UserConfig,
pub config: RwLock<Config>,
pub config: Arc<RwLock<Config>>,
pub channel_name: String,
pub twitch: Arc<Twitch>,
pub live: AtomicBool,
pub rewards_paused: RwLock<HashMap<String, bool>>,
pub user_login_cache: Mutex<TimedCache<UserName, User>>,
pub user_id_cache: Mutex<TimedCache<UserId, User>>,
pub irc: IrcClient,
pub irc: RwLock<IrcClient>,
pub irc_queue: UnboundedSender<String>,
pub user_cooldowns: RwLock<HashMap<(UserId, String), Instant>>,
pub global_cooldowns: RwLock<HashMap<String, Instant>>,
@ -91,34 +98,20 @@ impl State {
]).await?;
println!("User token ready");
let config = Arc::new(RwLock::new(config));
let twitch_client = TwitchClient::with_client(http_client);
let twitch = Arc::new(Twitch {
client: twitch_client,
bot_token: config.bot_token.clone().unwrap().token,
user_token: config.user_token.clone().unwrap().token,
});
let twitch_config = Arc::clone(&config);
let twitch = Arc::new(Twitch::new(twitch_client, twitch_config));
let user_id = UserId::new(user_config.twitch.channel_id.to_string());
let channel_name = twitch.client.helix.get_user_from_id(user_id, &twitch.bot_token)
let channel_name = twitch.client.helix.get_user_from_id(user_id, &twitch.bot_token().await?)
.await?
.ok_or_else(|| anyhow::anyhow!("no channel for id {}", user_config.twitch.channel_id))?
.login
.to_string();
let irc_config = IrcConfig {
server: Some("irc.chat.twitch.tv".into()),
port: Some(6697),
username: Some(twitch.bot_token.login.clone()),
nickname: Some(twitch.bot_token.login.clone()),
password: Some(format!("oauth:{}", twitch.bot_token.access_token.secret())),
..Default::default()
};
let mut irc = IrcClient::from_config(irc_config).await?;
irc.send_cap_req(&[
Capability::Custom("twitch.tv/tags"),
Capability::Custom("twitch.tv/commands"),
])?;
irc.identify()?;
let mut irc = self::twitch_irc::connect(&twitch).await?;
let mut irc_stream = irc.stream()?;
let mut rhai = Engine::new();
@ -143,16 +136,17 @@ impl State {
let state = Arc::new(Self {
user_config,
config: RwLock::new(config),
config,
channel_name,
twitch,
live: AtomicBool::new(false),
rewards_paused: Default::default(),
user_login_cache: Mutex::new(TimedCache::with_lifespan(3600)),
user_id_cache: Mutex::new(TimedCache::with_lifespan(3600)),
irc,
irc: RwLock::new(irc),
irc_queue: queue_tx,
user_cooldowns: Default::default(),
global_cooldowns: Default::default(),
@ -171,6 +165,10 @@ impl State {
let redemption_topic = twitch_api2::pubsub::channel_points::ChannelPointsChannelV1 {
channel_id: state.user_config.twitch.channel_id as u32,
}.into_topic();
let video_playback_topic = twitch_api2::pubsub::video_playback::VideoPlaybackById {
channel_id: state.user_config.twitch.channel_id as u32,
}.into_topic();
let auth_token: Option<String> = state.config
.read()
.await
@ -178,7 +176,7 @@ impl State {
.as_ref()
.map(|token| token.access_token.secret().to_string());
let listen_command = twitch_api2::pubsub::listen_command(
&[redemption_topic],
&[redemption_topic, video_playback_topic],
auth_token.as_deref(),
"1",
)?;
@ -216,14 +214,33 @@ impl State {
let task_state = Arc::clone(&state);
tokio::task::spawn(async move {
// FIXME: handle reconnects
while let Some(event) = irc_stream.next().await.transpose()? {
let task_state = Arc::clone(&task_state);
if let Err(e) = crate::app::twitch::handle_irc_event(task_state, event).await {
eprintln!("irc error: {:?}", e);
loop {
while let Some(event) = irc_stream.next().await.transpose().unwrap_or_default() {
let task_state = Arc::clone(&task_state);
if let Err(e) = crate::app::twitch::handle_irc_event(task_state, event).await {
eprintln!("irc error: {:?}", e);
}
}
}
Result::<(), anyhow::Error>::Ok(())
// reconnect
eprintln!("reconnecting to irc... ");
match self::twitch_irc::connect(&*task_state.twitch).await {
Ok(mut client) => {
eprintln!("connected to irc");
if let Ok(stream) = client.stream() {
eprintln!("connected but no stream?");
irc_stream = stream;
}
*task_state.irc.write().await = client;
}
Err(e) => {
eprintln!("error reconnecting to irc: {:?}", e);
}
}
}
});
// start irc message queue
@ -232,12 +249,16 @@ impl State {
let channel = format!("#{}", task_state.channel_name);
while let Some(message) = queue_rx.recv().await {
if let Err(e) = task_state.irc.send_privmsg(&channel, message) {
if let Err(e) = task_state.irc.read().await.send_privmsg(&channel, message) {
eprintln!("error sending message: {:?}", e);
}
}
});
// start stream status task
let task_state = Arc::clone(&state);
self::task::stream_status::start_task(task_state).await;
Ok(state)
}
@ -247,7 +268,7 @@ impl State {
return Ok(Some(user.clone()));
}
let user = self.twitch.client.helix.get_user_from_id(id, &self.twitch.bot_token).await?;
let user = self.twitch.client.helix.get_user_from_id(id, &self.twitch.bot_token().await?).await?;
self.add_user_to_cache(&user).await;
Ok(user)
@ -259,7 +280,7 @@ impl State {
return Ok(Some(user.clone()));
}
let user = self.twitch.client.helix.get_user_from_login(login, &self.twitch.bot_token).await?;
let user = self.twitch.client.helix.get_user_from_login(login, &self.twitch.bot_token().await?).await?;
self.add_user_to_cache(&user).await;
Ok(user)
@ -271,6 +292,17 @@ impl State {
self.user_login_cache.lock().await.cache_set(user.login.clone(), user.clone());
}
}
async fn store_live_status(self: &Arc<State>, live: bool) {
let prev = self.live.swap(live, Ordering::SeqCst);
if prev == live {
return;
}
if let Some(rhai) = &self.config.read().await.events.stream_status {
self::run_rhai(Arc::clone(self), rhai, vec![rhai::Dynamic::from(live)]);
}
}
}
#[derive(Clone)]
@ -316,7 +348,7 @@ impl DerefMut for FullUserToken {
}
}
pub fn run_rhai<S: Into<String>>(state: Arc<State>, script: S, fn_state: ExecutorState) {
pub fn run_rhai<S: Into<String>>(state: Arc<State>, script: S, fn_args: Vec<rhai::Dynamic>) {
let script = script.into();
tokio::task::spawn_blocking(move || {
let res: anyhow::Result<()> = try {
@ -339,7 +371,7 @@ pub fn run_rhai<S: Into<String>>(state: Arc<State>, script: S, fn_state: Executo
true,
"run",
Some(&mut output),
[rhai::Dynamic::from(fn_state)],
fn_args,
)?;
let output: ExecutorOutput = output.cast();
@ -355,10 +387,34 @@ pub fn run_rhai<S: Into<String>>(state: Arc<State>, script: S, fn_state: Executo
}
async fn verify_token(full_token: &mut Option<FullUserToken>, user_config: &UserConfig, http_client: &reqwest::Client, scopes: Vec<Scope>) -> anyhow::Result<()> {
match full_token {
Some(t) => if t.validate_token(http_client).await.is_err() {
println!("Refreshing token");
t.refresh_token(http_client).await?;
let fut = match full_token.take() {
Some(mut t) => {
match t.validate_token(http_client).await {
Ok(validated) => {
let old = t.token;
let token = UserToken::from_existing_unchecked(
old.access_token,
old.refresh_token,
validated.client_id,
t.client_secret.clone(),
validated.login.unwrap_or(old.login),
validated.user_id.unwrap_or(old.user_id),
validated.scopes,
Some(validated.expires_in),
);
FullUserToken {
client_id: t.client_id,
client_secret: t.client_secret,
token,
}
},
Err(_) => {
println!("Refreshing token");
t.refresh_token(http_client).await?;
t
},
}
},
None => {
let mut builder = UserToken::builder(
@ -372,13 +428,15 @@ async fn verify_token(full_token: &mut Option<FullUserToken>, user_config: &User
let mut code = String::new();
std::io::stdin().read_line(&mut code)?;
let token: UserToken = builder.get_user_token(http_client, csrf.as_str(), code.trim()).await?;
*full_token = Some(FullUserToken {
FullUserToken {
token,
client_id: user_config.twitch.client_id.clone(),
client_secret: user_config.twitch.client_secret.clone(),
});
}
}
};
*full_token = Some(fut);
Ok(())
}

View File

@ -12,6 +12,8 @@ pub struct Config {
pub user_token: Option<FullUserToken>,
pub commands: Vec<Command>,
pub redemptions: Vec<Redemption>,
#[serde(default)]
pub events: Events,
}
#[derive(Deserialize, Serialize, Clone)]
@ -47,3 +49,8 @@ pub struct Redemption {
pub twitch_id: twitch_api2::types::RewardId,
pub rhai: String,
}
#[derive(Deserialize, Serialize, Default, Clone)]
pub struct Events {
pub stream_status: Option<String>,
}

View File

@ -103,7 +103,7 @@ impl ExecutorState {
let req = GetChannelInformationRequest::builder()
.broadcaster_id(id.into())
.build();
self.state.twitch.client.helix.req_get(req, &self.state.twitch.bot_token)
self.state.twitch.client.helix.req_get(req, &self.state.twitch.bot_token().await.ok()?)
.await
.ok()?
.data

1
src/app/task/mod.rs Normal file
View File

@ -0,0 +1 @@
pub mod stream_status;

View File

@ -0,0 +1,40 @@
use chrono::Duration;
use twitch_api2::{
types::UserId,
helix::streams::{
GetStreamsRequest,
get_streams::Stream,
},
};
use crate::app::State;
use std::sync::Arc;
pub async fn start_task(state: Arc<State>) {
tokio::task::spawn(async move {
loop {
let req = GetStreamsRequest::builder()
.user_id(vec![
UserId::new(state.user_config.twitch.channel_id.to_string()),
])
.build();
let token = match state.twitch.bot_token().await {
Ok(token) => token,
Err(e) => {
eprintln!("could not get bot token: {:?}", e);
continue;
},
};
match state.twitch.client.helix.req_get(req, &token).await {
Ok(resp) => {
let streams: Vec<Stream> = resp.data;
state.store_live_status(!streams.is_empty()).await;
}
Err(e) => {
eprintln!("could not get stream status: {:?}", e);
}
}
tokio::time::sleep(Duration::minutes(5).to_std().unwrap()).await;
}
});
}

View File

@ -7,6 +7,7 @@ use irc::{
client::prelude::Message,
proto::{Command, Response},
};
use tokio::sync::RwLock;
use tokio_tungstenite::tungstenite::Message as WsMessage;
use crate::app::{
State,
@ -17,8 +18,36 @@ use std::sync::Arc;
pub struct Twitch {
pub client: TwitchClient<'static, reqwest::Client>,
pub bot_token: UserToken,
pub user_token: UserToken,
config: Arc<RwLock<Config>>,
}
impl Twitch {
pub fn new(client: TwitchClient<'static, reqwest::Client>, config: Arc<RwLock<Config>>) -> Self {
Self {
client,
config,
}
}
pub async fn bot_token(&self) -> anyhow::Result<UserToken> {
let mut bot_token = self.config.read().await.bot_token.as_ref().unwrap().clone();
if bot_token.token.expires_in() <= std::time::Duration::from_secs(60 * 15) {
bot_token.refresh_token(&reqwest::Client::new()).await?;
self.config.write().await.bot_token.replace(bot_token.clone());
}
Ok(bot_token.token)
}
pub async fn user_token(&self) -> anyhow::Result<UserToken> {
let mut user_token = self.config.read().await.user_token.as_ref().unwrap().clone();
if user_token.token.expires_in() <= std::time::Duration::from_secs(60 * 15) {
user_token.refresh_token(&reqwest::Client::new()).await?;
self.config.write().await.bot_token.replace(user_token.clone());
}
Ok(user_token.token)
}
}
pub async fn handle_irc_event(state: Arc<State>, event: Message) -> anyhow::Result<()> {
@ -26,7 +55,7 @@ pub async fn handle_irc_event(state: Arc<State>, event: Message) -> anyhow::Resu
match &event.command {
Command::Response(resp, _) if *resp == Response::RPL_WELCOME => {
state.irc.send_join(&channel_name)?;
state.irc.read().await.send_join(&channel_name)?;
}
// FIXME: do correct checking here
Command::PRIVMSG(channel, message) if *channel == channel_name => {
@ -120,7 +149,8 @@ async fn on_privmsg(state: Arc<State>, event: &Message, message: &str) -> anyhow
speedgame_name,
speedgame_category,
};
crate::app::run_rhai(Arc::clone(&state), t, fn_state);
let args = vec![rhai::Dynamic::from(fn_state)];
crate::app::run_rhai(Arc::clone(&state), t, args);
}
}
@ -133,6 +163,9 @@ use twitch_api2::pubsub::{
channel_points::ChannelPointsChannelV1Reply,
};
use std::time::Instant;
use twitch_api2::pubsub::video_playback::VideoPlaybackReply;
use crate::app::config::Config;
use twitch_api2::twitch_oauth2::TwitchToken;
pub async fn handle_pubsub(state: Arc<State>, event: WsMessage) -> anyhow::Result<()> {
let json = match event {
@ -142,11 +175,17 @@ pub async fn handle_pubsub(state: Arc<State>, event: WsMessage) -> anyhow::Resul
let response = twitch_api2::pubsub::Response::parse(&json)?;
let reply = match response {
PubSubResponse::Message { data: TopicData::ChannelPointsChannelV1 { reply, .. } } => reply,
_ => return Ok(()),
};
match response {
PubSubResponse::Message {data} => match data {
TopicData::ChannelPointsChannelV1 { reply, .. } => handle_channel_points_reply(state, reply).await,
TopicData::VideoPlaybackById { reply, .. } => handle_stream_status(state, reply).await,
_ => Ok(()),
},
_ => Ok(()),
}
}
async fn handle_channel_points_reply(state: Arc<State>, reply: Box<ChannelPointsChannelV1Reply>) -> anyhow::Result<()> {
let redemption = match *reply {
ChannelPointsChannelV1Reply::RewardRedeemed { redemption, .. } => redemption,
_ => return Ok(()),
@ -174,7 +213,19 @@ pub async fn handle_pubsub(state: Arc<State>, event: WsMessage) -> anyhow::Resul
speedgame_name,
speedgame_category,
};
crate::app::run_rhai(Arc::clone(&state), &action.rhai, fn_state);
let args = vec![rhai::Dynamic::from(fn_state)];
crate::app::run_rhai(Arc::clone(&state), &action.rhai, args);
Ok(())
}
async fn handle_stream_status(state: Arc<State>, reply: Box<VideoPlaybackReply>) -> anyhow::Result<()> {
let live = match *reply {
VideoPlaybackReply::StreamUp { .. } => true,
VideoPlaybackReply::StreamDown { .. } => false,
_ => return Ok(()),
};
state.store_live_status(live).await;
Ok(())
}

22
src/app/twitch_irc.rs Normal file
View File

@ -0,0 +1,22 @@
use anyhow::Result;
use irc::client::prelude::{Capability, Client as IrcClient, Config as IrcConfig};
use crate::app::twitch::Twitch;
pub async fn connect(twitch: &Twitch) -> Result<IrcClient> {
let token = twitch.bot_token().await?;
let irc_config = IrcConfig {
server: Some("irc.chat.twitch.tv".into()),
port: Some(6697),
username: Some(token.login.clone()),
nickname: Some(token.login.clone()),
password: Some(format!("oauth:{}", token.access_token.secret())),
..Default::default()
};
let irc = IrcClient::from_config(irc_config).await?;
irc.send_cap_req(&[
Capability::Custom("twitch.tv/tags"),
Capability::Custom("twitch.tv/commands"),
])?;
irc.identify()?;
Ok(irc)
}

View File

@ -33,6 +33,7 @@ pub async fn start_web(state: Arc<State>) {
commands_routes(Arc::clone(&state))
.or(redemptions_routes(Arc::clone(&state)))
.or(livesplit_routes(Arc::clone(&state)))
.or(events_routes(Arc::clone(&state)))
);
let unauthed = access_token_routes();

View File

@ -0,0 +1,67 @@
use warp::{Filter, Reply, filters::BoxedFilter, http::Uri};
use crate::app::{
State,
web::{
CustomRejection,
template::events::EventsTemplate,
},
};
use std::{
collections::HashMap,
convert::Infallible,
sync::Arc,
};
pub fn events_routes(state: Arc<State>) -> BoxedFilter<(impl Reply, )> {
warp::get()
.and(
events_get(Arc::clone(&state))
)
.or(warp::post().and(
events_edit_post(Arc::clone(&state))
))
.boxed()
}
fn events_get(state: Arc<State>) -> BoxedFilter<(impl Reply, )> {
warp::path("events")
.and(warp::path::end())
.and_then(move || {
let state = Arc::clone(&state);
async move {
Result::<EventsTemplate, Infallible>::Ok(EventsTemplate {
events: state.config.read().await.events.clone(),
})
}
})
.boxed()
}
fn events_edit_post(state: Arc<State>) -> BoxedFilter<(impl Reply, )> {
warp::path("events")
.and(warp::path::end())
.and(warp::body::content_length_limit(1024 * 5))
.and(warp::body::form())
.and_then(move |mut form: HashMap<String, String>| {
let state = Arc::clone(&state);
async move {
let event_name = match form.remove("event") {
Some(name) => name,
None => return Err(warp::reject::custom(CustomRejection::InvalidForm)),
};
let script = match form.remove("script") {
Some(script) => script,
None => return Err(warp::reject::custom(CustomRejection::InvalidForm)),
};
match &*event_name {
"stream_status" => state.config.write().await.events.stream_status = Some(script),
_ => return Err(warp::reject::custom(CustomRejection::InvalidForm)),
}
Ok(warp::redirect(Uri::from_static("/events")))
}
})
.boxed()
}

View File

@ -53,7 +53,7 @@ async fn set_reward_paused(state: Arc<State>, id: String, paused: bool) -> anyho
.is_paused(paused)
.build();
state.twitch.client.helix.req_patch(request, body, &state.twitch.user_token).await?;
state.twitch.client.helix.req_patch(request, body, &state.twitch.user_token().await?).await?;
Ok(())
}

View File

@ -1,10 +1,12 @@
pub mod access_token;
pub mod commands;
pub mod events;
pub mod livesplit;
pub mod redemptions;
pub use self::access_token::*;
pub use self::commands::*;
pub use self::events::*;
pub use self::livesplit::*;
pub use self::redemptions::*;

View File

@ -126,7 +126,11 @@ fn redemptions_list_get(state: Arc<State>) -> BoxedFilter<(impl Reply, )> {
let req = GetCustomRewardRequest::builder()
.broadcaster_id(state.user_config.twitch.channel_id.to_string())
.build();
let rewards: Vec<CustomReward> = match state.twitch.client.helix.req_get(req, &state.twitch.user_token).await {
let token = match state.twitch.user_token().await {
Ok(token) => token,
Err(_) => return Err(warp::reject::custom(CustomRejection::TwitchError)),
};
let rewards: Vec<CustomReward> = match state.twitch.client.helix.req_get(req, &token).await {
Ok(resp) => resp.data,
Err(_) => return Err(warp::reject::custom(CustomRejection::TwitchError)),
};

View File

@ -0,0 +1,8 @@
use askama::Template;
use crate::app::config::Events;
#[derive(Template)]
#[template(path = "events.html")]
pub struct EventsTemplate {
pub events: Events,
}

View File

@ -1,3 +1,4 @@
pub mod commands;
pub mod events;
pub mod index;
pub mod redemptions;

43
templates/events.html Normal file
View File

@ -0,0 +1,43 @@
{% extends "_base.html" %}
{% block title %}Events{% endblock %}
{% block head %}
<style>
body {
display: flex;
flex-direction: column;
align-items: flex-start;
}
.event {
display: flex;
flex-direction: column;
align-items: flex-start;
margin-bottom: 1em;
max-width: 100%;
}
</style>
{% endblock %}
{% block body %}
<ul class="breadcrumbs">
<li><a href="/">Home</a></li>
<li class="current"><a href="/events">Events</a></li>
</ul>
<div class="event">
<strong>Stream status change</strong>
<form action="/events" method="post">
<input type="hidden" name="event" value="stream_status"/>
<textarea name="script">
{%- match events.stream_status -%}
{%- when Some with (script) -%}
{{- script -}}
{%- else -%}
{%- endmatch -%}
</textarea>
<button type="submit">Edit</button>
</form>
</div>
{% endblock %}