Skip to main content

mas_tasks/
matrix.rs

1// Copyright 2024, 2025 New Vector Ltd.
2// Copyright 2023, 2024 The Matrix.org Foundation C.I.C.
3//
4// SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
5// Please see LICENSE files in the repository root for full details.
6
7use std::collections::HashSet;
8
9use anyhow::Context;
10use async_trait::async_trait;
11use mas_data_model::Device;
12use mas_matrix::ProvisionRequest;
13use mas_storage::{
14    Pagination, RepositoryAccess,
15    compat::CompatSessionFilter,
16    oauth2::OAuth2SessionFilter,
17    personal::PersonalSessionFilter,
18    queue::{
19        DeleteDeviceJob, ProvisionDeviceJob, ProvisionUserJob, QueueJobRepositoryExt as _,
20        SyncDevicesJob,
21    },
22    user::{UserEmailRepository, UserRepository},
23};
24use tracing::info;
25
26use crate::{
27    State,
28    new_queue::{JobContext, JobError, RunnableJob},
29};
30
31/// Job to provision a user on the Matrix homeserver.
32/// This works by doing a PUT request to the
33/// `/_synapse/admin/v2/users/{user_id}` endpoint.
34#[async_trait]
35impl RunnableJob for ProvisionUserJob {
36    #[tracing::instrument(
37        name = "job.provision_user"
38        fields(user.id = %self.user_id()),
39        skip_all,
40    )]
41    async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> {
42        let matrix = state.matrix_connection();
43        let mut repo = state.repository().await.map_err(JobError::retry)?;
44        let mut rng = state.rng();
45        let clock = state.clock();
46
47        let user = repo
48            .user()
49            .lookup(self.user_id())
50            .await
51            .map_err(JobError::retry)?
52            .context("User not found")
53            .map_err(JobError::fail)?;
54
55        let emails = repo
56            .user_email()
57            .all(&user)
58            .await
59            .map_err(JobError::retry)?
60            .into_iter()
61            .map(|email| email.email)
62            .collect();
63        let mut request = ProvisionRequest::new(
64            user.username.clone(),
65            user.sub.clone(),
66            user.locked_at.is_some(),
67        )
68        .set_emails(emails);
69
70        if let Some(display_name) = self.display_name_to_set() {
71            request = request.set_displayname(display_name.to_owned());
72        }
73
74        let created = matrix
75            .provision_user(&request)
76            .await
77            .map_err(JobError::retry)?;
78
79        let mxid = matrix.mxid(&user.username);
80        if created {
81            info!(%user.id, %mxid, "User created");
82        } else {
83            info!(%user.id, %mxid, "User updated");
84        }
85
86        // Schedule a device sync job
87        let sync_device_job = SyncDevicesJob::new(&user);
88        repo.queue_job()
89            .schedule_job(&mut rng, clock, sync_device_job)
90            .await
91            .map_err(JobError::retry)?;
92
93        repo.save().await.map_err(JobError::retry)?;
94
95        Ok(())
96    }
97}
98
99/// Job to provision a device on the Matrix homeserver.
100///
101/// This job is deprecated and therefore just schedules a [`SyncDevicesJob`]
102#[async_trait]
103impl RunnableJob for ProvisionDeviceJob {
104    #[tracing::instrument(
105        name = "job.provision_device"
106        fields(
107            user.id = %self.user_id(),
108            device.id = %self.device_id(),
109        ),
110        skip_all,
111    )]
112    async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> {
113        let mut repo = state.repository().await.map_err(JobError::retry)?;
114        let mut rng = state.rng();
115        let clock = state.clock();
116
117        let user = repo
118            .user()
119            .lookup(self.user_id())
120            .await
121            .map_err(JobError::retry)?
122            .context("User not found")
123            .map_err(JobError::fail)?;
124
125        // Schedule a device sync job
126        repo.queue_job()
127            .schedule_job(&mut rng, clock, SyncDevicesJob::new(&user))
128            .await
129            .map_err(JobError::retry)?;
130
131        Ok(())
132    }
133}
134
135/// Job to delete a device from a user's account.
136///
137/// This job is deprecated and therefore just schedules a [`SyncDevicesJob`]
138#[async_trait]
139impl RunnableJob for DeleteDeviceJob {
140    #[tracing::instrument(
141        name = "job.delete_device"
142        fields(
143            user.id = %self.user_id(),
144            device.id = %self.device_id(),
145        ),
146        skip_all,
147    )]
148    async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> {
149        let mut rng = state.rng();
150        let clock = state.clock();
151        let mut repo = state.repository().await.map_err(JobError::retry)?;
152
153        let user = repo
154            .user()
155            .lookup(self.user_id())
156            .await
157            .map_err(JobError::retry)?
158            .context("User not found")
159            .map_err(JobError::fail)?;
160
161        // Schedule a device sync job
162        repo.queue_job()
163            .schedule_job(&mut rng, clock, SyncDevicesJob::new(&user))
164            .await
165            .map_err(JobError::retry)?;
166
167        Ok(())
168    }
169}
170
171/// Job to sync the list of devices of a user with the homeserver.
172#[async_trait]
173impl RunnableJob for SyncDevicesJob {
174    #[tracing::instrument(
175        name = "job.sync_devices",
176        fields(user.id = %self.user_id()),
177        skip_all,
178    )]
179    async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> {
180        let matrix = state.matrix_connection();
181        let mut repo = state.repository().await.map_err(JobError::retry)?;
182
183        let user = repo
184            .user()
185            .lookup(self.user_id())
186            .await
187            .map_err(JobError::retry)?
188            .context("User not found")
189            .map_err(JobError::fail)?;
190
191        // Lock the user sync to make sure we don't get into a race condition
192        repo.user()
193            .acquire_lock_for_sync(&user)
194            .await
195            .map_err(JobError::retry)?;
196
197        let mut devices = HashSet::new();
198
199        // Cycle through all the compat sessions of the user, and grab the devices
200        let mut cursor = Pagination::first(5000);
201        loop {
202            let page = repo
203                .compat_session()
204                .list(
205                    CompatSessionFilter::new().for_user(&user).active_only(),
206                    cursor,
207                )
208                .await
209                .map_err(JobError::retry)?;
210
211            for edge in page.edges {
212                let (compat_session, _) = edge.node;
213                if let Some(ref device) = compat_session.device {
214                    devices.insert(device.as_str().to_owned());
215                }
216                cursor = cursor.after(edge.cursor);
217            }
218
219            if !page.has_next_page {
220                break;
221            }
222        }
223
224        // Cycle though all the oauth2 sessions of the user, and grab the devices
225        let mut cursor = Pagination::first(5000);
226        loop {
227            let page = repo
228                .oauth2_session()
229                .list(
230                    OAuth2SessionFilter::new().for_user(&user).active_only(),
231                    cursor,
232                )
233                .await
234                .map_err(JobError::retry)?;
235
236            for edge in page.edges {
237                for scope in &*edge.node.scope {
238                    if let Some(device) = Device::from_scope_token(scope) {
239                        devices.insert(device.as_str().to_owned());
240                    }
241                }
242
243                cursor = cursor.after(edge.cursor);
244            }
245
246            if !page.has_next_page {
247                break;
248            }
249        }
250
251        // Cycle through all the personal sessions of the user and get the devices
252        let mut cursor = Pagination::first(5000);
253        loop {
254            let page = repo
255                .personal_session()
256                .list(
257                    PersonalSessionFilter::new()
258                        .for_actor_user(&user)
259                        .active_only(),
260                    cursor,
261                )
262                .await
263                .map_err(JobError::retry)?;
264
265            for edge in page.edges {
266                let (session, _) = &edge.node;
267                for scope in &*session.scope {
268                    if let Some(device) = Device::from_scope_token(scope) {
269                        devices.insert(device.as_str().to_owned());
270                    }
271                }
272
273                cursor = cursor.after(edge.cursor);
274            }
275
276            if !page.has_next_page {
277                break;
278            }
279        }
280
281        matrix
282            .sync_devices(&user.username, devices)
283            .await
284            .map_err(JobError::retry)?;
285
286        // We kept the connection until now, so that we still hold the lock on the user
287        // throughout the sync
288        repo.save().await.map_err(JobError::retry)?;
289
290        Ok(())
291    }
292}