mas_tasks/
database.rs

1// Copyright 2024 New Vector Ltd.
2// Copyright 2023, 2024 The Matrix.org Foundation C.I.C.
3//
4// SPDX-License-Identifier: AGPL-3.0-only
5// Please see LICENSE in the repository root for full details.
6
7//! Database-related tasks
8
9use async_trait::async_trait;
10use mas_storage::queue::{CleanupExpiredTokensJob, PruneStalePolicyDataJob};
11use tracing::{debug, info};
12
13use crate::{
14    State,
15    new_queue::{JobContext, JobError, RunnableJob},
16};
17
18#[async_trait]
19impl RunnableJob for CleanupExpiredTokensJob {
20    #[tracing::instrument(name = "job.cleanup_expired_tokens", skip_all, err)]
21    async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> {
22        let clock = state.clock();
23        let mut repo = state.repository().await.map_err(JobError::retry)?;
24
25        let count = repo
26            .oauth2_access_token()
27            .cleanup_revoked(&clock)
28            .await
29            .map_err(JobError::retry)?;
30        repo.save().await.map_err(JobError::retry)?;
31
32        if count == 0 {
33            debug!("no token to clean up");
34        } else {
35            info!(count, "cleaned up revoked tokens");
36        }
37
38        Ok(())
39    }
40}
41
42#[async_trait]
43impl RunnableJob for PruneStalePolicyDataJob {
44    #[tracing::instrument(name = "job.prune_stale_policy_data", skip_all, err)]
45    async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> {
46        let mut repo = state.repository().await.map_err(JobError::retry)?;
47
48        // Keep the last 10 policy data
49        let count = repo
50            .policy_data()
51            .prune(10)
52            .await
53            .map_err(JobError::retry)?;
54
55        repo.save().await.map_err(JobError::retry)?;
56
57        if count == 0 {
58            debug!("no stale policy data to prune");
59        } else {
60            info!(count, "pruned stale policy data");
61        }
62
63        Ok(())
64    }
65}