1use std::collections::HashSet;
8
9use anyhow::Context;
10use async_trait::async_trait;
11use mas_data_model::Device;
12use mas_matrix::ProvisionRequest;
13use mas_storage::{
14 Pagination, RepositoryAccess,
15 compat::CompatSessionFilter,
16 oauth2::OAuth2SessionFilter,
17 personal::PersonalSessionFilter,
18 queue::{
19 DeleteDeviceJob, ProvisionDeviceJob, ProvisionUserJob, QueueJobRepositoryExt as _,
20 SyncDevicesJob,
21 },
22 user::{UserEmailRepository, UserRepository},
23};
24use tracing::info;
25
26use crate::{
27 State,
28 new_queue::{JobContext, JobError, RunnableJob},
29};
30
31#[async_trait]
35impl RunnableJob for ProvisionUserJob {
36 #[tracing::instrument(
37 name = "job.provision_user"
38 fields(user.id = %self.user_id()),
39 skip_all,
40 )]
41 async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> {
42 let matrix = state.matrix_connection();
43 let mut repo = state.repository().await.map_err(JobError::retry)?;
44 let mut rng = state.rng();
45 let clock = state.clock();
46
47 let user = repo
48 .user()
49 .lookup(self.user_id())
50 .await
51 .map_err(JobError::retry)?
52 .context("User not found")
53 .map_err(JobError::fail)?;
54
55 let emails = repo
56 .user_email()
57 .all(&user)
58 .await
59 .map_err(JobError::retry)?
60 .into_iter()
61 .map(|email| email.email)
62 .collect();
63 let mut request =
64 ProvisionRequest::new(user.username.clone(), user.sub.clone()).set_emails(emails);
65
66 if let Some(display_name) = self.display_name_to_set() {
67 request = request.set_displayname(display_name.to_owned());
68 }
69
70 let created = matrix
71 .provision_user(&request)
72 .await
73 .map_err(JobError::retry)?;
74
75 let mxid = matrix.mxid(&user.username);
76 if created {
77 info!(%user.id, %mxid, "User created");
78 } else {
79 info!(%user.id, %mxid, "User updated");
80 }
81
82 let sync_device_job = SyncDevicesJob::new(&user);
84 repo.queue_job()
85 .schedule_job(&mut rng, clock, sync_device_job)
86 .await
87 .map_err(JobError::retry)?;
88
89 repo.save().await.map_err(JobError::retry)?;
90
91 Ok(())
92 }
93}
94
95#[async_trait]
99impl RunnableJob for ProvisionDeviceJob {
100 #[tracing::instrument(
101 name = "job.provision_device"
102 fields(
103 user.id = %self.user_id(),
104 device.id = %self.device_id(),
105 ),
106 skip_all,
107 )]
108 async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> {
109 let mut repo = state.repository().await.map_err(JobError::retry)?;
110 let mut rng = state.rng();
111 let clock = state.clock();
112
113 let user = repo
114 .user()
115 .lookup(self.user_id())
116 .await
117 .map_err(JobError::retry)?
118 .context("User not found")
119 .map_err(JobError::fail)?;
120
121 repo.queue_job()
123 .schedule_job(&mut rng, clock, SyncDevicesJob::new(&user))
124 .await
125 .map_err(JobError::retry)?;
126
127 Ok(())
128 }
129}
130
131#[async_trait]
135impl RunnableJob for DeleteDeviceJob {
136 #[tracing::instrument(
137 name = "job.delete_device"
138 fields(
139 user.id = %self.user_id(),
140 device.id = %self.device_id(),
141 ),
142 skip_all,
143 )]
144 async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> {
145 let mut rng = state.rng();
146 let clock = state.clock();
147 let mut repo = state.repository().await.map_err(JobError::retry)?;
148
149 let user = repo
150 .user()
151 .lookup(self.user_id())
152 .await
153 .map_err(JobError::retry)?
154 .context("User not found")
155 .map_err(JobError::fail)?;
156
157 repo.queue_job()
159 .schedule_job(&mut rng, clock, SyncDevicesJob::new(&user))
160 .await
161 .map_err(JobError::retry)?;
162
163 Ok(())
164 }
165}
166
167#[async_trait]
169impl RunnableJob for SyncDevicesJob {
170 #[tracing::instrument(
171 name = "job.sync_devices",
172 fields(user.id = %self.user_id()),
173 skip_all,
174 )]
175 async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> {
176 let matrix = state.matrix_connection();
177 let mut repo = state.repository().await.map_err(JobError::retry)?;
178
179 let user = repo
180 .user()
181 .lookup(self.user_id())
182 .await
183 .map_err(JobError::retry)?
184 .context("User not found")
185 .map_err(JobError::fail)?;
186
187 repo.user()
189 .acquire_lock_for_sync(&user)
190 .await
191 .map_err(JobError::retry)?;
192
193 let mut devices = HashSet::new();
194
195 let mut cursor = Pagination::first(5000);
197 loop {
198 let page = repo
199 .compat_session()
200 .list(
201 CompatSessionFilter::new().for_user(&user).active_only(),
202 cursor,
203 )
204 .await
205 .map_err(JobError::retry)?;
206
207 for edge in page.edges {
208 let (compat_session, _) = edge.node;
209 if let Some(ref device) = compat_session.device {
210 devices.insert(device.as_str().to_owned());
211 }
212 cursor = cursor.after(edge.cursor);
213 }
214
215 if !page.has_next_page {
216 break;
217 }
218 }
219
220 let mut cursor = Pagination::first(5000);
222 loop {
223 let page = repo
224 .oauth2_session()
225 .list(
226 OAuth2SessionFilter::new().for_user(&user).active_only(),
227 cursor,
228 )
229 .await
230 .map_err(JobError::retry)?;
231
232 for edge in page.edges {
233 for scope in &*edge.node.scope {
234 if let Some(device) = Device::from_scope_token(scope) {
235 devices.insert(device.as_str().to_owned());
236 }
237 }
238
239 cursor = cursor.after(edge.cursor);
240 }
241
242 if !page.has_next_page {
243 break;
244 }
245 }
246
247 let mut cursor = Pagination::first(5000);
249 loop {
250 let page = repo
251 .personal_session()
252 .list(
253 PersonalSessionFilter::new()
254 .for_actor_user(&user)
255 .active_only(),
256 cursor,
257 )
258 .await
259 .map_err(JobError::retry)?;
260
261 for edge in page.edges {
262 let (session, _) = &edge.node;
263 for scope in &*session.scope {
264 if let Some(device) = Device::from_scope_token(scope) {
265 devices.insert(device.as_str().to_owned());
266 }
267 }
268
269 cursor = cursor.after(edge.cursor);
270 }
271
272 if !page.has_next_page {
273 break;
274 }
275 }
276
277 matrix
278 .sync_devices(&user.username, devices)
279 .await
280 .map_err(JobError::retry)?;
281
282 repo.save().await.map_err(JobError::retry)?;
285
286 Ok(())
287 }
288}