mas_tasks/
sessions.rs

1// Copyright 2025 New Vector Ltd.
2//
3// SPDX-License-Identifier: AGPL-3.0-only
4// Please see LICENSE in the repository root for full details.
5
6use std::collections::HashSet;
7
8use async_trait::async_trait;
9use chrono::Duration;
10use mas_storage::{
11    compat::CompatSessionFilter,
12    oauth2::OAuth2SessionFilter,
13    queue::{
14        ExpireInactiveCompatSessionsJob, ExpireInactiveOAuthSessionsJob, ExpireInactiveSessionsJob,
15        ExpireInactiveUserSessionsJob, QueueJobRepositoryExt, SyncDevicesJob,
16    },
17    user::BrowserSessionFilter,
18};
19
20use crate::{
21    State,
22    new_queue::{JobContext, JobError, RunnableJob},
23};
24
25#[async_trait]
26impl RunnableJob for ExpireInactiveSessionsJob {
27    async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> {
28        let Some(config) = state.site_config().session_expiration.as_ref() else {
29            // Automatic session expiration is disabled
30            return Ok(());
31        };
32
33        let clock = state.clock();
34        let mut rng = state.rng();
35        let now = clock.now();
36        let mut repo = state.repository().await.map_err(JobError::retry)?;
37
38        if let Some(ttl) = config.oauth_session_inactivity_ttl {
39            repo.queue_job()
40                .schedule_job(
41                    &mut rng,
42                    &clock,
43                    ExpireInactiveOAuthSessionsJob::new(now - ttl),
44                )
45                .await
46                .map_err(JobError::retry)?;
47        }
48
49        if let Some(ttl) = config.compat_session_inactivity_ttl {
50            repo.queue_job()
51                .schedule_job(
52                    &mut rng,
53                    &clock,
54                    ExpireInactiveCompatSessionsJob::new(now - ttl),
55                )
56                .await
57                .map_err(JobError::retry)?;
58        }
59
60        if let Some(ttl) = config.user_session_inactivity_ttl {
61            repo.queue_job()
62                .schedule_job(
63                    &mut rng,
64                    &clock,
65                    ExpireInactiveUserSessionsJob::new(now - ttl),
66                )
67                .await
68                .map_err(JobError::retry)?;
69        }
70
71        repo.save().await.map_err(JobError::retry)?;
72
73        Ok(())
74    }
75}
76
77#[async_trait]
78impl RunnableJob for ExpireInactiveOAuthSessionsJob {
79    async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> {
80        let mut repo = state.repository().await.map_err(JobError::retry)?;
81        let clock = state.clock();
82        let mut rng = state.rng();
83        let mut users_synced = HashSet::new();
84
85        // This delay is used to space out the device sync jobs
86        // We add 10 seconds between each device sync, meaning that it will spread out
87        // the syncs over ~16 minutes max if we get a full batch of 100 users
88        let mut delay = Duration::minutes(1);
89
90        let filter = OAuth2SessionFilter::new()
91            .with_last_active_before(self.threshold())
92            .for_any_user()
93            .only_dynamic_clients()
94            .active_only();
95
96        let pagination = self.pagination(100);
97
98        let page = repo
99            .oauth2_session()
100            .list(filter, pagination)
101            .await
102            .map_err(JobError::retry)?;
103
104        if let Some(job) = self.next(&page) {
105            tracing::info!("Scheduling job to expire the next batch of inactive sessions");
106            repo.queue_job()
107                .schedule_job(&mut rng, &clock, job)
108                .await
109                .map_err(JobError::retry)?;
110        }
111
112        for edge in page.edges {
113            if let Some(user_id) = edge.user_id {
114                let inserted = users_synced.insert(user_id);
115                if inserted {
116                    tracing::info!(user.id = %user_id, "Scheduling devices sync for user");
117                    repo.queue_job()
118                        .schedule_job_later(
119                            &mut rng,
120                            &clock,
121                            SyncDevicesJob::new_for_id(user_id),
122                            clock.now() + delay,
123                        )
124                        .await
125                        .map_err(JobError::retry)?;
126                    delay += Duration::seconds(10);
127                }
128            }
129
130            repo.oauth2_session()
131                .finish(&clock, edge)
132                .await
133                .map_err(JobError::retry)?;
134        }
135
136        repo.save().await.map_err(JobError::retry)?;
137
138        Ok(())
139    }
140}
141
142#[async_trait]
143impl RunnableJob for ExpireInactiveCompatSessionsJob {
144    async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> {
145        let mut repo = state.repository().await.map_err(JobError::retry)?;
146        let clock = state.clock();
147        let mut rng = state.rng();
148        let mut users_synced = HashSet::new();
149
150        // This delay is used to space out the device sync jobs
151        // We add 10 seconds between each device sync, meaning that it will spread out
152        // the syncs over ~16 minutes max if we get a full batch of 100 users
153        let mut delay = Duration::minutes(1);
154
155        let filter = CompatSessionFilter::new()
156            .with_last_active_before(self.threshold())
157            .active_only();
158
159        let pagination = self.pagination(100);
160
161        let page = repo
162            .compat_session()
163            .list(filter, pagination)
164            .await
165            .map_err(JobError::retry)?
166            .map(|(c, _)| c);
167
168        if let Some(job) = self.next(&page) {
169            tracing::info!("Scheduling job to expire the next batch of inactive sessions");
170            repo.queue_job()
171                .schedule_job(&mut rng, &clock, job)
172                .await
173                .map_err(JobError::retry)?;
174        }
175
176        for edge in page.edges {
177            let inserted = users_synced.insert(edge.user_id);
178            if inserted {
179                tracing::info!(user.id = %edge.user_id, "Scheduling devices sync for user");
180                repo.queue_job()
181                    .schedule_job_later(
182                        &mut rng,
183                        &clock,
184                        SyncDevicesJob::new_for_id(edge.user_id),
185                        clock.now() + delay,
186                    )
187                    .await
188                    .map_err(JobError::retry)?;
189                delay += Duration::seconds(10);
190            }
191
192            repo.compat_session()
193                .finish(&clock, edge)
194                .await
195                .map_err(JobError::retry)?;
196        }
197
198        repo.save().await.map_err(JobError::retry)?;
199
200        Ok(())
201    }
202}
203
204#[async_trait]
205impl RunnableJob for ExpireInactiveUserSessionsJob {
206    async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> {
207        let mut repo = state.repository().await.map_err(JobError::retry)?;
208        let clock = state.clock();
209        let mut rng = state.rng();
210
211        let filter = BrowserSessionFilter::new()
212            .with_last_active_before(self.threshold())
213            .active_only();
214
215        let pagination = self.pagination(100);
216
217        let page = repo
218            .browser_session()
219            .list(filter, pagination)
220            .await
221            .map_err(JobError::retry)?;
222
223        if let Some(job) = self.next(&page) {
224            tracing::info!("Scheduling job to expire the next batch of inactive sessions");
225            repo.queue_job()
226                .schedule_job(&mut rng, &clock, job)
227                .await
228                .map_err(JobError::retry)?;
229        }
230
231        for edge in page.edges {
232            repo.browser_session()
233                .finish(&clock, edge)
234                .await
235                .map_err(JobError::retry)?;
236        }
237
238        repo.save().await.map_err(JobError::retry)?;
239
240        Ok(())
241    }
242}