mas_tasks/
user.rs

1// Copyright 2024 New Vector Ltd.
2// Copyright 2023, 2024 The Matrix.org Foundation C.I.C.
3//
4// SPDX-License-Identifier: AGPL-3.0-only
5// Please see LICENSE in the repository root for full details.
6
7use anyhow::Context;
8use async_trait::async_trait;
9use mas_storage::{
10    RepositoryAccess,
11    compat::CompatSessionFilter,
12    oauth2::OAuth2SessionFilter,
13    queue::{DeactivateUserJob, ReactivateUserJob},
14    user::{BrowserSessionFilter, UserEmailFilter, UserRepository},
15};
16use tracing::info;
17
18use crate::{
19    State,
20    new_queue::{JobContext, JobError, RunnableJob},
21};
22
23/// Job to deactivate a user, both locally and on the Matrix homeserver.
24#[async_trait]
25impl RunnableJob for DeactivateUserJob {
26    #[tracing::instrument(
27    name = "job.deactivate_user"
28        fields(user.id = %self.user_id(), erase = %self.hs_erase()),
29        skip_all,
30        err,
31    )]
32    async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> {
33        let clock = state.clock();
34        let matrix = state.matrix_connection();
35        let mut repo = state.repository().await.map_err(JobError::retry)?;
36
37        let user = repo
38            .user()
39            .lookup(self.user_id())
40            .await
41            .map_err(JobError::retry)?
42            .context("User not found")
43            .map_err(JobError::fail)?;
44
45        // Let's first lock & deactivate the user
46        let user = repo
47            .user()
48            .lock(&clock, user)
49            .await
50            .context("Failed to lock user")
51            .map_err(JobError::retry)?;
52
53        let user = repo
54            .user()
55            .deactivate(&clock, user)
56            .await
57            .context("Failed to deactivate user")
58            .map_err(JobError::retry)?;
59
60        // Kill all sessions for the user
61        let n = repo
62            .browser_session()
63            .finish_bulk(
64                &clock,
65                BrowserSessionFilter::new().for_user(&user).active_only(),
66            )
67            .await
68            .map_err(JobError::retry)?;
69        info!(affected = n, "Killed all browser sessions for user");
70
71        let n = repo
72            .oauth2_session()
73            .finish_bulk(
74                &clock,
75                OAuth2SessionFilter::new().for_user(&user).active_only(),
76            )
77            .await
78            .map_err(JobError::retry)?;
79        info!(affected = n, "Killed all OAuth 2.0 sessions for user");
80
81        let n = repo
82            .compat_session()
83            .finish_bulk(
84                &clock,
85                CompatSessionFilter::new().for_user(&user).active_only(),
86            )
87            .await
88            .map_err(JobError::retry)?;
89        info!(affected = n, "Killed all compatibility sessions for user");
90
91        // Delete all the email addresses for the user
92        let n = repo
93            .user_email()
94            .remove_bulk(UserEmailFilter::new().for_user(&user))
95            .await
96            .map_err(JobError::retry)?;
97        info!(affected = n, "Removed all email addresses for user");
98
99        // Before calling back to the homeserver, commit the changes to the database, as
100        // we want the user to be locked out as soon as possible
101        repo.save().await.map_err(JobError::retry)?;
102
103        let mxid = matrix.mxid(&user.username);
104        info!("Deactivating user {} on homeserver", mxid);
105        matrix
106            .delete_user(&mxid, self.hs_erase())
107            .await
108            .map_err(JobError::retry)?;
109
110        Ok(())
111    }
112}
113
114/// Job to reactivate a user, both locally and on the Matrix homeserver.
115#[async_trait]
116impl RunnableJob for ReactivateUserJob {
117    #[tracing::instrument(
118        name = "job.reactivate_user",
119        fields(user.id = %self.user_id()),
120        skip_all,
121        err,
122    )]
123    async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> {
124        let matrix = state.matrix_connection();
125        let mut repo = state.repository().await.map_err(JobError::retry)?;
126
127        let user = repo
128            .user()
129            .lookup(self.user_id())
130            .await
131            .map_err(JobError::retry)?
132            .context("User not found")
133            .map_err(JobError::fail)?;
134
135        let mxid = matrix.mxid(&user.username);
136        info!("Reactivating user {} on homeserver", mxid);
137        matrix
138            .reactivate_user(&mxid)
139            .await
140            .map_err(JobError::retry)?;
141
142        // We want to unlock the user from our side only once it has been reactivated on
143        // the homeserver
144        let _user = repo.user().unlock(user).await.map_err(JobError::retry)?;
145        repo.save().await.map_err(JobError::retry)?;
146
147        Ok(())
148    }
149}