1use std::collections::HashSet;
8
9use anyhow::Context;
10use async_trait::async_trait;
11use mas_data_model::Device;
12use mas_matrix::ProvisionRequest;
13use mas_storage::{
14 Pagination, RepositoryAccess,
15 compat::CompatSessionFilter,
16 oauth2::OAuth2SessionFilter,
17 queue::{
18 DeleteDeviceJob, ProvisionDeviceJob, ProvisionUserJob, QueueJobRepositoryExt as _,
19 SyncDevicesJob,
20 },
21 user::{UserEmailRepository, UserRepository},
22};
23use tracing::info;
24
25use crate::{
26 State,
27 new_queue::{JobContext, JobError, RunnableJob},
28};
29
30#[async_trait]
34impl RunnableJob for ProvisionUserJob {
35 #[tracing::instrument(
36 name = "job.provision_user"
37 fields(user.id = %self.user_id()),
38 skip_all,
39 err,
40 )]
41 async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> {
42 let matrix = state.matrix_connection();
43 let mut repo = state.repository().await.map_err(JobError::retry)?;
44 let mut rng = state.rng();
45 let clock = state.clock();
46
47 let user = repo
48 .user()
49 .lookup(self.user_id())
50 .await
51 .map_err(JobError::retry)?
52 .context("User not found")
53 .map_err(JobError::fail)?;
54
55 let mxid = matrix.mxid(&user.username);
56 let emails = repo
57 .user_email()
58 .all(&user)
59 .await
60 .map_err(JobError::retry)?
61 .into_iter()
62 .map(|email| email.email)
63 .collect();
64 let mut request = ProvisionRequest::new(mxid.clone(), user.sub.clone()).set_emails(emails);
65
66 if let Some(display_name) = self.display_name_to_set() {
67 request = request.set_displayname(display_name.to_owned());
68 }
69
70 let created = matrix
71 .provision_user(&request)
72 .await
73 .map_err(JobError::retry)?;
74
75 if created {
76 info!(%user.id, %mxid, "User created");
77 } else {
78 info!(%user.id, %mxid, "User updated");
79 }
80
81 let sync_device_job = SyncDevicesJob::new(&user);
83 repo.queue_job()
84 .schedule_job(&mut rng, &clock, sync_device_job)
85 .await
86 .map_err(JobError::retry)?;
87
88 repo.save().await.map_err(JobError::retry)?;
89
90 Ok(())
91 }
92}
93
94#[async_trait]
98impl RunnableJob for ProvisionDeviceJob {
99 #[tracing::instrument(
100 name = "job.provision_device"
101 fields(
102 user.id = %self.user_id(),
103 device.id = %self.device_id(),
104 ),
105 skip_all,
106 err,
107 )]
108 async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> {
109 let mut repo = state.repository().await.map_err(JobError::retry)?;
110 let mut rng = state.rng();
111 let clock = state.clock();
112
113 let user = repo
114 .user()
115 .lookup(self.user_id())
116 .await
117 .map_err(JobError::retry)?
118 .context("User not found")
119 .map_err(JobError::fail)?;
120
121 repo.queue_job()
123 .schedule_job(&mut rng, &clock, SyncDevicesJob::new(&user))
124 .await
125 .map_err(JobError::retry)?;
126
127 Ok(())
128 }
129}
130
131#[async_trait]
135impl RunnableJob for DeleteDeviceJob {
136 #[tracing::instrument(
137 name = "job.delete_device"
138 fields(
139 user.id = %self.user_id(),
140 device.id = %self.device_id(),
141 ),
142 skip_all,
143 err,
144 )]
145 async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> {
146 let mut rng = state.rng();
147 let clock = state.clock();
148 let mut repo = state.repository().await.map_err(JobError::retry)?;
149
150 let user = repo
151 .user()
152 .lookup(self.user_id())
153 .await
154 .map_err(JobError::retry)?
155 .context("User not found")
156 .map_err(JobError::fail)?;
157
158 repo.queue_job()
160 .schedule_job(&mut rng, &clock, SyncDevicesJob::new(&user))
161 .await
162 .map_err(JobError::retry)?;
163
164 Ok(())
165 }
166}
167
168#[async_trait]
170impl RunnableJob for SyncDevicesJob {
171 #[tracing::instrument(
172 name = "job.sync_devices",
173 fields(user.id = %self.user_id()),
174 skip_all,
175 err,
176 )]
177 async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> {
178 let matrix = state.matrix_connection();
179 let mut repo = state.repository().await.map_err(JobError::retry)?;
180
181 let user = repo
182 .user()
183 .lookup(self.user_id())
184 .await
185 .map_err(JobError::retry)?
186 .context("User not found")
187 .map_err(JobError::fail)?;
188
189 repo.user()
191 .acquire_lock_for_sync(&user)
192 .await
193 .map_err(JobError::retry)?;
194
195 let mut devices = HashSet::new();
196
197 let mut cursor = Pagination::first(100);
199 loop {
200 let page = repo
201 .compat_session()
202 .list(
203 CompatSessionFilter::new().for_user(&user).active_only(),
204 cursor,
205 )
206 .await
207 .map_err(JobError::retry)?;
208
209 for (compat_session, _) in page.edges {
210 if let Some(ref device) = compat_session.device {
211 devices.insert(device.as_str().to_owned());
212 }
213 cursor = cursor.after(compat_session.id);
214 }
215
216 if !page.has_next_page {
217 break;
218 }
219 }
220
221 let mut cursor = Pagination::first(100);
223 loop {
224 let page = repo
225 .oauth2_session()
226 .list(
227 OAuth2SessionFilter::new().for_user(&user).active_only(),
228 cursor,
229 )
230 .await
231 .map_err(JobError::retry)?;
232
233 for oauth2_session in page.edges {
234 for scope in &*oauth2_session.scope {
235 if let Some(device) = Device::from_scope_token(scope) {
236 devices.insert(device.as_str().to_owned());
237 }
238 }
239
240 cursor = cursor.after(oauth2_session.id);
241 }
242
243 if !page.has_next_page {
244 break;
245 }
246 }
247
248 let mxid = matrix.mxid(&user.username);
249 matrix
250 .sync_devices(&mxid, devices)
251 .await
252 .map_err(JobError::retry)?;
253
254 repo.save().await.map_err(JobError::retry)?;
257
258 Ok(())
259 }
260}