mas_tasks/
matrix.rs

1// Copyright 2024 New Vector Ltd.
2// Copyright 2023, 2024 The Matrix.org Foundation C.I.C.
3//
4// SPDX-License-Identifier: AGPL-3.0-only
5// Please see LICENSE in the repository root for full details.
6
7use std::collections::HashSet;
8
9use anyhow::Context;
10use async_trait::async_trait;
11use mas_data_model::Device;
12use mas_matrix::ProvisionRequest;
13use mas_storage::{
14    Pagination, RepositoryAccess,
15    compat::CompatSessionFilter,
16    oauth2::OAuth2SessionFilter,
17    queue::{
18        DeleteDeviceJob, ProvisionDeviceJob, ProvisionUserJob, QueueJobRepositoryExt as _,
19        SyncDevicesJob,
20    },
21    user::{UserEmailRepository, UserRepository},
22};
23use tracing::info;
24
25use crate::{
26    State,
27    new_queue::{JobContext, JobError, RunnableJob},
28};
29
30/// Job to provision a user on the Matrix homeserver.
31/// This works by doing a PUT request to the
32/// /_synapse/admin/v2/users/{user_id} endpoint.
33#[async_trait]
34impl RunnableJob for ProvisionUserJob {
35    #[tracing::instrument(
36        name = "job.provision_user"
37        fields(user.id = %self.user_id()),
38        skip_all,
39        err,
40    )]
41    async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> {
42        let matrix = state.matrix_connection();
43        let mut repo = state.repository().await.map_err(JobError::retry)?;
44        let mut rng = state.rng();
45        let clock = state.clock();
46
47        let user = repo
48            .user()
49            .lookup(self.user_id())
50            .await
51            .map_err(JobError::retry)?
52            .context("User not found")
53            .map_err(JobError::fail)?;
54
55        let mxid = matrix.mxid(&user.username);
56        let emails = repo
57            .user_email()
58            .all(&user)
59            .await
60            .map_err(JobError::retry)?
61            .into_iter()
62            .map(|email| email.email)
63            .collect();
64        let mut request = ProvisionRequest::new(mxid.clone(), user.sub.clone()).set_emails(emails);
65
66        if let Some(display_name) = self.display_name_to_set() {
67            request = request.set_displayname(display_name.to_owned());
68        }
69
70        let created = matrix
71            .provision_user(&request)
72            .await
73            .map_err(JobError::retry)?;
74
75        if created {
76            info!(%user.id, %mxid, "User created");
77        } else {
78            info!(%user.id, %mxid, "User updated");
79        }
80
81        // Schedule a device sync job
82        let sync_device_job = SyncDevicesJob::new(&user);
83        repo.queue_job()
84            .schedule_job(&mut rng, &clock, sync_device_job)
85            .await
86            .map_err(JobError::retry)?;
87
88        repo.save().await.map_err(JobError::retry)?;
89
90        Ok(())
91    }
92}
93
94/// Job to provision a device on the Matrix homeserver.
95///
96/// This job is deprecated and therefore just schedules a [`SyncDevicesJob`]
97#[async_trait]
98impl RunnableJob for ProvisionDeviceJob {
99    #[tracing::instrument(
100        name = "job.provision_device"
101        fields(
102            user.id = %self.user_id(),
103            device.id = %self.device_id(),
104        ),
105        skip_all,
106        err,
107    )]
108    async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> {
109        let mut repo = state.repository().await.map_err(JobError::retry)?;
110        let mut rng = state.rng();
111        let clock = state.clock();
112
113        let user = repo
114            .user()
115            .lookup(self.user_id())
116            .await
117            .map_err(JobError::retry)?
118            .context("User not found")
119            .map_err(JobError::fail)?;
120
121        // Schedule a device sync job
122        repo.queue_job()
123            .schedule_job(&mut rng, &clock, SyncDevicesJob::new(&user))
124            .await
125            .map_err(JobError::retry)?;
126
127        Ok(())
128    }
129}
130
131/// Job to delete a device from a user's account.
132///
133/// This job is deprecated and therefore just schedules a [`SyncDevicesJob`]
134#[async_trait]
135impl RunnableJob for DeleteDeviceJob {
136    #[tracing::instrument(
137        name = "job.delete_device"
138        fields(
139            user.id = %self.user_id(),
140            device.id = %self.device_id(),
141        ),
142        skip_all,
143        err,
144    )]
145    async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> {
146        let mut rng = state.rng();
147        let clock = state.clock();
148        let mut repo = state.repository().await.map_err(JobError::retry)?;
149
150        let user = repo
151            .user()
152            .lookup(self.user_id())
153            .await
154            .map_err(JobError::retry)?
155            .context("User not found")
156            .map_err(JobError::fail)?;
157
158        // Schedule a device sync job
159        repo.queue_job()
160            .schedule_job(&mut rng, &clock, SyncDevicesJob::new(&user))
161            .await
162            .map_err(JobError::retry)?;
163
164        Ok(())
165    }
166}
167
168/// Job to sync the list of devices of a user with the homeserver.
169#[async_trait]
170impl RunnableJob for SyncDevicesJob {
171    #[tracing::instrument(
172        name = "job.sync_devices",
173        fields(user.id = %self.user_id()),
174        skip_all,
175        err,
176    )]
177    async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> {
178        let matrix = state.matrix_connection();
179        let mut repo = state.repository().await.map_err(JobError::retry)?;
180
181        let user = repo
182            .user()
183            .lookup(self.user_id())
184            .await
185            .map_err(JobError::retry)?
186            .context("User not found")
187            .map_err(JobError::fail)?;
188
189        // Lock the user sync to make sure we don't get into a race condition
190        repo.user()
191            .acquire_lock_for_sync(&user)
192            .await
193            .map_err(JobError::retry)?;
194
195        let mut devices = HashSet::new();
196
197        // Cycle through all the compat sessions of the user, and grab the devices
198        let mut cursor = Pagination::first(100);
199        loop {
200            let page = repo
201                .compat_session()
202                .list(
203                    CompatSessionFilter::new().for_user(&user).active_only(),
204                    cursor,
205                )
206                .await
207                .map_err(JobError::retry)?;
208
209            for (compat_session, _) in page.edges {
210                if let Some(ref device) = compat_session.device {
211                    devices.insert(device.as_str().to_owned());
212                }
213                cursor = cursor.after(compat_session.id);
214            }
215
216            if !page.has_next_page {
217                break;
218            }
219        }
220
221        // Cycle though all the oauth2 sessions of the user, and grab the devices
222        let mut cursor = Pagination::first(100);
223        loop {
224            let page = repo
225                .oauth2_session()
226                .list(
227                    OAuth2SessionFilter::new().for_user(&user).active_only(),
228                    cursor,
229                )
230                .await
231                .map_err(JobError::retry)?;
232
233            for oauth2_session in page.edges {
234                for scope in &*oauth2_session.scope {
235                    if let Some(device) = Device::from_scope_token(scope) {
236                        devices.insert(device.as_str().to_owned());
237                    }
238                }
239
240                cursor = cursor.after(oauth2_session.id);
241            }
242
243            if !page.has_next_page {
244                break;
245            }
246        }
247
248        let mxid = matrix.mxid(&user.username);
249        matrix
250            .sync_devices(&mxid, devices)
251            .await
252            .map_err(JobError::retry)?;
253
254        // We kept the connection until now, so that we still hold the lock on the user
255        // throughout the sync
256        repo.save().await.map_err(JobError::retry)?;
257
258        Ok(())
259    }
260}