mas_tasks/
matrix.rs

1// Copyright 2024, 2025 New Vector Ltd.
2// Copyright 2023, 2024 The Matrix.org Foundation C.I.C.
3//
4// SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
5// Please see LICENSE files in the repository root for full details.
6
7use std::collections::HashSet;
8
9use anyhow::Context;
10use async_trait::async_trait;
11use mas_data_model::Device;
12use mas_matrix::ProvisionRequest;
13use mas_storage::{
14    Pagination, RepositoryAccess,
15    compat::CompatSessionFilter,
16    oauth2::OAuth2SessionFilter,
17    personal::PersonalSessionFilter,
18    queue::{
19        DeleteDeviceJob, ProvisionDeviceJob, ProvisionUserJob, QueueJobRepositoryExt as _,
20        SyncDevicesJob,
21    },
22    user::{UserEmailRepository, UserRepository},
23};
24use tracing::info;
25
26use crate::{
27    State,
28    new_queue::{JobContext, JobError, RunnableJob},
29};
30
31/// Job to provision a user on the Matrix homeserver.
32/// This works by doing a PUT request to the
33/// `/_synapse/admin/v2/users/{user_id}` endpoint.
34#[async_trait]
35impl RunnableJob for ProvisionUserJob {
36    #[tracing::instrument(
37        name = "job.provision_user"
38        fields(user.id = %self.user_id()),
39        skip_all,
40    )]
41    async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> {
42        let matrix = state.matrix_connection();
43        let mut repo = state.repository().await.map_err(JobError::retry)?;
44        let mut rng = state.rng();
45        let clock = state.clock();
46
47        let user = repo
48            .user()
49            .lookup(self.user_id())
50            .await
51            .map_err(JobError::retry)?
52            .context("User not found")
53            .map_err(JobError::fail)?;
54
55        let emails = repo
56            .user_email()
57            .all(&user)
58            .await
59            .map_err(JobError::retry)?
60            .into_iter()
61            .map(|email| email.email)
62            .collect();
63        let mut request =
64            ProvisionRequest::new(user.username.clone(), user.sub.clone()).set_emails(emails);
65
66        if let Some(display_name) = self.display_name_to_set() {
67            request = request.set_displayname(display_name.to_owned());
68        }
69
70        let created = matrix
71            .provision_user(&request)
72            .await
73            .map_err(JobError::retry)?;
74
75        let mxid = matrix.mxid(&user.username);
76        if created {
77            info!(%user.id, %mxid, "User created");
78        } else {
79            info!(%user.id, %mxid, "User updated");
80        }
81
82        // Schedule a device sync job
83        let sync_device_job = SyncDevicesJob::new(&user);
84        repo.queue_job()
85            .schedule_job(&mut rng, clock, sync_device_job)
86            .await
87            .map_err(JobError::retry)?;
88
89        repo.save().await.map_err(JobError::retry)?;
90
91        Ok(())
92    }
93}
94
95/// Job to provision a device on the Matrix homeserver.
96///
97/// This job is deprecated and therefore just schedules a [`SyncDevicesJob`]
98#[async_trait]
99impl RunnableJob for ProvisionDeviceJob {
100    #[tracing::instrument(
101        name = "job.provision_device"
102        fields(
103            user.id = %self.user_id(),
104            device.id = %self.device_id(),
105        ),
106        skip_all,
107    )]
108    async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> {
109        let mut repo = state.repository().await.map_err(JobError::retry)?;
110        let mut rng = state.rng();
111        let clock = state.clock();
112
113        let user = repo
114            .user()
115            .lookup(self.user_id())
116            .await
117            .map_err(JobError::retry)?
118            .context("User not found")
119            .map_err(JobError::fail)?;
120
121        // Schedule a device sync job
122        repo.queue_job()
123            .schedule_job(&mut rng, clock, SyncDevicesJob::new(&user))
124            .await
125            .map_err(JobError::retry)?;
126
127        Ok(())
128    }
129}
130
131/// Job to delete a device from a user's account.
132///
133/// This job is deprecated and therefore just schedules a [`SyncDevicesJob`]
134#[async_trait]
135impl RunnableJob for DeleteDeviceJob {
136    #[tracing::instrument(
137        name = "job.delete_device"
138        fields(
139            user.id = %self.user_id(),
140            device.id = %self.device_id(),
141        ),
142        skip_all,
143    )]
144    async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> {
145        let mut rng = state.rng();
146        let clock = state.clock();
147        let mut repo = state.repository().await.map_err(JobError::retry)?;
148
149        let user = repo
150            .user()
151            .lookup(self.user_id())
152            .await
153            .map_err(JobError::retry)?
154            .context("User not found")
155            .map_err(JobError::fail)?;
156
157        // Schedule a device sync job
158        repo.queue_job()
159            .schedule_job(&mut rng, clock, SyncDevicesJob::new(&user))
160            .await
161            .map_err(JobError::retry)?;
162
163        Ok(())
164    }
165}
166
167/// Job to sync the list of devices of a user with the homeserver.
168#[async_trait]
169impl RunnableJob for SyncDevicesJob {
170    #[tracing::instrument(
171        name = "job.sync_devices",
172        fields(user.id = %self.user_id()),
173        skip_all,
174    )]
175    async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> {
176        let matrix = state.matrix_connection();
177        let mut repo = state.repository().await.map_err(JobError::retry)?;
178
179        let user = repo
180            .user()
181            .lookup(self.user_id())
182            .await
183            .map_err(JobError::retry)?
184            .context("User not found")
185            .map_err(JobError::fail)?;
186
187        // Lock the user sync to make sure we don't get into a race condition
188        repo.user()
189            .acquire_lock_for_sync(&user)
190            .await
191            .map_err(JobError::retry)?;
192
193        let mut devices = HashSet::new();
194
195        // Cycle through all the compat sessions of the user, and grab the devices
196        let mut cursor = Pagination::first(5000);
197        loop {
198            let page = repo
199                .compat_session()
200                .list(
201                    CompatSessionFilter::new().for_user(&user).active_only(),
202                    cursor,
203                )
204                .await
205                .map_err(JobError::retry)?;
206
207            for edge in page.edges {
208                let (compat_session, _) = edge.node;
209                if let Some(ref device) = compat_session.device {
210                    devices.insert(device.as_str().to_owned());
211                }
212                cursor = cursor.after(edge.cursor);
213            }
214
215            if !page.has_next_page {
216                break;
217            }
218        }
219
220        // Cycle though all the oauth2 sessions of the user, and grab the devices
221        let mut cursor = Pagination::first(5000);
222        loop {
223            let page = repo
224                .oauth2_session()
225                .list(
226                    OAuth2SessionFilter::new().for_user(&user).active_only(),
227                    cursor,
228                )
229                .await
230                .map_err(JobError::retry)?;
231
232            for edge in page.edges {
233                for scope in &*edge.node.scope {
234                    if let Some(device) = Device::from_scope_token(scope) {
235                        devices.insert(device.as_str().to_owned());
236                    }
237                }
238
239                cursor = cursor.after(edge.cursor);
240            }
241
242            if !page.has_next_page {
243                break;
244            }
245        }
246
247        // Cycle through all the personal sessions of the user and get the devices
248        let mut cursor = Pagination::first(5000);
249        loop {
250            let page = repo
251                .personal_session()
252                .list(
253                    PersonalSessionFilter::new()
254                        .for_actor_user(&user)
255                        .active_only(),
256                    cursor,
257                )
258                .await
259                .map_err(JobError::retry)?;
260
261            for edge in page.edges {
262                let (session, _) = &edge.node;
263                for scope in &*session.scope {
264                    if let Some(device) = Device::from_scope_token(scope) {
265                        devices.insert(device.as_str().to_owned());
266                    }
267                }
268
269                cursor = cursor.after(edge.cursor);
270            }
271
272            if !page.has_next_page {
273                break;
274            }
275        }
276
277        matrix
278            .sync_devices(&user.username, devices)
279            .await
280            .map_err(JobError::retry)?;
281
282        // We kept the connection until now, so that we still hold the lock on the user
283        // throughout the sync
284        repo.save().await.map_err(JobError::retry)?;
285
286        Ok(())
287    }
288}