1use anyhow::Context;
8use async_trait::async_trait;
9use mas_storage::{
10 RepositoryAccess,
11 compat::CompatSessionFilter,
12 oauth2::OAuth2SessionFilter,
13 queue::{DeactivateUserJob, ReactivateUserJob},
14 user::{BrowserSessionFilter, UserEmailFilter, UserRepository},
15};
16use tracing::info;
17
18use crate::{
19 State,
20 new_queue::{JobContext, JobError, RunnableJob},
21};
22
23#[async_trait]
25impl RunnableJob for DeactivateUserJob {
26 #[tracing::instrument(
27 name = "job.deactivate_user"
28 fields(user.id = %self.user_id(), erase = %self.hs_erase()),
29 skip_all,
30 err,
31 )]
32 async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> {
33 let clock = state.clock();
34 let matrix = state.matrix_connection();
35 let mut repo = state.repository().await.map_err(JobError::retry)?;
36
37 let user = repo
38 .user()
39 .lookup(self.user_id())
40 .await
41 .map_err(JobError::retry)?
42 .context("User not found")
43 .map_err(JobError::fail)?;
44
45 let user = repo
47 .user()
48 .lock(&clock, user)
49 .await
50 .context("Failed to lock user")
51 .map_err(JobError::retry)?;
52
53 let user = repo
54 .user()
55 .deactivate(&clock, user)
56 .await
57 .context("Failed to deactivate user")
58 .map_err(JobError::retry)?;
59
60 let n = repo
62 .browser_session()
63 .finish_bulk(
64 &clock,
65 BrowserSessionFilter::new().for_user(&user).active_only(),
66 )
67 .await
68 .map_err(JobError::retry)?;
69 info!(affected = n, "Killed all browser sessions for user");
70
71 let n = repo
72 .oauth2_session()
73 .finish_bulk(
74 &clock,
75 OAuth2SessionFilter::new().for_user(&user).active_only(),
76 )
77 .await
78 .map_err(JobError::retry)?;
79 info!(affected = n, "Killed all OAuth 2.0 sessions for user");
80
81 let n = repo
82 .compat_session()
83 .finish_bulk(
84 &clock,
85 CompatSessionFilter::new().for_user(&user).active_only(),
86 )
87 .await
88 .map_err(JobError::retry)?;
89 info!(affected = n, "Killed all compatibility sessions for user");
90
91 let n = repo
93 .user_email()
94 .remove_bulk(UserEmailFilter::new().for_user(&user))
95 .await
96 .map_err(JobError::retry)?;
97 info!(affected = n, "Removed all email addresses for user");
98
99 repo.save().await.map_err(JobError::retry)?;
102
103 let mxid = matrix.mxid(&user.username);
104 info!("Deactivating user {} on homeserver", mxid);
105 matrix
106 .delete_user(&mxid, self.hs_erase())
107 .await
108 .map_err(JobError::retry)?;
109
110 Ok(())
111 }
112}
113
114#[async_trait]
116impl RunnableJob for ReactivateUserJob {
117 #[tracing::instrument(
118 name = "job.reactivate_user",
119 fields(user.id = %self.user_id()),
120 skip_all,
121 err,
122 )]
123 async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> {
124 let matrix = state.matrix_connection();
125 let mut repo = state.repository().await.map_err(JobError::retry)?;
126
127 let user = repo
128 .user()
129 .lookup(self.user_id())
130 .await
131 .map_err(JobError::retry)?
132 .context("User not found")
133 .map_err(JobError::fail)?;
134
135 let mxid = matrix.mxid(&user.username);
136 info!("Reactivating user {} on homeserver", mxid);
137 matrix
138 .reactivate_user(&mxid)
139 .await
140 .map_err(JobError::retry)?;
141
142 let _user = repo.user().unlock(user).await.map_err(JobError::retry)?;
145 repo.save().await.map_err(JobError::retry)?;
146
147 Ok(())
148 }
149}