1use std::collections::HashSet;
7
8use async_trait::async_trait;
9use chrono::Duration;
10use mas_storage::{
11 compat::CompatSessionFilter,
12 oauth2::OAuth2SessionFilter,
13 queue::{
14 ExpireInactiveCompatSessionsJob, ExpireInactiveOAuthSessionsJob, ExpireInactiveSessionsJob,
15 ExpireInactiveUserSessionsJob, QueueJobRepositoryExt, SyncDevicesJob,
16 },
17 user::BrowserSessionFilter,
18};
19
20use crate::{
21 State,
22 new_queue::{JobContext, JobError, RunnableJob},
23};
24
25#[async_trait]
26impl RunnableJob for ExpireInactiveSessionsJob {
27 async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> {
28 let Some(config) = state.site_config().session_expiration.as_ref() else {
29 return Ok(());
31 };
32
33 let clock = state.clock();
34 let mut rng = state.rng();
35 let now = clock.now();
36 let mut repo = state.repository().await.map_err(JobError::retry)?;
37
38 if let Some(ttl) = config.oauth_session_inactivity_ttl {
39 repo.queue_job()
40 .schedule_job(
41 &mut rng,
42 &clock,
43 ExpireInactiveOAuthSessionsJob::new(now - ttl),
44 )
45 .await
46 .map_err(JobError::retry)?;
47 }
48
49 if let Some(ttl) = config.compat_session_inactivity_ttl {
50 repo.queue_job()
51 .schedule_job(
52 &mut rng,
53 &clock,
54 ExpireInactiveCompatSessionsJob::new(now - ttl),
55 )
56 .await
57 .map_err(JobError::retry)?;
58 }
59
60 if let Some(ttl) = config.user_session_inactivity_ttl {
61 repo.queue_job()
62 .schedule_job(
63 &mut rng,
64 &clock,
65 ExpireInactiveUserSessionsJob::new(now - ttl),
66 )
67 .await
68 .map_err(JobError::retry)?;
69 }
70
71 repo.save().await.map_err(JobError::retry)?;
72
73 Ok(())
74 }
75}
76
77#[async_trait]
78impl RunnableJob for ExpireInactiveOAuthSessionsJob {
79 async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> {
80 let mut repo = state.repository().await.map_err(JobError::retry)?;
81 let clock = state.clock();
82 let mut rng = state.rng();
83 let mut users_synced = HashSet::new();
84
85 let mut delay = Duration::minutes(1);
89
90 let filter = OAuth2SessionFilter::new()
91 .with_last_active_before(self.threshold())
92 .for_any_user()
93 .only_dynamic_clients()
94 .active_only();
95
96 let pagination = self.pagination(100);
97
98 let page = repo
99 .oauth2_session()
100 .list(filter, pagination)
101 .await
102 .map_err(JobError::retry)?;
103
104 if let Some(job) = self.next(&page) {
105 tracing::info!("Scheduling job to expire the next batch of inactive sessions");
106 repo.queue_job()
107 .schedule_job(&mut rng, &clock, job)
108 .await
109 .map_err(JobError::retry)?;
110 }
111
112 for edge in page.edges {
113 if let Some(user_id) = edge.user_id {
114 let inserted = users_synced.insert(user_id);
115 if inserted {
116 tracing::info!(user.id = %user_id, "Scheduling devices sync for user");
117 repo.queue_job()
118 .schedule_job_later(
119 &mut rng,
120 &clock,
121 SyncDevicesJob::new_for_id(user_id),
122 clock.now() + delay,
123 )
124 .await
125 .map_err(JobError::retry)?;
126 delay += Duration::seconds(10);
127 }
128 }
129
130 repo.oauth2_session()
131 .finish(&clock, edge)
132 .await
133 .map_err(JobError::retry)?;
134 }
135
136 repo.save().await.map_err(JobError::retry)?;
137
138 Ok(())
139 }
140}
141
142#[async_trait]
143impl RunnableJob for ExpireInactiveCompatSessionsJob {
144 async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> {
145 let mut repo = state.repository().await.map_err(JobError::retry)?;
146 let clock = state.clock();
147 let mut rng = state.rng();
148 let mut users_synced = HashSet::new();
149
150 let mut delay = Duration::minutes(1);
154
155 let filter = CompatSessionFilter::new()
156 .with_last_active_before(self.threshold())
157 .active_only();
158
159 let pagination = self.pagination(100);
160
161 let page = repo
162 .compat_session()
163 .list(filter, pagination)
164 .await
165 .map_err(JobError::retry)?
166 .map(|(c, _)| c);
167
168 if let Some(job) = self.next(&page) {
169 tracing::info!("Scheduling job to expire the next batch of inactive sessions");
170 repo.queue_job()
171 .schedule_job(&mut rng, &clock, job)
172 .await
173 .map_err(JobError::retry)?;
174 }
175
176 for edge in page.edges {
177 let inserted = users_synced.insert(edge.user_id);
178 if inserted {
179 tracing::info!(user.id = %edge.user_id, "Scheduling devices sync for user");
180 repo.queue_job()
181 .schedule_job_later(
182 &mut rng,
183 &clock,
184 SyncDevicesJob::new_for_id(edge.user_id),
185 clock.now() + delay,
186 )
187 .await
188 .map_err(JobError::retry)?;
189 delay += Duration::seconds(10);
190 }
191
192 repo.compat_session()
193 .finish(&clock, edge)
194 .await
195 .map_err(JobError::retry)?;
196 }
197
198 repo.save().await.map_err(JobError::retry)?;
199
200 Ok(())
201 }
202}
203
204#[async_trait]
205impl RunnableJob for ExpireInactiveUserSessionsJob {
206 async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> {
207 let mut repo = state.repository().await.map_err(JobError::retry)?;
208 let clock = state.clock();
209 let mut rng = state.rng();
210
211 let filter = BrowserSessionFilter::new()
212 .with_last_active_before(self.threshold())
213 .active_only();
214
215 let pagination = self.pagination(100);
216
217 let page = repo
218 .browser_session()
219 .list(filter, pagination)
220 .await
221 .map_err(JobError::retry)?;
222
223 if let Some(job) = self.next(&page) {
224 tracing::info!("Scheduling job to expire the next batch of inactive sessions");
225 repo.queue_job()
226 .schedule_job(&mut rng, &clock, job)
227 .await
228 .map_err(JobError::retry)?;
229 }
230
231 for edge in page.edges {
232 repo.browser_session()
233 .finish(&clock, edge)
234 .await
235 .map_err(JobError::retry)?;
236 }
237
238 repo.save().await.map_err(JobError::retry)?;
239
240 Ok(())
241 }
242}