feat: only update players on login
This commit is contained in:
parent
8cd95f7b48
commit
500687194c
|
@ -3,8 +3,7 @@ use std::sync::Arc;
|
||||||
|
|
||||||
use anyhow::Context;
|
use anyhow::Context;
|
||||||
use chrono::{Duration, Utc};
|
use chrono::{Duration, Utc};
|
||||||
use lodestone_scraper::LodestoneScraper;
|
use log::trace;
|
||||||
use log::{trace, warn};
|
|
||||||
use tokio::sync::RwLock;
|
use tokio::sync::RwLock;
|
||||||
|
|
||||||
use crate::{AuthenticateRequest, AuthenticateResponse, ClientState, State, User, util, World, WsStream};
|
use crate::{AuthenticateRequest, AuthenticateResponse, ClientState, State, User, util, World, WsStream};
|
||||||
|
@ -26,40 +25,11 @@ pub async fn authenticate(state: Arc<RwLock<State>>, client_state: Arc<RwLock<Cl
|
||||||
.fetch_optional(&state.read().await.db)
|
.fetch_optional(&state.read().await.db)
|
||||||
.await
|
.await
|
||||||
.context("could not query database for user")?;
|
.context("could not query database for user")?;
|
||||||
let mut user = match user {
|
let user = match user {
|
||||||
Some(u) => u,
|
Some(u) => u,
|
||||||
None => return util::send(conn, number, AuthenticateResponse::error("invalid key")).await,
|
None => return util::send(conn, number, AuthenticateResponse::error("invalid key")).await,
|
||||||
};
|
};
|
||||||
|
|
||||||
if Utc::now().naive_utc().signed_duration_since(user.last_updated) >= Duration::hours(2) {
|
|
||||||
let info = LodestoneScraper::default()
|
|
||||||
.character(user.lodestone_id as u64)
|
|
||||||
.await;
|
|
||||||
|
|
||||||
match info {
|
|
||||||
Ok(info) => {
|
|
||||||
let world_name = info.world.as_str();
|
|
||||||
|
|
||||||
user.name = info.name.clone();
|
|
||||||
user.world = world_name.to_string();
|
|
||||||
|
|
||||||
sqlx::query!(
|
|
||||||
// language=sqlite
|
|
||||||
"update users set name = ?, world = ?, last_updated = current_timestamp where lodestone_id = ?",
|
|
||||||
info.name,
|
|
||||||
world_name,
|
|
||||||
user.lodestone_id,
|
|
||||||
)
|
|
||||||
.execute(&state.read().await.db)
|
|
||||||
.await
|
|
||||||
.context("could not update user")?;
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
warn!("could not get character info during login: {:#?}", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let world = World::from_str(&user.world).map_err(|_| anyhow::anyhow!("invalid world in db"))?;
|
let world = World::from_str(&user.world).map_err(|_| anyhow::anyhow!("invalid world in db"))?;
|
||||||
|
|
||||||
if let Some(old_client_state) = state.read().await.clients.get(&(user.lodestone_id as u64)) {
|
if let Some(old_client_state) = state.read().await.clients.get(&(user.lodestone_id as u64)) {
|
||||||
|
@ -91,5 +61,9 @@ pub async fn authenticate(state: Arc<RwLock<State>>, client_state: Arc<RwLock<Cl
|
||||||
state.write().await.ids.insert((user.name, util::id_from_world(world)), user.lodestone_id as u64);
|
state.write().await.ids.insert((user.name, util::id_from_world(world)), user.lodestone_id as u64);
|
||||||
trace!(" [authenticate] after state writes");
|
trace!(" [authenticate] after state writes");
|
||||||
|
|
||||||
|
if Utc::now().naive_utc().signed_duration_since(user.last_updated) >= Duration::hours(2) {
|
||||||
|
state.read().await.updater_tx.send(user.lodestone_id).ok();
|
||||||
|
}
|
||||||
|
|
||||||
util::send(conn, number, AuthenticateResponse::success()).await
|
util::send(conn, number, AuthenticateResponse::success()).await
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,7 +17,7 @@ use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions};
|
||||||
use tokio::{
|
use tokio::{
|
||||||
net::{TcpListener, TcpStream},
|
net::{TcpListener, TcpStream},
|
||||||
};
|
};
|
||||||
use tokio::sync::mpsc::Sender;
|
use tokio::sync::mpsc::{Sender, UnboundedSender};
|
||||||
use tokio::sync::RwLock;
|
use tokio::sync::RwLock;
|
||||||
use tokio_tungstenite::{
|
use tokio_tungstenite::{
|
||||||
tungstenite::Message as WsMessage,
|
tungstenite::Message as WsMessage,
|
||||||
|
@ -59,6 +59,7 @@ pub struct State {
|
||||||
pub ids: HashMap<(String, u16), u64>,
|
pub ids: HashMap<(String, u16), u64>,
|
||||||
pub secrets_requests: HashMap<Uuid, SecretsRequestInfo>,
|
pub secrets_requests: HashMap<Uuid, SecretsRequestInfo>,
|
||||||
pub messages_sent: AtomicU64,
|
pub messages_sent: AtomicU64,
|
||||||
|
pub updater_tx: UnboundedSender<i64>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl State {
|
impl State {
|
||||||
|
@ -126,6 +127,9 @@ async fn main() -> Result<()> {
|
||||||
.await
|
.await
|
||||||
.context("could not run database migrations")?;
|
.context("could not run database migrations")?;
|
||||||
|
|
||||||
|
// set up updater channel
|
||||||
|
let (updater_tx, updater_rx) = tokio::sync::mpsc::unbounded_channel();
|
||||||
|
|
||||||
// set up server
|
// set up server
|
||||||
let server = TcpListener::bind(&config.server.address).await?;
|
let server = TcpListener::bind(&config.server.address).await?;
|
||||||
let state = Arc::new(RwLock::new(State {
|
let state = Arc::new(RwLock::new(State {
|
||||||
|
@ -134,6 +138,7 @@ async fn main() -> Result<()> {
|
||||||
ids: Default::default(),
|
ids: Default::default(),
|
||||||
secrets_requests: Default::default(),
|
secrets_requests: Default::default(),
|
||||||
messages_sent: AtomicU64::default(),
|
messages_sent: AtomicU64::default(),
|
||||||
|
updater_tx,
|
||||||
}));
|
}));
|
||||||
|
|
||||||
info!("Listening on ws://{}/", server.local_addr()?);
|
info!("Listening on ws://{}/", server.local_addr()?);
|
||||||
|
@ -211,7 +216,7 @@ async fn main() -> Result<()> {
|
||||||
|
|
||||||
influx::spawn(&config, Arc::clone(&state));
|
influx::spawn(&config, Arc::clone(&state));
|
||||||
|
|
||||||
updater::spawn(Arc::clone(&state));
|
updater::spawn(Arc::clone(&state), updater_rx);
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let res: Result<()> = try {
|
let res: Result<()> = try {
|
||||||
|
|
|
@ -1,61 +1,47 @@
|
||||||
use std::collections::HashMap;
|
use std::{
|
||||||
use std::sync::Arc;
|
sync::Arc,
|
||||||
|
time::Duration,
|
||||||
|
};
|
||||||
|
|
||||||
use anyhow::{Context, Result};
|
use anyhow::{Context, Result};
|
||||||
use lodestone_scraper::LodestoneScraper;
|
use lodestone_scraper::LodestoneScraper;
|
||||||
use log::{debug, error, info, trace};
|
use log::{debug, error, trace};
|
||||||
use tokio::sync::RwLock;
|
use tokio::{
|
||||||
use tokio::task::JoinHandle;
|
sync::{
|
||||||
|
mpsc::UnboundedReceiver,
|
||||||
|
RwLock,
|
||||||
|
},
|
||||||
|
task::JoinHandle,
|
||||||
|
time::Instant,
|
||||||
|
};
|
||||||
|
|
||||||
use crate::State;
|
use crate::State;
|
||||||
|
|
||||||
pub fn spawn(state: Arc<RwLock<State>>) -> JoinHandle<()> {
|
pub fn spawn(state: Arc<RwLock<State>>, mut rx: UnboundedReceiver<i64>) -> JoinHandle<()> {
|
||||||
|
const WAIT_TIME: u64 = 5;
|
||||||
|
|
||||||
tokio::task::spawn(async move {
|
tokio::task::spawn(async move {
|
||||||
let lodestone = LodestoneScraper::default();
|
let lodestone = LodestoneScraper::default();
|
||||||
|
|
||||||
loop {
|
let mut last_update = Instant::now();
|
||||||
match inner(&state, &lodestone).await {
|
while let Some(id) = rx.recv().await {
|
||||||
Ok(results) => {
|
// make sure to wait five seconds between each request
|
||||||
let successful = results.values().filter(|result| result.is_ok()).count();
|
let elapsed = last_update.elapsed();
|
||||||
info!("Updated {}/{} characters", successful, results.len());
|
if elapsed < Duration::from_secs(WAIT_TIME) {
|
||||||
for (id, result) in results {
|
let left = Duration::from_secs(WAIT_TIME) - elapsed;
|
||||||
if let Err(e) = result {
|
tokio::time::sleep(left).await;
|
||||||
error!("error updating user {}: {:?}", id, e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
error!("error updating users: {:?}", e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
tokio::time::sleep(std::time::Duration::from_secs(60)).await;
|
match update(&*state, &lodestone, id).await {
|
||||||
|
Ok(()) => debug!("updated user {}", id),
|
||||||
|
Err(e) => error!("error updating user {}: {:?}", id, e),
|
||||||
|
}
|
||||||
|
|
||||||
|
last_update = Instant::now();
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn inner(state: &RwLock<State>, lodestone: &LodestoneScraper) -> Result<HashMap<u32, Result<()>>> {
|
|
||||||
let users = sqlx::query!(
|
|
||||||
// language=sqlite
|
|
||||||
"select * from users where (julianday(current_timestamp) - julianday(last_updated)) * 24 >= 2 order by last_updated",
|
|
||||||
)
|
|
||||||
.fetch_all(&state.read().await.db)
|
|
||||||
.await
|
|
||||||
.context("could not query database for users")?;
|
|
||||||
|
|
||||||
let mut results = HashMap::with_capacity(users.len());
|
|
||||||
for (i, user) in users.iter().enumerate() {
|
|
||||||
results.insert(user.lodestone_id as u32, update(state, lodestone, user.lodestone_id).await);
|
|
||||||
if i % 5 == 0 {
|
|
||||||
debug!("updated {}/{} users", i, users.len());
|
|
||||||
}
|
|
||||||
|
|
||||||
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(results)
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn update(state: &RwLock<State>, lodestone: &LodestoneScraper, lodestone_id: i64) -> Result<()> {
|
async fn update(state: &RwLock<State>, lodestone: &LodestoneScraper, lodestone_id: i64) -> Result<()> {
|
||||||
let info = lodestone
|
let info = lodestone
|
||||||
.character(lodestone_id as u64)
|
.character(lodestone_id as u64)
|
||||||
|
@ -87,4 +73,4 @@ async fn update(state: &RwLock<State>, lodestone: &LodestoneScraper, lodestone_i
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
Loading…
Reference in New Issue