mas_storage/queue/
tasks.rs

1// Copyright 2024, 2025 New Vector Ltd.
2//
3// SPDX-License-Identifier: AGPL-3.0-only
4// Please see LICENSE in the repository root for full details.
5
6use chrono::{DateTime, Utc};
7use mas_data_model::{
8    BrowserSession, CompatSession, Device, Session, User, UserEmailAuthentication,
9    UserRecoverySession,
10};
11use serde::{Deserialize, Serialize};
12use ulid::Ulid;
13
14use super::InsertableJob;
15use crate::{Page, Pagination};
16
17/// This is the previous iteration of the email verification job. It has been
18/// replaced by [`SendEmailAuthenticationCodeJob`]. This struct is kept to be
19/// able to consume jobs that are still in the queue.
20#[derive(Serialize, Deserialize, Debug, Clone)]
21pub struct VerifyEmailJob {
22    user_email_id: Ulid,
23    language: Option<String>,
24}
25
26impl VerifyEmailJob {
27    /// The ID of the email address to verify.
28    #[must_use]
29    pub fn user_email_id(&self) -> Ulid {
30        self.user_email_id
31    }
32}
33
34impl InsertableJob for VerifyEmailJob {
35    const QUEUE_NAME: &'static str = "verify-email";
36}
37
38/// A job to send an email authentication code to a user.
39#[derive(Serialize, Deserialize, Debug, Clone)]
40pub struct SendEmailAuthenticationCodeJob {
41    user_email_authentication_id: Ulid,
42    language: String,
43}
44
45impl SendEmailAuthenticationCodeJob {
46    /// Create a new job to send an email authentication code to a user.
47    #[must_use]
48    pub fn new(user_email_authentication: &UserEmailAuthentication, language: String) -> Self {
49        Self {
50            user_email_authentication_id: user_email_authentication.id,
51            language,
52        }
53    }
54
55    /// The language to use for the email.
56    #[must_use]
57    pub fn language(&self) -> &str {
58        &self.language
59    }
60
61    /// The ID of the email authentication to send the code for.
62    #[must_use]
63    pub fn user_email_authentication_id(&self) -> Ulid {
64        self.user_email_authentication_id
65    }
66}
67
68impl InsertableJob for SendEmailAuthenticationCodeJob {
69    const QUEUE_NAME: &'static str = "send-email-authentication-code";
70}
71
72/// A job to provision the user on the homeserver.
73#[derive(Serialize, Deserialize, Debug, Clone)]
74pub struct ProvisionUserJob {
75    user_id: Ulid,
76    set_display_name: Option<String>,
77}
78
79impl ProvisionUserJob {
80    /// Create a new job to provision the user on the homeserver.
81    #[must_use]
82    pub fn new(user: &User) -> Self {
83        Self {
84            user_id: user.id,
85            set_display_name: None,
86        }
87    }
88
89    #[doc(hidden)]
90    #[must_use]
91    pub fn new_for_id(user_id: Ulid) -> Self {
92        Self {
93            user_id,
94            set_display_name: None,
95        }
96    }
97
98    /// Set the display name of the user.
99    #[must_use]
100    pub fn set_display_name(mut self, display_name: String) -> Self {
101        self.set_display_name = Some(display_name);
102        self
103    }
104
105    /// Get the display name to be set.
106    #[must_use]
107    pub fn display_name_to_set(&self) -> Option<&str> {
108        self.set_display_name.as_deref()
109    }
110
111    /// The ID of the user to provision.
112    #[must_use]
113    pub fn user_id(&self) -> Ulid {
114        self.user_id
115    }
116}
117
118impl InsertableJob for ProvisionUserJob {
119    const QUEUE_NAME: &'static str = "provision-user";
120}
121
122/// A job to provision a device for a user on the homeserver.
123///
124/// This job is deprecated, use the `SyncDevicesJob` instead. It is kept to
125/// not break existing jobs in the database.
126#[derive(Serialize, Deserialize, Debug, Clone)]
127pub struct ProvisionDeviceJob {
128    user_id: Ulid,
129    device_id: String,
130}
131
132impl ProvisionDeviceJob {
133    /// The ID of the user to provision the device for.
134    #[must_use]
135    pub fn user_id(&self) -> Ulid {
136        self.user_id
137    }
138
139    /// The ID of the device to provision.
140    #[must_use]
141    pub fn device_id(&self) -> &str {
142        &self.device_id
143    }
144}
145
146impl InsertableJob for ProvisionDeviceJob {
147    const QUEUE_NAME: &'static str = "provision-device";
148}
149
150/// A job to delete a device for a user on the homeserver.
151///
152/// This job is deprecated, use the `SyncDevicesJob` instead. It is kept to
153/// not break existing jobs in the database.
154#[derive(Serialize, Deserialize, Debug, Clone)]
155pub struct DeleteDeviceJob {
156    user_id: Ulid,
157    device_id: String,
158}
159
160impl DeleteDeviceJob {
161    /// Create a new job to delete a device for a user on the homeserver.
162    #[must_use]
163    pub fn new(user: &User, device: &Device) -> Self {
164        Self {
165            user_id: user.id,
166            device_id: device.as_str().to_owned(),
167        }
168    }
169
170    /// The ID of the user to delete the device for.
171    #[must_use]
172    pub fn user_id(&self) -> Ulid {
173        self.user_id
174    }
175
176    /// The ID of the device to delete.
177    #[must_use]
178    pub fn device_id(&self) -> &str {
179        &self.device_id
180    }
181}
182
183impl InsertableJob for DeleteDeviceJob {
184    const QUEUE_NAME: &'static str = "delete-device";
185}
186
187/// A job which syncs the list of devices of a user with the homeserver
188#[derive(Serialize, Deserialize, Debug, Clone)]
189pub struct SyncDevicesJob {
190    user_id: Ulid,
191}
192
193impl SyncDevicesJob {
194    /// Create a new job to sync the list of devices of a user with the
195    /// homeserver
196    #[must_use]
197    pub fn new(user: &User) -> Self {
198        Self { user_id: user.id }
199    }
200
201    /// Create a new job to sync the list of devices of a user with the
202    /// homeserver for the given user ID
203    ///
204    /// This is useful to use in cases where the [`User`] object isn't loaded
205    #[must_use]
206    pub fn new_for_id(user_id: Ulid) -> Self {
207        Self { user_id }
208    }
209
210    /// The ID of the user to sync the devices for
211    #[must_use]
212    pub fn user_id(&self) -> Ulid {
213        self.user_id
214    }
215}
216
217impl InsertableJob for SyncDevicesJob {
218    const QUEUE_NAME: &'static str = "sync-devices";
219}
220
221/// A job to deactivate and lock a user
222#[derive(Serialize, Deserialize, Debug, Clone)]
223pub struct DeactivateUserJob {
224    user_id: Ulid,
225    hs_erase: bool,
226}
227
228impl DeactivateUserJob {
229    /// Create a new job to deactivate and lock a user
230    ///
231    /// # Parameters
232    ///
233    /// * `user` - The user to deactivate
234    /// * `hs_erase` - Whether to erase the user from the homeserver
235    #[must_use]
236    pub fn new(user: &User, hs_erase: bool) -> Self {
237        Self {
238            user_id: user.id,
239            hs_erase,
240        }
241    }
242
243    /// The ID of the user to deactivate
244    #[must_use]
245    pub fn user_id(&self) -> Ulid {
246        self.user_id
247    }
248
249    /// Whether to erase the user from the homeserver
250    #[must_use]
251    pub fn hs_erase(&self) -> bool {
252        self.hs_erase
253    }
254}
255
256impl InsertableJob for DeactivateUserJob {
257    const QUEUE_NAME: &'static str = "deactivate-user";
258}
259
260/// A job to reactivate a user
261#[derive(Serialize, Deserialize, Debug, Clone)]
262pub struct ReactivateUserJob {
263    user_id: Ulid,
264}
265
266impl ReactivateUserJob {
267    /// Create a new job to reactivate a user
268    ///
269    /// # Parameters
270    ///
271    /// * `user` - The user to reactivate
272    #[must_use]
273    pub fn new(user: &User) -> Self {
274        Self { user_id: user.id }
275    }
276
277    /// The ID of the user to reactivate
278    #[must_use]
279    pub fn user_id(&self) -> Ulid {
280        self.user_id
281    }
282}
283
284impl InsertableJob for ReactivateUserJob {
285    const QUEUE_NAME: &'static str = "reactivate-user";
286}
287
288/// Send account recovery emails
289#[derive(Serialize, Deserialize, Debug, Clone)]
290pub struct SendAccountRecoveryEmailsJob {
291    user_recovery_session_id: Ulid,
292}
293
294impl SendAccountRecoveryEmailsJob {
295    /// Create a new job to send account recovery emails
296    ///
297    /// # Parameters
298    ///
299    /// * `user_recovery_session` - The user recovery session to send the email
300    ///   for
301    /// * `language` - The locale to send the email in
302    #[must_use]
303    pub fn new(user_recovery_session: &UserRecoverySession) -> Self {
304        Self {
305            user_recovery_session_id: user_recovery_session.id,
306        }
307    }
308
309    /// The ID of the user recovery session to send the email for
310    #[must_use]
311    pub fn user_recovery_session_id(&self) -> Ulid {
312        self.user_recovery_session_id
313    }
314}
315
316impl InsertableJob for SendAccountRecoveryEmailsJob {
317    const QUEUE_NAME: &'static str = "send-account-recovery-email";
318}
319
320/// Cleanup expired tokens
321#[derive(Serialize, Deserialize, Debug, Clone, Default)]
322pub struct CleanupExpiredTokensJob;
323
324impl InsertableJob for CleanupExpiredTokensJob {
325    const QUEUE_NAME: &'static str = "cleanup-expired-tokens";
326}
327
328/// Scheduled job to expire inactive sessions
329///
330/// This job will trigger jobs to expire inactive compat, oauth and user
331/// sessions.
332#[derive(Serialize, Deserialize, Debug, Clone)]
333pub struct ExpireInactiveSessionsJob;
334
335impl InsertableJob for ExpireInactiveSessionsJob {
336    const QUEUE_NAME: &'static str = "expire-inactive-sessions";
337}
338
339/// Expire inactive OAuth 2.0 sessions
340#[derive(Serialize, Deserialize, Debug, Clone)]
341pub struct ExpireInactiveOAuthSessionsJob {
342    threshold: DateTime<Utc>,
343    after: Option<Ulid>,
344}
345
346impl ExpireInactiveOAuthSessionsJob {
347    /// Create a new job to expire inactive OAuth 2.0 sessions
348    ///
349    /// # Parameters
350    ///
351    /// * `threshold` - The threshold to expire sessions at
352    #[must_use]
353    pub fn new(threshold: DateTime<Utc>) -> Self {
354        Self {
355            threshold,
356            after: None,
357        }
358    }
359
360    /// Get the threshold to expire sessions at
361    #[must_use]
362    pub fn threshold(&self) -> DateTime<Utc> {
363        self.threshold
364    }
365
366    /// Get the pagination cursor
367    #[must_use]
368    pub fn pagination(&self, batch_size: usize) -> Pagination {
369        let pagination = Pagination::first(batch_size);
370        if let Some(after) = self.after {
371            pagination.after(after)
372        } else {
373            pagination
374        }
375    }
376
377    /// Get the next job given the page returned by the database
378    #[must_use]
379    pub fn next(&self, page: &Page<Session>) -> Option<Self> {
380        if !page.has_next_page {
381            return None;
382        }
383
384        let last_edge = page.edges.last()?;
385        Some(Self {
386            threshold: self.threshold,
387            after: Some(last_edge.id),
388        })
389    }
390}
391
392impl InsertableJob for ExpireInactiveOAuthSessionsJob {
393    const QUEUE_NAME: &'static str = "expire-inactive-oauth-sessions";
394}
395
396/// Expire inactive compatibility sessions
397#[derive(Serialize, Deserialize, Debug, Clone)]
398pub struct ExpireInactiveCompatSessionsJob {
399    threshold: DateTime<Utc>,
400    after: Option<Ulid>,
401}
402
403impl ExpireInactiveCompatSessionsJob {
404    /// Create a new job to expire inactive compatibility sessions
405    ///
406    /// # Parameters
407    ///
408    /// * `threshold` - The threshold to expire sessions at
409    #[must_use]
410    pub fn new(threshold: DateTime<Utc>) -> Self {
411        Self {
412            threshold,
413            after: None,
414        }
415    }
416
417    /// Get the threshold to expire sessions at
418    #[must_use]
419    pub fn threshold(&self) -> DateTime<Utc> {
420        self.threshold
421    }
422
423    /// Get the pagination cursor
424    #[must_use]
425    pub fn pagination(&self, batch_size: usize) -> Pagination {
426        let pagination = Pagination::first(batch_size);
427        if let Some(after) = self.after {
428            pagination.after(after)
429        } else {
430            pagination
431        }
432    }
433
434    /// Get the next job given the page returned by the database
435    #[must_use]
436    pub fn next(&self, page: &Page<CompatSession>) -> Option<Self> {
437        if !page.has_next_page {
438            return None;
439        }
440
441        let last_edge = page.edges.last()?;
442        Some(Self {
443            threshold: self.threshold,
444            after: Some(last_edge.id),
445        })
446    }
447}
448
449impl InsertableJob for ExpireInactiveCompatSessionsJob {
450    const QUEUE_NAME: &'static str = "expire-inactive-compat-sessions";
451}
452
453/// Expire inactive user sessions
454#[derive(Debug, Serialize, Deserialize)]
455pub struct ExpireInactiveUserSessionsJob {
456    threshold: DateTime<Utc>,
457    after: Option<Ulid>,
458}
459
460impl ExpireInactiveUserSessionsJob {
461    /// Create a new job to expire inactive user/browser sessions
462    ///
463    /// # Parameters
464    ///
465    /// * `threshold` - The threshold to expire sessions at
466    #[must_use]
467    pub fn new(threshold: DateTime<Utc>) -> Self {
468        Self {
469            threshold,
470            after: None,
471        }
472    }
473
474    /// Get the threshold to expire sessions at
475    #[must_use]
476    pub fn threshold(&self) -> DateTime<Utc> {
477        self.threshold
478    }
479
480    /// Get the pagination cursor
481    #[must_use]
482    pub fn pagination(&self, batch_size: usize) -> Pagination {
483        let pagination = Pagination::first(batch_size);
484        if let Some(after) = self.after {
485            pagination.after(after)
486        } else {
487            pagination
488        }
489    }
490
491    /// Get the next job given the page returned by the database
492    #[must_use]
493    pub fn next(&self, page: &Page<BrowserSession>) -> Option<Self> {
494        if !page.has_next_page {
495            return None;
496        }
497
498        let last_edge = page.edges.last()?;
499        Some(Self {
500            threshold: self.threshold,
501            after: Some(last_edge.id),
502        })
503    }
504}
505
506impl InsertableJob for ExpireInactiveUserSessionsJob {
507    const QUEUE_NAME: &'static str = "expire-inactive-user-sessions";
508}
509
510/// Prune stale policy data
511#[derive(Debug, Serialize, Deserialize)]
512pub struct PruneStalePolicyDataJob;
513
514impl InsertableJob for PruneStalePolicyDataJob {
515    const QUEUE_NAME: &'static str = "prune-stale-policy-data";
516}