1use std::collections::HashSet;
8
9use anyhow::Context;
10use async_trait::async_trait;
11use mas_data_model::Device;
12use mas_matrix::ProvisionRequest;
13use mas_storage::{
14 Pagination, RepositoryAccess,
15 compat::CompatSessionFilter,
16 oauth2::OAuth2SessionFilter,
17 personal::PersonalSessionFilter,
18 queue::{
19 DeleteDeviceJob, ProvisionDeviceJob, ProvisionUserJob, QueueJobRepositoryExt as _,
20 SyncDevicesJob,
21 },
22 user::{UserEmailRepository, UserRepository},
23};
24use tracing::info;
25
26use crate::{
27 State,
28 new_queue::{JobContext, JobError, RunnableJob},
29};
30
31#[async_trait]
35impl RunnableJob for ProvisionUserJob {
36 #[tracing::instrument(
37 name = "job.provision_user"
38 fields(user.id = %self.user_id()),
39 skip_all,
40 )]
41 async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> {
42 let matrix = state.matrix_connection();
43 let mut repo = state.repository().await.map_err(JobError::retry)?;
44 let mut rng = state.rng();
45 let clock = state.clock();
46
47 let user = repo
48 .user()
49 .lookup(self.user_id())
50 .await
51 .map_err(JobError::retry)?
52 .context("User not found")
53 .map_err(JobError::fail)?;
54
55 let emails = repo
56 .user_email()
57 .all(&user)
58 .await
59 .map_err(JobError::retry)?
60 .into_iter()
61 .map(|email| email.email)
62 .collect();
63 let mut request = ProvisionRequest::new(
64 user.username.clone(),
65 user.sub.clone(),
66 user.locked_at.is_some(),
67 )
68 .set_emails(emails);
69
70 if let Some(display_name) = self.display_name_to_set() {
71 request = request.set_displayname(display_name.to_owned());
72 }
73
74 let created = matrix
75 .provision_user(&request)
76 .await
77 .map_err(JobError::retry)?;
78
79 let mxid = matrix.mxid(&user.username);
80 if created {
81 info!(%user.id, %mxid, "User created");
82 } else {
83 info!(%user.id, %mxid, "User updated");
84 }
85
86 let sync_device_job = SyncDevicesJob::new(&user);
88 repo.queue_job()
89 .schedule_job(&mut rng, clock, sync_device_job)
90 .await
91 .map_err(JobError::retry)?;
92
93 repo.save().await.map_err(JobError::retry)?;
94
95 Ok(())
96 }
97}
98
99#[async_trait]
103impl RunnableJob for ProvisionDeviceJob {
104 #[tracing::instrument(
105 name = "job.provision_device"
106 fields(
107 user.id = %self.user_id(),
108 device.id = %self.device_id(),
109 ),
110 skip_all,
111 )]
112 async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> {
113 let mut repo = state.repository().await.map_err(JobError::retry)?;
114 let mut rng = state.rng();
115 let clock = state.clock();
116
117 let user = repo
118 .user()
119 .lookup(self.user_id())
120 .await
121 .map_err(JobError::retry)?
122 .context("User not found")
123 .map_err(JobError::fail)?;
124
125 repo.queue_job()
127 .schedule_job(&mut rng, clock, SyncDevicesJob::new(&user))
128 .await
129 .map_err(JobError::retry)?;
130
131 Ok(())
132 }
133}
134
135#[async_trait]
139impl RunnableJob for DeleteDeviceJob {
140 #[tracing::instrument(
141 name = "job.delete_device"
142 fields(
143 user.id = %self.user_id(),
144 device.id = %self.device_id(),
145 ),
146 skip_all,
147 )]
148 async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> {
149 let mut rng = state.rng();
150 let clock = state.clock();
151 let mut repo = state.repository().await.map_err(JobError::retry)?;
152
153 let user = repo
154 .user()
155 .lookup(self.user_id())
156 .await
157 .map_err(JobError::retry)?
158 .context("User not found")
159 .map_err(JobError::fail)?;
160
161 repo.queue_job()
163 .schedule_job(&mut rng, clock, SyncDevicesJob::new(&user))
164 .await
165 .map_err(JobError::retry)?;
166
167 Ok(())
168 }
169}
170
171#[async_trait]
173impl RunnableJob for SyncDevicesJob {
174 #[tracing::instrument(
175 name = "job.sync_devices",
176 fields(user.id = %self.user_id()),
177 skip_all,
178 )]
179 async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> {
180 let matrix = state.matrix_connection();
181 let mut repo = state.repository().await.map_err(JobError::retry)?;
182
183 let user = repo
184 .user()
185 .lookup(self.user_id())
186 .await
187 .map_err(JobError::retry)?
188 .context("User not found")
189 .map_err(JobError::fail)?;
190
191 repo.user()
193 .acquire_lock_for_sync(&user)
194 .await
195 .map_err(JobError::retry)?;
196
197 let mut devices = HashSet::new();
198
199 let mut cursor = Pagination::first(5000);
201 loop {
202 let page = repo
203 .compat_session()
204 .list(
205 CompatSessionFilter::new().for_user(&user).active_only(),
206 cursor,
207 )
208 .await
209 .map_err(JobError::retry)?;
210
211 for edge in page.edges {
212 let (compat_session, _) = edge.node;
213 if let Some(ref device) = compat_session.device {
214 devices.insert(device.as_str().to_owned());
215 }
216 cursor = cursor.after(edge.cursor);
217 }
218
219 if !page.has_next_page {
220 break;
221 }
222 }
223
224 let mut cursor = Pagination::first(5000);
226 loop {
227 let page = repo
228 .oauth2_session()
229 .list(
230 OAuth2SessionFilter::new().for_user(&user).active_only(),
231 cursor,
232 )
233 .await
234 .map_err(JobError::retry)?;
235
236 for edge in page.edges {
237 for scope in &*edge.node.scope {
238 if let Some(device) = Device::from_scope_token(scope) {
239 devices.insert(device.as_str().to_owned());
240 }
241 }
242
243 cursor = cursor.after(edge.cursor);
244 }
245
246 if !page.has_next_page {
247 break;
248 }
249 }
250
251 let mut cursor = Pagination::first(5000);
253 loop {
254 let page = repo
255 .personal_session()
256 .list(
257 PersonalSessionFilter::new()
258 .for_actor_user(&user)
259 .active_only(),
260 cursor,
261 )
262 .await
263 .map_err(JobError::retry)?;
264
265 for edge in page.edges {
266 let (session, _) = &edge.node;
267 for scope in &*session.scope {
268 if let Some(device) = Device::from_scope_token(scope) {
269 devices.insert(device.as_str().to_owned());
270 }
271 }
272
273 cursor = cursor.after(edge.cursor);
274 }
275
276 if !page.has_next_page {
277 break;
278 }
279 }
280
281 matrix
282 .sync_devices(&user.username, devices)
283 .await
284 .map_err(JobError::retry)?;
285
286 repo.save().await.map_err(JobError::retry)?;
289
290 Ok(())
291 }
292}