mas_storage_pg/queue/
schedule.rs

1// Copyright 2024 New Vector Ltd.
2//
3// SPDX-License-Identifier: AGPL-3.0-only
4// Please see LICENSE in the repository root for full details.
5
6//! A module containing the PostgreSQL implementation of the
7//! [`QueueScheduleRepository`].
8
9use async_trait::async_trait;
10use chrono::{DateTime, Utc};
11use mas_storage::queue::{QueueScheduleRepository, ScheduleStatus};
12use sqlx::PgConnection;
13
14use crate::{DatabaseError, ExecuteExt};
15
16/// An implementation of [`QueueScheduleRepository`] for a PostgreSQL
17/// connection.
18pub struct PgQueueScheduleRepository<'c> {
19    conn: &'c mut PgConnection,
20}
21
22impl<'c> PgQueueScheduleRepository<'c> {
23    /// Create a new [`PgQueueScheduleRepository`] from an active PostgreSQL
24    /// connection.
25    #[must_use]
26    pub fn new(conn: &'c mut PgConnection) -> Self {
27        Self { conn }
28    }
29}
30
31struct ScheduleLookup {
32    schedule_name: String,
33    last_scheduled_at: Option<DateTime<Utc>>,
34    last_scheduled_job_completed: Option<bool>,
35}
36
37impl From<ScheduleLookup> for ScheduleStatus {
38    fn from(value: ScheduleLookup) -> Self {
39        ScheduleStatus {
40            schedule_name: value.schedule_name,
41            last_scheduled_at: value.last_scheduled_at,
42            last_scheduled_job_completed: value.last_scheduled_job_completed,
43        }
44    }
45}
46
47#[async_trait]
48impl QueueScheduleRepository for PgQueueScheduleRepository<'_> {
49    type Error = DatabaseError;
50
51    async fn setup(&mut self, schedules: &[&'static str]) -> Result<(), Self::Error> {
52        sqlx::query!(
53            r#"
54                INSERT INTO queue_schedules (schedule_name)
55                SELECT * FROM UNNEST($1::text[]) AS t (schedule_name)
56                ON CONFLICT (schedule_name) DO NOTHING
57            "#,
58            &schedules.iter().map(|&s| s.to_owned()).collect::<Vec<_>>(),
59        )
60        .traced()
61        .execute(&mut *self.conn)
62        .await?;
63
64        Ok(())
65    }
66
67    async fn list(&mut self) -> Result<Vec<ScheduleStatus>, Self::Error> {
68        let res = sqlx::query_as!(
69            ScheduleLookup,
70            r#"
71                SELECT
72                    queue_schedules.schedule_name as "schedule_name!",
73                    queue_schedules.last_scheduled_at,
74                    queue_jobs.status IN ('completed', 'failed') as last_scheduled_job_completed
75                FROM queue_schedules
76                LEFT JOIN queue_jobs
77                    ON queue_jobs.queue_job_id = queue_schedules.last_scheduled_job_id
78            "#
79        )
80        .traced()
81        .fetch_all(&mut *self.conn)
82        .await?;
83
84        Ok(res.into_iter().map(Into::into).collect())
85    }
86}