1use async_trait::async_trait;
10use mas_storage::queue::{CleanupExpiredTokensJob, PruneStalePolicyDataJob};
11use tracing::{debug, info};
12
13use crate::{
14 State,
15 new_queue::{JobContext, JobError, RunnableJob},
16};
17
18#[async_trait]
19impl RunnableJob for CleanupExpiredTokensJob {
20 #[tracing::instrument(name = "job.cleanup_expired_tokens", skip_all, err)]
21 async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> {
22 let clock = state.clock();
23 let mut repo = state.repository().await.map_err(JobError::retry)?;
24
25 let count = repo
26 .oauth2_access_token()
27 .cleanup_revoked(&clock)
28 .await
29 .map_err(JobError::retry)?;
30 repo.save().await.map_err(JobError::retry)?;
31
32 if count == 0 {
33 debug!("no token to clean up");
34 } else {
35 info!(count, "cleaned up revoked tokens");
36 }
37
38 Ok(())
39 }
40}
41
42#[async_trait]
43impl RunnableJob for PruneStalePolicyDataJob {
44 #[tracing::instrument(name = "job.prune_stale_policy_data", skip_all, err)]
45 async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> {
46 let mut repo = state.repository().await.map_err(JobError::retry)?;
47
48 let count = repo
50 .policy_data()
51 .prune(10)
52 .await
53 .map_err(JobError::retry)?;
54
55 repo.save().await.map_err(JobError::retry)?;
56
57 if count == 0 {
58 debug!("no stale policy data to prune");
59 } else {
60 info!(count, "pruned stale policy data");
61 }
62
63 Ok(())
64 }
65}