mas_storage_pg/queue/
schedule.rs1use async_trait::async_trait;
10use chrono::{DateTime, Utc};
11use mas_storage::queue::{QueueScheduleRepository, ScheduleStatus};
12use sqlx::PgConnection;
13
14use crate::{DatabaseError, ExecuteExt};
15
16pub struct PgQueueScheduleRepository<'c> {
19 conn: &'c mut PgConnection,
20}
21
22impl<'c> PgQueueScheduleRepository<'c> {
23 #[must_use]
26 pub fn new(conn: &'c mut PgConnection) -> Self {
27 Self { conn }
28 }
29}
30
31struct ScheduleLookup {
32 schedule_name: String,
33 last_scheduled_at: Option<DateTime<Utc>>,
34 last_scheduled_job_completed: Option<bool>,
35}
36
37impl From<ScheduleLookup> for ScheduleStatus {
38 fn from(value: ScheduleLookup) -> Self {
39 ScheduleStatus {
40 schedule_name: value.schedule_name,
41 last_scheduled_at: value.last_scheduled_at,
42 last_scheduled_job_completed: value.last_scheduled_job_completed,
43 }
44 }
45}
46
47#[async_trait]
48impl QueueScheduleRepository for PgQueueScheduleRepository<'_> {
49 type Error = DatabaseError;
50
51 async fn setup(&mut self, schedules: &[&'static str]) -> Result<(), Self::Error> {
52 sqlx::query!(
53 r#"
54 INSERT INTO queue_schedules (schedule_name)
55 SELECT * FROM UNNEST($1::text[]) AS t (schedule_name)
56 ON CONFLICT (schedule_name) DO NOTHING
57 "#,
58 &schedules.iter().map(|&s| s.to_owned()).collect::<Vec<_>>(),
59 )
60 .traced()
61 .execute(&mut *self.conn)
62 .await?;
63
64 Ok(())
65 }
66
67 async fn list(&mut self) -> Result<Vec<ScheduleStatus>, Self::Error> {
68 let res = sqlx::query_as!(
69 ScheduleLookup,
70 r#"
71 SELECT
72 queue_schedules.schedule_name as "schedule_name!",
73 queue_schedules.last_scheduled_at,
74 queue_jobs.status IN ('completed', 'failed') as last_scheduled_job_completed
75 FROM queue_schedules
76 LEFT JOIN queue_jobs
77 ON queue_jobs.queue_job_id = queue_schedules.last_scheduled_job_id
78 "#
79 )
80 .traced()
81 .fetch_all(&mut *self.conn)
82 .await?;
83
84 Ok(res.into_iter().map(Into::into).collect())
85 }
86}