feat: concurrently fetch questions

Add v2 of questions endpoint for pagination.
This commit is contained in:
Anna 2024-01-03 22:51:38 -05:00
parent 4089eca884
commit e47710b7eb
Signed by: anna
GPG Key ID: D0943384CD9F87D1
4 changed files with 283 additions and 37 deletions

30
server/Cargo.lock generated
View File

@ -457,6 +457,21 @@ dependencies = [
"percent-encoding",
]
[[package]]
name = "futures"
version = "0.3.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da0290714b38af9b4a7b094b8a37086d1b4e61f2df9122c3cad2577669145335"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-channel"
version = "0.3.29"
@ -501,6 +516,17 @@ version = "0.3.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8bf34a163b5c4c52d0478a4d757da8fb65cabef42ba90515efee0f6f9fa45aaa"
[[package]]
name = "futures-macro"
version = "0.3.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "53b153fd91e4b0147f4aced87be237c98248656bb01050b96bf3ee89220a8ddb"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.41",
]
[[package]]
name = "futures-sink"
version = "0.3.29"
@ -519,8 +545,10 @@ version = "0.3.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a19526d624e703a3179b3d322efec918b6246ea0fa51d41124525f00f1cc8104"
dependencies = [
"futures-channel",
"futures-core",
"futures-io",
"futures-macro",
"futures-sink",
"futures-task",
"memchr",
@ -1308,7 +1336,9 @@ dependencies = [
"axum",
"blake3",
"chrono",
"futures",
"mimalloc",
"num_cpus",
"rand",
"serde",
"serde_json",

View File

@ -10,7 +10,9 @@ anyhow = "1"
axum = { version = "0.7", features = ["json"] }
blake3 = { version = "1", features = ["traits-preview"] }
chrono = { version = "0.4", features = ["serde"]}
futures = "0.3"
mimalloc = "0.1"
num_cpus = "1"
rand = "0.8"
serde = { version = "1", features = ["derive"] }
serde_json = "1"

View File

@ -11,8 +11,9 @@ use anyhow::{Result, Context};
use axum::http::StatusCode;
use axum::routing::{get, post};
use axum::{Router, Json};
use axum::extract::{State, Path};
use chrono::{NaiveDate, Utc, Datelike, Duration};
use axum::extract::{State, Path, Query};
use chrono::{NaiveDate, Utc, Datelike, Duration, DateTime};
use futures::stream::{TryStreamExt, FuturesOrdered};
use rand::distributions::{Alphanumeric, DistString};
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
@ -128,11 +129,227 @@ async fn delete_user(
Ok(StatusCode::NO_CONTENT)
}
// https://github.com/nox/serde_urlencoded/issues/26
// #[derive(Deserialize)]
// #[serde(tag = "v")]
// enum GetDataQuery {
// #[serde(rename = "1")]
// V1,
// #[serde(rename = "2")]
// V2 {
// #[serde(default)]
// page: Option<u32>,
// #[serde(default = "t")]
// include_current: bool,
// },
// }
// impl Default for GetDataQuery {
// fn default() -> Self {
// Self::V1
// }
// }
#[derive(Deserialize)]
struct GetDataQuery {
v: u32,
#[serde(default)]
page: Option<u32>,
#[serde(default = "t")]
include_current: bool,
}
impl Default for GetDataQuery {
fn default() -> Self {
Self {
v: 1,
page: None,
include_current: false,
}
}
}
const fn t() -> bool {
true
}
#[derive(Serialize)]
#[serde(untagged)]
enum GetDataResult {
V1(Vec<Question>),
V2 {
current: Option<Question>,
page: Vec<Question>,
},
}
const PER_PAGE: usize = 10;
async fn get_data(
state: State<ArcState>,
user: User,
) -> AppResult<Json<Vec<Question>>> {
let questions = sqlx::query!(
query: Option<Query<GetDataQuery>>,
) -> AppResult<Json<GetDataResult>> {
let Query(query) = query.unwrap_or_default();
match query.v {
2 => get_data_v2(
state,
user,
query.page,
query.include_current,
).await.map(Json),
_ => get_data_v1(state, user).await.map(Json),
}
// :c
// match query {
// GetDataQuery::V1 => get_data_v1(state, user).await.map(Json),
// GetDataQuery::V2 {
// page,
// include_current
// } => get_data_v2(
// state,
// user,
// page,
// include_current,
// ).await.map(Json),
// }
}
async fn get_data_v2(
state: State<ArcState>,
user: User,
page: Option<u32>,
include_current: bool,
) -> AppResult<GetDataResult> {
let offset_mul = page.unwrap_or(1).max(1) - 1;
let questions = sqlx::query_as!(
RawQuestion,
// language=postgresql
r#"
select
q.*,
current_timestamp <= q.publish_date + interval '1 day' and current_timestamp > q.publish_date as "active!",
(select answer from responses r where r.question_id = q.id and r.user_id = $1) as response
from questions q
where q.publish_date <= current_timestamp
order by q.publish_date desc
limit $2::bigint + 1 offset $3::bigint * $2::bigint
"#,
user.id,
PER_PAGE as i64,
offset_mul as i64,
)
.fetch_all(&state.pool)
.await?;
let mut current = None;
let questions = questions.into_iter()
.map(|question| {
let state = Arc::clone(&state);
async move {
parse_question(&state, question).await
}
})
.collect::<FuturesOrdered<_>>()
.try_collect::<Vec<_>>()
.await?;
let questions = questions.into_iter()
.flat_map(|question| {
if question.active {
if include_current {
current = Some(question);
}
None
} else {
Some(question)
}
})
.collect();
if current.is_none() && include_current {
let raw_current = sqlx::query_as!(
RawQuestion,
r#"
select
q.*,
true as "active!",
(select answer from responses r where r.question_id = q.id and r.user_id = $1) as response
from questions q
where
q.publish_date <= current_timestamp
and current_timestamp <= q.publish_date + interval '1 day'
and current_timestamp > q.publish_date
"#,
user.id,
)
.fetch_optional(&state.pool)
.await?;
if let Some(raw) = raw_current {
current = Some(parse_question(&state, raw).await?);
}
}
Ok(GetDataResult::V2 {
current,
page: questions,
})
}
struct RawQuestion {
id: Uuid,
publish_date: DateTime<Utc>,
active: bool,
question_text: String,
answers: Vec<String>,
response: Option<i16>,
}
async fn parse_question(state: &AppState, question: RawQuestion) -> Result<Question> {
let basic = BasicQuestion {
id: question.id,
date: question.publish_date,
active: question.active,
text: question.question_text,
answers: question.answers,
};
if question.response.is_some() || !question.active {
let mut responses: Vec<u64> = vec![0; basic.answers.len()];
let raw_responses = sqlx::query!(
// language=postgresql
"select r.answer, count(r.*) from responses r where r.question_id = $1 group by r.answer",
question.id,
)
.fetch_all(&state.pool)
.await?;
for response in raw_responses {
responses[response.answer as usize] = response.count.map(|c| c as u64).unwrap_or(0);
}
return Ok(FullQuestion {
basic,
responses,
response: question.response.map(|r| r as u16),
}.into());
}
Ok(basic.into())
}
async fn get_data_v1(
state: State<ArcState>,
user: User,
) -> AppResult<GetDataResult> {
let questions = sqlx::query_as!(
RawQuestion,
// language=postgresql
r#"
select
@ -148,41 +365,19 @@ async fn get_data(
.fetch_all(&state.pool)
.await?;
let mut parsed = Vec::with_capacity(questions.len());
for question in questions {
let mut responses: Vec<u64> = vec![0; question.answers.len()];
let raw_responses = sqlx::query!(
// language=postgresql
"select r.answer, count(r.*) from responses r where r.question_id = $1 group by r.answer",
question.id,
)
.fetch_all(&state.pool)
.await?;
let questions = questions.into_iter()
.map(|question| {
let state = Arc::clone(&state);
for response in raw_responses {
responses[response.answer as usize] = response.count.map(|c| c as u64).unwrap_or(0);
}
async move {
parse_question(&state, question).await
}
})
.collect::<FuturesOrdered<_>>()
.try_collect::<Vec<_>>()
.await?;
let basic = BasicQuestion {
id: question.id,
date: question.publish_date,
active: question.active,
text: question.question_text,
answers: question.answers,
};
if question.response.is_some() || !question.active {
parsed.push(FullQuestion {
basic,
responses,
response: question.response.map(|r| r as u16),
}.into());
} else {
parsed.push(basic.into());
}
}
Ok(Json(parsed))
Ok(GetDataResult::V1(questions))
}
#[derive(Deserialize)]

View File

@ -19,6 +19,14 @@ pub struct FullQuestion {
pub response: Option<u16>,
}
impl std::ops::Deref for FullQuestion {
type Target = BasicQuestion;
fn deref(&self) -> &Self::Target {
&self.basic
}
}
#[derive(Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum Question {
@ -26,6 +34,17 @@ pub enum Question {
Full(FullQuestion),
}
impl std::ops::Deref for Question {
type Target = BasicQuestion;
fn deref(&self) -> &Self::Target {
match self {
Self::Basic(basic) => basic,
Self::Full(full) => &full.basic,
}
}
}
impl From<BasicQuestion> for Question {
fn from(q: BasicQuestion) -> Self {
Self::Basic(q)