syn2mas/
migration.rs

1// Copyright 2024 New Vector Ltd.
2//
3// SPDX-License-Identifier: AGPL-3.0-only
4// Please see LICENSE in the repository root for full details.
5
6//! # Migration
7//!
8//! This module provides the high-level logic for performing the Synapse-to-MAS
9//! database migration.
10//!
11//! This module does not implement any of the safety checks that should be run
12//! *before* the migration.
13
14use std::{pin::pin, time::Instant};
15
16use chrono::{DateTime, Utc};
17use compact_str::CompactString;
18use futures_util::{SinkExt, StreamExt as _, TryFutureExt, TryStreamExt as _};
19use mas_storage::Clock;
20use opentelemetry::{KeyValue, metrics::Counter};
21use rand::{RngCore, SeedableRng};
22use thiserror::Error;
23use thiserror_ext::ContextInto;
24use tokio_util::sync::PollSender;
25use tracing::{Instrument as _, Level, info};
26use ulid::Ulid;
27use uuid::{NonNilUuid, Uuid};
28
29use crate::{
30    HashMap, ProgressCounter, RandomState, SynapseReader,
31    mas_writer::{
32        self, MasNewCompatAccessToken, MasNewCompatRefreshToken, MasNewCompatSession,
33        MasNewEmailThreepid, MasNewUnsupportedThreepid, MasNewUpstreamOauthLink, MasNewUser,
34        MasNewUserPassword, MasWriteBuffer, MasWriter,
35    },
36    progress::Progress,
37    synapse_reader::{
38        self, ExtractLocalpartError, FullUserId, SynapseAccessToken, SynapseDevice,
39        SynapseExternalId, SynapseRefreshableTokenPair, SynapseThreepid, SynapseUser,
40    },
41    telemetry::{
42        K_ENTITY, METER, V_ENTITY_DEVICES, V_ENTITY_EXTERNAL_IDS,
43        V_ENTITY_NONREFRESHABLE_ACCESS_TOKENS, V_ENTITY_REFRESHABLE_TOKEN_PAIRS,
44        V_ENTITY_THREEPIDS, V_ENTITY_USERS,
45    },
46};
47
/// Errors that can abort the Synapse-to-MAS migration.
///
/// The `into_synapse`, `into_mas`, `into_extract_localpart` and `into_join`
/// helpers used throughout this module are presumably generated from the
/// variants below by the `ContextInto` derive — confirm against
/// `thiserror_ext` before renaming a variant.
#[derive(Debug, Error, ContextInto)]
pub enum Error {
    /// Reading from the Synapse database failed; `context` says what was
    /// being read.
    #[error("error when reading synapse DB ({context}): {source}")]
    Synapse {
        source: synapse_reader::Error,
        context: String,
    },
    /// Writing to the MAS database failed; `context` says what was being
    /// written.
    #[error("error when writing to MAS DB ({context}): {source}")]
    Mas {
        source: mas_writer::Error,
        context: String,
    },
    /// The localpart could not be extracted from a Synapse user ID.
    #[error("failed to extract localpart of {user:?}: {source}")]
    ExtractLocalpart {
        source: ExtractLocalpartError,
        user: FullUserId,
    },
    /// The channel feeding rows to a write task could not accept more rows.
    #[error("channel closed")]
    ChannelClosed,

    /// A spawned write task could not be joined; `context` names the task.
    #[error("task failed ({context}): {source}")]
    Join {
        source: tokio::task::JoinError,
        context: String,
    },

    /// A row in a dependent table refers to a user that is absent from the
    /// user lookup table built while migrating users.
    #[error("user {user} was not found for migration but a row in {table} was found for them")]
    MissingUserFromDependentTable { table: String, user: FullUserId },
    /// An external ID uses a Synapse auth provider for which no MAS upstream
    /// provider was supplied in the provider mapping.
    #[error(
        "missing a mapping for the auth provider with ID {synapse_id:?} (used by {user} and maybe other users)"
    )]
    MissingAuthProviderMapping {
        /// `auth_provider` ID of the provider in Synapse, for which we have no
        /// mapping
        synapse_id: String,
        /// a user that is using this auth provider
        user: FullUserId,
    },
}
87
88bitflags::bitflags! {
89    #[derive(Debug, Clone, Copy)]
90    struct UserFlags: u8 {
91        const IS_SYNAPSE_ADMIN = 0b0000_0001;
92        const IS_DEACTIVATED = 0b0000_0010;
93        const IS_GUEST = 0b0000_0100;
94        const IS_APPSERVICE = 0b0000_1000;
95    }
96}
97
98impl UserFlags {
99    const fn is_deactivated(self) -> bool {
100        self.contains(UserFlags::IS_DEACTIVATED)
101    }
102
103    const fn is_guest(self) -> bool {
104        self.contains(UserFlags::IS_GUEST)
105    }
106
107    const fn is_synapse_admin(self) -> bool {
108        self.contains(UserFlags::IS_SYNAPSE_ADMIN)
109    }
110
111    const fn is_appservice(self) -> bool {
112        self.contains(UserFlags::IS_APPSERVICE)
113    }
114}
115
/// What the migration remembers about a single Synapse user once the user
/// table has been processed.
#[derive(Debug, Clone, Copy)]
struct UserInfo {
    /// ID of the user in MAS. `None` for appservice users, which are recorded
    /// in the lookup table but not written to the MAS database.
    mas_user_id: Option<NonNilUuid>,
    /// Properties of the user carried over from Synapse.
    flags: UserFlags,
}
121
/// State threaded through the migration stages: each stage takes it by value
/// and hands it back alongside the MAS writer.
struct MigrationState {
    /// The server name we're migrating from
    server_name: String,

    /// Lookup table from user localpart to that user's infos.
    ///
    /// Filled while migrating users; later stages only read from it.
    users: HashMap<CompactString, UserInfo>,

    /// Mapping of MAS user ID + device ID to a MAS compat session ID.
    ///
    /// Entries are created while migrating access tokens and reused (or
    /// created on demand) while migrating devices.
    devices_to_compat_sessions: HashMap<(NonNilUuid, CompactString), Uuid>,

    /// A mapping of Synapse external ID providers to MAS upstream OAuth 2.0
    /// provider ID
    provider_id_mapping: std::collections::HashMap<String, Uuid>,
}
136
137/// Performs a migration from Synapse's database to MAS' database.
138///
139/// # Panics
140///
141/// - If there are more than `usize::MAX` users
142///
143/// # Errors
144///
145/// Errors are returned under the following circumstances:
146///
147/// - An underlying database access error, either to MAS or to Synapse.
148/// - Invalid data in the Synapse database.
149#[allow(clippy::implicit_hasher, clippy::too_many_lines)]
150pub async fn migrate(
151    mut synapse: SynapseReader<'_>,
152    mas: MasWriter,
153    server_name: String,
154    clock: &dyn Clock,
155    rng: &mut impl RngCore,
156    provider_id_mapping: std::collections::HashMap<String, Uuid>,
157    progress: &Progress,
158) -> Result<(), Error> {
159    let counts = synapse.count_rows().await.into_synapse("counting users")?;
160
161    let approx_total_counter = METER
162        .u64_counter("syn2mas.entity.approx_total")
163        .with_description("Approximate number of entities of this type to be migrated")
164        .build();
165    let migrated_otel_counter = METER
166        .u64_counter("syn2mas.entity.migrated")
167        .with_description("Number of entities of this type that have been migrated so far")
168        .build();
169    let skipped_otel_counter = METER
170        .u64_counter("syn2mas.entity.skipped")
171        .with_description("Number of entities of this type that have been skipped so far")
172        .build();
173
174    approx_total_counter.add(
175        counts.users as u64,
176        &[KeyValue::new(K_ENTITY, V_ENTITY_USERS)],
177    );
178    approx_total_counter.add(
179        counts.devices as u64,
180        &[KeyValue::new(K_ENTITY, V_ENTITY_DEVICES)],
181    );
182    approx_total_counter.add(
183        counts.threepids as u64,
184        &[KeyValue::new(K_ENTITY, V_ENTITY_THREEPIDS)],
185    );
186    approx_total_counter.add(
187        counts.external_ids as u64,
188        &[KeyValue::new(K_ENTITY, V_ENTITY_EXTERNAL_IDS)],
189    );
190    // assume 1 refreshable access token per refresh token.
191    let approx_nonrefreshable_access_tokens = counts.access_tokens - counts.refresh_tokens;
192    approx_total_counter.add(
193        approx_nonrefreshable_access_tokens as u64,
194        &[KeyValue::new(
195            K_ENTITY,
196            V_ENTITY_NONREFRESHABLE_ACCESS_TOKENS,
197        )],
198    );
199    approx_total_counter.add(
200        counts.refresh_tokens as u64,
201        &[KeyValue::new(K_ENTITY, V_ENTITY_REFRESHABLE_TOKEN_PAIRS)],
202    );
203
204    let state = MigrationState {
205        server_name,
206        // We oversize the hashmaps, as the estimates are innaccurate, and we would like to avoid
207        // reallocations.
208        users: HashMap::with_capacity_and_hasher(counts.users * 9 / 8, RandomState::default()),
209        devices_to_compat_sessions: HashMap::with_capacity_and_hasher(
210            counts.devices * 9 / 8,
211            RandomState::default(),
212        ),
213        provider_id_mapping,
214    };
215
216    let progress_counter = progress.migrating_data(V_ENTITY_USERS, counts.users);
217    let (mas, state) = migrate_users(
218        &mut synapse,
219        mas,
220        state,
221        rng,
222        progress_counter,
223        migrated_otel_counter.clone(),
224        skipped_otel_counter.clone(),
225    )
226    .await?;
227
228    let progress_counter = progress.migrating_data(V_ENTITY_THREEPIDS, counts.threepids);
229    let (mas, state) = migrate_threepids(
230        &mut synapse,
231        mas,
232        rng,
233        state,
234        progress_counter,
235        migrated_otel_counter.clone(),
236        skipped_otel_counter.clone(),
237    )
238    .await?;
239
240    let progress_counter = progress.migrating_data(V_ENTITY_EXTERNAL_IDS, counts.external_ids);
241    let (mas, state) = migrate_external_ids(
242        &mut synapse,
243        mas,
244        rng,
245        state,
246        progress_counter,
247        migrated_otel_counter.clone(),
248        skipped_otel_counter.clone(),
249    )
250    .await?;
251
252    let progress_counter = progress.migrating_data(
253        V_ENTITY_NONREFRESHABLE_ACCESS_TOKENS,
254        counts.access_tokens - counts.refresh_tokens,
255    );
256    let (mas, state) = migrate_unrefreshable_access_tokens(
257        &mut synapse,
258        mas,
259        clock,
260        rng,
261        state,
262        progress_counter,
263        migrated_otel_counter.clone(),
264        skipped_otel_counter.clone(),
265    )
266    .await?;
267
268    let progress_counter =
269        progress.migrating_data(V_ENTITY_REFRESHABLE_TOKEN_PAIRS, counts.refresh_tokens);
270    let (mas, state) = migrate_refreshable_token_pairs(
271        &mut synapse,
272        mas,
273        clock,
274        rng,
275        state,
276        progress_counter,
277        migrated_otel_counter.clone(),
278        skipped_otel_counter.clone(),
279    )
280    .await?;
281
282    let progress_counter = progress.migrating_data("devices", counts.devices);
283    let (mas, _state) = migrate_devices(
284        &mut synapse,
285        mas,
286        rng,
287        state,
288        progress_counter,
289        migrated_otel_counter.clone(),
290        skipped_otel_counter.clone(),
291    )
292    .await?;
293
294    synapse
295        .finish()
296        .await
297        .into_synapse("failed to close Synapse reader")?;
298
299    mas.finish(progress)
300        .await
301        .into_mas("failed to finalise MAS database")?;
302
303    Ok(())
304}
305
306#[tracing::instrument(skip_all, level = Level::INFO)]
307async fn migrate_users(
308    synapse: &mut SynapseReader<'_>,
309    mut mas: MasWriter,
310    mut state: MigrationState,
311    rng: &mut impl RngCore,
312    progress_counter: ProgressCounter,
313    migrated_otel_counter: Counter<u64>,
314    skipped_otel_counter: Counter<u64>,
315) -> Result<(MasWriter, MigrationState), Error> {
316    let start = Instant::now();
317    let otel_kv = [KeyValue::new(K_ENTITY, V_ENTITY_USERS)];
318
319    let (tx, mut rx) = tokio::sync::mpsc::channel::<SynapseUser>(10 * 1024 * 1024);
320
321    // create a new RNG seeded from the passed RNG so that we can move it into the
322    // spawned task
323    let mut rng = rand_chacha::ChaChaRng::from_rng(rng).expect("failed to seed rng");
324    let task = tokio::spawn(
325        async move {
326            let mut user_buffer = MasWriteBuffer::new(&mas, MasWriter::write_users);
327            let mut password_buffer = MasWriteBuffer::new(&mas, MasWriter::write_passwords);
328
329            while let Some(user) = rx.recv().await {
330                // Handling an edge case: some AS users may have invalid localparts containing
331                // extra `:` characters. These users are ignored and a warning is logged.
332                if user.appservice_id.is_some()
333                    && user
334                        .name
335                        .0
336                        .strip_suffix(&format!(":{}", state.server_name))
337                        .is_some_and(|localpart| localpart.contains(':'))
338                {
339                    tracing::warn!("AS user {} has invalid localpart, ignoring!", user.name.0);
340                    continue;
341                }
342
343                let (mas_user, mas_password_opt) =
344                    transform_user(&user, &state.server_name, &mut rng)?;
345
346                let mut flags = UserFlags::empty();
347                if bool::from(user.admin) {
348                    flags |= UserFlags::IS_SYNAPSE_ADMIN;
349                }
350                if bool::from(user.deactivated) {
351                    flags |= UserFlags::IS_DEACTIVATED;
352                }
353                if bool::from(user.is_guest) {
354                    flags |= UserFlags::IS_GUEST;
355                }
356                if user.appservice_id.is_some() {
357                    flags |= UserFlags::IS_APPSERVICE;
358
359                    skipped_otel_counter.add(1, &otel_kv);
360                    progress_counter.increment_skipped();
361
362                    // Special case for appservice users: we don't insert them into the database
363                    // We just record the user's information in the state and continue
364                    state.users.insert(
365                        CompactString::new(&mas_user.username),
366                        UserInfo {
367                            mas_user_id: None,
368                            flags,
369                        },
370                    );
371                    continue;
372                }
373
374                state.users.insert(
375                    CompactString::new(&mas_user.username),
376                    UserInfo {
377                        mas_user_id: Some(mas_user.user_id),
378                        flags,
379                    },
380                );
381
382                user_buffer
383                    .write(&mut mas, mas_user)
384                    .await
385                    .into_mas("writing user")?;
386
387                if let Some(mas_password) = mas_password_opt {
388                    password_buffer
389                        .write(&mut mas, mas_password)
390                        .await
391                        .into_mas("writing password")?;
392                }
393
394                migrated_otel_counter.add(1, &otel_kv);
395                progress_counter.increment_migrated();
396            }
397
398            user_buffer
399                .finish(&mut mas)
400                .await
401                .into_mas("writing users")?;
402            password_buffer
403                .finish(&mut mas)
404                .await
405                .into_mas("writing passwords")?;
406
407            Ok((mas, state))
408        }
409        .instrument(tracing::info_span!("ingest_task")),
410    );
411
412    // In case this has an error, we still want to join the task, so we look at the
413    // error later
414    let res = synapse
415        .read_users()
416        .map_err(|e| e.into_synapse("reading users"))
417        .forward(PollSender::new(tx).sink_map_err(|_| Error::ChannelClosed))
418        .inspect_err(|e| tracing::error!(error = e as &dyn std::error::Error))
419        .await;
420
421    let (mas, state) = task.await.into_join("user write task")??;
422
423    res?;
424
425    info!(
426        "users migrated in {:.1}s",
427        Instant::now().duration_since(start).as_secs_f64()
428    );
429
430    Ok((mas, state))
431}
432
433#[tracing::instrument(skip_all, level = Level::INFO)]
434async fn migrate_threepids(
435    synapse: &mut SynapseReader<'_>,
436    mut mas: MasWriter,
437    rng: &mut impl RngCore,
438    state: MigrationState,
439    progress_counter: ProgressCounter,
440    migrated_otel_counter: Counter<u64>,
441    skipped_otel_counter: Counter<u64>,
442) -> Result<(MasWriter, MigrationState), Error> {
443    let start = Instant::now();
444    let otel_kv = [KeyValue::new(K_ENTITY, V_ENTITY_THREEPIDS)];
445
446    let mut email_buffer = MasWriteBuffer::new(&mas, MasWriter::write_email_threepids);
447    let mut unsupported_buffer = MasWriteBuffer::new(&mas, MasWriter::write_unsupported_threepids);
448    let mut users_stream = pin!(synapse.read_threepids());
449
450    while let Some(threepid_res) = users_stream.next().await {
451        let SynapseThreepid {
452            user_id: synapse_user_id,
453            medium,
454            address,
455            added_at,
456        } = threepid_res.into_synapse("reading threepid")?;
457        let created_at: DateTime<Utc> = added_at.into();
458
459        let username = synapse_user_id
460            .extract_localpart(&state.server_name)
461            .into_extract_localpart(synapse_user_id.clone())?
462            .to_owned();
463        let Some(user_infos) = state.users.get(username.as_str()).copied() else {
464            return Err(Error::MissingUserFromDependentTable {
465                table: "user_threepids".to_owned(),
466                user: synapse_user_id,
467            });
468        };
469
470        let Some(mas_user_id) = user_infos.mas_user_id else {
471            progress_counter.increment_skipped();
472            skipped_otel_counter.add(1, &otel_kv);
473            continue;
474        };
475
476        if medium == "email" {
477            email_buffer
478                .write(
479                    &mut mas,
480                    MasNewEmailThreepid {
481                        user_id: mas_user_id,
482                        user_email_id: Uuid::from(Ulid::from_datetime_with_source(
483                            created_at.into(),
484                            rng,
485                        )),
486                        email: address,
487                        created_at,
488                    },
489                )
490                .await
491                .into_mas("writing email")?;
492        } else {
493            unsupported_buffer
494                .write(
495                    &mut mas,
496                    MasNewUnsupportedThreepid {
497                        user_id: mas_user_id,
498                        medium,
499                        address,
500                        created_at,
501                    },
502                )
503                .await
504                .into_mas("writing unsupported threepid")?;
505        }
506
507        migrated_otel_counter.add(1, &otel_kv);
508        progress_counter.increment_migrated();
509    }
510
511    email_buffer
512        .finish(&mut mas)
513        .await
514        .into_mas("writing email threepids")?;
515    unsupported_buffer
516        .finish(&mut mas)
517        .await
518        .into_mas("writing unsupported threepids")?;
519
520    info!(
521        "third-party IDs migrated in {:.1}s",
522        Instant::now().duration_since(start).as_secs_f64()
523    );
524
525    Ok((mas, state))
526}
527
528/// # Parameters
529///
530/// - `provider_id_mapping`: mapping from Synapse `auth_provider` ID to UUID of
531///   the upstream provider in MAS.
532#[tracing::instrument(skip_all, level = Level::INFO)]
533async fn migrate_external_ids(
534    synapse: &mut SynapseReader<'_>,
535    mut mas: MasWriter,
536    rng: &mut impl RngCore,
537    state: MigrationState,
538    progress_counter: ProgressCounter,
539    migrated_otel_counter: Counter<u64>,
540    skipped_otel_counter: Counter<u64>,
541) -> Result<(MasWriter, MigrationState), Error> {
542    let start = Instant::now();
543    let otel_kv = [KeyValue::new(K_ENTITY, V_ENTITY_EXTERNAL_IDS)];
544
545    let mut write_buffer = MasWriteBuffer::new(&mas, MasWriter::write_upstream_oauth_links);
546    let mut extids_stream = pin!(synapse.read_user_external_ids());
547
548    while let Some(extid_res) = extids_stream.next().await {
549        let SynapseExternalId {
550            user_id: synapse_user_id,
551            auth_provider,
552            external_id: subject,
553        } = extid_res.into_synapse("reading external ID")?;
554        let username = synapse_user_id
555            .extract_localpart(&state.server_name)
556            .into_extract_localpart(synapse_user_id.clone())?
557            .to_owned();
558        let Some(user_infos) = state.users.get(username.as_str()).copied() else {
559            return Err(Error::MissingUserFromDependentTable {
560                table: "user_external_ids".to_owned(),
561                user: synapse_user_id,
562            });
563        };
564
565        let Some(mas_user_id) = user_infos.mas_user_id else {
566            progress_counter.increment_skipped();
567            skipped_otel_counter.add(1, &otel_kv);
568            continue;
569        };
570
571        let Some(&upstream_provider_id) = state.provider_id_mapping.get(&auth_provider) else {
572            return Err(Error::MissingAuthProviderMapping {
573                synapse_id: auth_provider,
574                user: synapse_user_id,
575            });
576        };
577
578        // To save having to store user creation times, extract it from the ULID
579        // This gives millisecond precision — good enough.
580        let user_created_ts = Ulid::from(mas_user_id.get()).datetime();
581
582        let link_id: Uuid = Ulid::from_datetime_with_source(user_created_ts, rng).into();
583
584        write_buffer
585            .write(
586                &mut mas,
587                MasNewUpstreamOauthLink {
588                    link_id,
589                    user_id: mas_user_id,
590                    upstream_provider_id,
591                    subject,
592                    created_at: user_created_ts.into(),
593                },
594            )
595            .await
596            .into_mas("failed to write upstream link")?;
597
598        migrated_otel_counter.add(1, &otel_kv);
599        progress_counter.increment_migrated();
600    }
601
602    write_buffer
603        .finish(&mut mas)
604        .await
605        .into_mas("writing upstream links")?;
606
607    info!(
608        "upstream links (external IDs) migrated in {:.1}s",
609        Instant::now().duration_since(start).as_secs_f64()
610    );
611
612    Ok((mas, state))
613}
614
615/// Migrate devices from Synapse to MAS (as compat sessions).
616///
617/// In order to get the right session creation timestamps, the access tokens
618/// must counterintuitively be migrated first, with the ULIDs passed in as
619/// `devices`.
620///
621/// This is because only access tokens store a timestamp that in any way
622/// resembles a creation timestamp.
623#[tracing::instrument(skip_all, level = Level::INFO)]
624async fn migrate_devices(
625    synapse: &mut SynapseReader<'_>,
626    mut mas: MasWriter,
627    rng: &mut impl RngCore,
628    mut state: MigrationState,
629    progress_counter: ProgressCounter,
630    migrated_otel_counter: Counter<u64>,
631    skipped_otel_counter: Counter<u64>,
632) -> Result<(MasWriter, MigrationState), Error> {
633    let start = Instant::now();
634    let otel_kv = [KeyValue::new(K_ENTITY, V_ENTITY_DEVICES)];
635
636    let (tx, mut rx) = tokio::sync::mpsc::channel(10 * 1024 * 1024);
637
638    // create a new RNG seeded from the passed RNG so that we can move it into the
639    // spawned task
640    let mut rng = rand_chacha::ChaChaRng::from_rng(rng).expect("failed to seed rng");
641    let task = tokio::spawn(
642        async move {
643            let mut write_buffer = MasWriteBuffer::new(&mas, MasWriter::write_compat_sessions);
644
645            while let Some(device) = rx.recv().await {
646                let SynapseDevice {
647                    user_id: synapse_user_id,
648                    device_id,
649                    display_name,
650                    last_seen,
651                    ip,
652                    user_agent,
653                } = device;
654                let username = synapse_user_id
655                    .extract_localpart(&state.server_name)
656                    .into_extract_localpart(synapse_user_id.clone())?
657                    .to_owned();
658                let Some(user_infos) = state.users.get(username.as_str()).copied() else {
659                    return Err(Error::MissingUserFromDependentTable {
660                        table: "devices".to_owned(),
661                        user: synapse_user_id,
662                    });
663                };
664
665                let Some(mas_user_id) = user_infos.mas_user_id else {
666                    progress_counter.increment_skipped();
667                    skipped_otel_counter.add(1, &otel_kv);
668                    continue;
669                };
670
671                if user_infos.flags.is_deactivated()
672                    || user_infos.flags.is_guest()
673                    || user_infos.flags.is_appservice()
674                {
675                    continue;
676                }
677
678                let session_id = *state
679                    .devices_to_compat_sessions
680                    .entry((mas_user_id, CompactString::new(&device_id)))
681                    .or_insert_with(||
682                // We don't have a creation time for this device (as it has no access token),
683                // so use now as a least-evil fallback.
684                Ulid::with_source(&mut rng).into());
685                let created_at = Ulid::from(session_id).datetime().into();
686
687                // As we're using a real IP type in the MAS database, it is possible
688                // that we encounter invalid IP addresses in the Synapse database.
689                // In that case, we should ignore them, but still log a warning.
690                // One special case: Synapse will record '-' as IP in some cases, we don't want
691                // to log about those
692                let last_active_ip = ip.filter(|ip| ip != "-").and_then(|ip| {
693                    ip.parse()
694                        .map_err(|e| {
695                            tracing::warn!(
696                                error = &e as &dyn std::error::Error,
697                                mxid = %synapse_user_id,
698                                %device_id,
699                                %ip,
700                                "Failed to parse device IP, ignoring"
701                            );
702                        })
703                        .ok()
704                });
705
706                write_buffer
707                    .write(
708                        &mut mas,
709                        MasNewCompatSession {
710                            session_id,
711                            user_id: mas_user_id,
712                            device_id: Some(device_id),
713                            human_name: display_name,
714                            created_at,
715                            is_synapse_admin: user_infos.flags.is_synapse_admin(),
716                            last_active_at: last_seen.map(DateTime::from),
717                            last_active_ip,
718                            user_agent,
719                        },
720                    )
721                    .await
722                    .into_mas("writing compat sessions")?;
723
724                migrated_otel_counter.add(1, &otel_kv);
725                progress_counter.increment_migrated();
726            }
727
728            write_buffer
729                .finish(&mut mas)
730                .await
731                .into_mas("writing compat sessions")?;
732
733            Ok((mas, state))
734        }
735        .instrument(tracing::info_span!("ingest_task")),
736    );
737
738    // In case this has an error, we still want to join the task, so we look at the
739    // error later
740    let res = synapse
741        .read_devices()
742        .map_err(|e| e.into_synapse("reading devices"))
743        .forward(PollSender::new(tx).sink_map_err(|_| Error::ChannelClosed))
744        .inspect_err(|e| tracing::error!(error = e as &dyn std::error::Error))
745        .await;
746
747    let (mas, state) = task.await.into_join("device write task")??;
748
749    res?;
750
751    info!(
752        "devices migrated in {:.1}s",
753        Instant::now().duration_since(start).as_secs_f64()
754    );
755
756    Ok((mas, state))
757}
758
759/// Migrates unrefreshable access tokens (those without an associated refresh
760/// token). Some of these may be deviceless.
761#[tracing::instrument(skip_all, level = Level::INFO)]
762#[allow(clippy::too_many_arguments)]
763async fn migrate_unrefreshable_access_tokens(
764    synapse: &mut SynapseReader<'_>,
765    mut mas: MasWriter,
766    clock: &dyn Clock,
767    rng: &mut impl RngCore,
768    mut state: MigrationState,
769    progress_counter: ProgressCounter,
770    migrated_otel_counter: Counter<u64>,
771    skipped_otel_counter: Counter<u64>,
772) -> Result<(MasWriter, MigrationState), Error> {
773    let start = Instant::now();
774    let otel_kv = [KeyValue::new(
775        K_ENTITY,
776        V_ENTITY_NONREFRESHABLE_ACCESS_TOKENS,
777    )];
778
779    let (tx, mut rx) = tokio::sync::mpsc::channel(10 * 1024 * 1024);
780
781    let now = clock.now();
782    // create a new RNG seeded from the passed RNG so that we can move it into the
783    // spawned task
784    let mut rng = rand_chacha::ChaChaRng::from_rng(rng).expect("failed to seed rng");
785    let task = tokio::spawn(
786        async move {
787            let mut write_buffer = MasWriteBuffer::new(&mas, MasWriter::write_compat_access_tokens);
788            let mut deviceless_session_write_buffer =
789                MasWriteBuffer::new(&mas, MasWriter::write_compat_sessions);
790
791            while let Some(token) = rx.recv().await {
792                let SynapseAccessToken {
793                    user_id: synapse_user_id,
794                    device_id,
795                    token,
796                    valid_until_ms,
797                    last_validated,
798                } = token;
799                let username = synapse_user_id
800                    .extract_localpart(&state.server_name)
801                    .into_extract_localpart(synapse_user_id.clone())?
802                    .to_owned();
803                let Some(user_infos) = state.users.get(username.as_str()).copied() else {
804                    return Err(Error::MissingUserFromDependentTable {
805                        table: "access_tokens".to_owned(),
806                        user: synapse_user_id,
807                    });
808                };
809
810                let Some(mas_user_id) = user_infos.mas_user_id else {
811                    progress_counter.increment_skipped();
812                    skipped_otel_counter.add(1, &otel_kv);
813                    continue;
814                };
815
816                if user_infos.flags.is_deactivated()
817                    || user_infos.flags.is_guest()
818                    || user_infos.flags.is_appservice()
819                {
820                    progress_counter.increment_skipped();
821                    skipped_otel_counter.add(1, &otel_kv);
822                    continue;
823                }
824
825                // It's not always accurate, but last_validated is *often* the creation time of
826                // the device If we don't have one, then use the current time as a
827                // fallback.
828                let created_at = last_validated.map_or_else(|| now, DateTime::from);
829
830                let session_id = if let Some(device_id) = device_id {
831                    // Use the existing device_id if this is the second token for a device
832                    *state
833                        .devices_to_compat_sessions
834                        .entry((mas_user_id, CompactString::new(&device_id)))
835                        .or_insert_with(|| {
836                            Uuid::from(Ulid::from_datetime_with_source(created_at.into(), &mut rng))
837                        })
838                } else {
839                    // If this is a deviceless access token, create a deviceless compat session
840                    // for it (since otherwise we won't create one whilst migrating devices)
841                    let deviceless_session_id =
842                        Uuid::from(Ulid::from_datetime_with_source(created_at.into(), &mut rng));
843
844                    deviceless_session_write_buffer
845                        .write(
846                            &mut mas,
847                            MasNewCompatSession {
848                                session_id: deviceless_session_id,
849                                user_id: mas_user_id,
850                                device_id: None,
851                                human_name: None,
852                                created_at,
853                                is_synapse_admin: false,
854                                last_active_at: None,
855                                last_active_ip: None,
856                                user_agent: None,
857                            },
858                        )
859                        .await
860                        .into_mas("failed to write deviceless compat sessions")?;
861
862                    deviceless_session_id
863                };
864
865                let token_id =
866                    Uuid::from(Ulid::from_datetime_with_source(created_at.into(), &mut rng));
867
868                write_buffer
869                    .write(
870                        &mut mas,
871                        MasNewCompatAccessToken {
872                            token_id,
873                            session_id,
874                            access_token: token,
875                            created_at,
876                            expires_at: valid_until_ms.map(DateTime::from),
877                        },
878                    )
879                    .await
880                    .into_mas("writing compat access tokens")?;
881
882                migrated_otel_counter.add(1, &otel_kv);
883                progress_counter.increment_migrated();
884            }
885            write_buffer
886                .finish(&mut mas)
887                .await
888                .into_mas("writing compat access tokens")?;
889            deviceless_session_write_buffer
890                .finish(&mut mas)
891                .await
892                .into_mas("writing deviceless compat sessions")?;
893
894            Ok((mas, state))
895        }
896        .instrument(tracing::info_span!("ingest_task")),
897    );
898
899    // In case this has an error, we still want to join the task, so we look at the
900    // error later
901    let res = synapse
902        .read_unrefreshable_access_tokens()
903        .map_err(|e| e.into_synapse("reading tokens"))
904        .forward(PollSender::new(tx).sink_map_err(|_| Error::ChannelClosed))
905        .inspect_err(|e| tracing::error!(error = e as &dyn std::error::Error))
906        .await;
907
908    let (mas, state) = task.await.into_join("token write task")??;
909
910    res?;
911
912    info!(
913        "non-refreshable access tokens migrated in {:.1}s",
914        Instant::now().duration_since(start).as_secs_f64()
915    );
916
917    Ok((mas, state))
918}
919
920/// Migrates (access token, refresh token) pairs.
921/// Does not migrate non-refreshable access tokens.
922#[tracing::instrument(skip_all, level = Level::INFO)]
923#[allow(clippy::too_many_arguments)]
924async fn migrate_refreshable_token_pairs(
925    synapse: &mut SynapseReader<'_>,
926    mut mas: MasWriter,
927    clock: &dyn Clock,
928    rng: &mut impl RngCore,
929    mut state: MigrationState,
930    progress_counter: ProgressCounter,
931    migrated_otel_counter: Counter<u64>,
932    skipped_otel_counter: Counter<u64>,
933) -> Result<(MasWriter, MigrationState), Error> {
934    let start = Instant::now();
935    let otel_kv = [KeyValue::new(K_ENTITY, V_ENTITY_REFRESHABLE_TOKEN_PAIRS)];
936
937    let mut token_stream = pin!(synapse.read_refreshable_token_pairs());
938    let mut access_token_write_buffer =
939        MasWriteBuffer::new(&mas, MasWriter::write_compat_access_tokens);
940    let mut refresh_token_write_buffer =
941        MasWriteBuffer::new(&mas, MasWriter::write_compat_refresh_tokens);
942
943    while let Some(token_res) = token_stream.next().await {
944        let SynapseRefreshableTokenPair {
945            user_id: synapse_user_id,
946            device_id,
947            access_token,
948            refresh_token,
949            valid_until_ms,
950            last_validated,
951        } = token_res.into_synapse("reading Synapse refresh token")?;
952
953        let username = synapse_user_id
954            .extract_localpart(&state.server_name)
955            .into_extract_localpart(synapse_user_id.clone())?
956            .to_owned();
957        let Some(user_infos) = state.users.get(username.as_str()).copied() else {
958            return Err(Error::MissingUserFromDependentTable {
959                table: "refresh_tokens".to_owned(),
960                user: synapse_user_id,
961            });
962        };
963
964        let Some(mas_user_id) = user_infos.mas_user_id else {
965            progress_counter.increment_skipped();
966            skipped_otel_counter.add(1, &otel_kv);
967            continue;
968        };
969
970        if user_infos.flags.is_deactivated()
971            || user_infos.flags.is_guest()
972            || user_infos.flags.is_appservice()
973        {
974            progress_counter.increment_skipped();
975            skipped_otel_counter.add(1, &otel_kv);
976            continue;
977        }
978
979        // It's not always accurate, but last_validated is *often* the creation time of
980        // the device If we don't have one, then use the current time as a
981        // fallback.
982        let created_at = last_validated.map_or_else(|| clock.now(), DateTime::from);
983
984        // Use the existing device_id if this is the second token for a device
985        let session_id = *state
986            .devices_to_compat_sessions
987            .entry((mas_user_id, CompactString::new(&device_id)))
988            .or_insert_with(|| Uuid::from(Ulid::from_datetime_with_source(created_at.into(), rng)));
989
990        let access_token_id = Uuid::from(Ulid::from_datetime_with_source(created_at.into(), rng));
991        let refresh_token_id = Uuid::from(Ulid::from_datetime_with_source(created_at.into(), rng));
992
993        access_token_write_buffer
994            .write(
995                &mut mas,
996                MasNewCompatAccessToken {
997                    token_id: access_token_id,
998                    session_id,
999                    access_token,
1000                    created_at,
1001                    expires_at: valid_until_ms.map(DateTime::from),
1002                },
1003            )
1004            .await
1005            .into_mas("writing compat access tokens")?;
1006        refresh_token_write_buffer
1007            .write(
1008                &mut mas,
1009                MasNewCompatRefreshToken {
1010                    refresh_token_id,
1011                    session_id,
1012                    access_token_id,
1013                    refresh_token,
1014                    created_at,
1015                },
1016            )
1017            .await
1018            .into_mas("writing compat refresh tokens")?;
1019
1020        migrated_otel_counter.add(1, &otel_kv);
1021        progress_counter.increment_migrated();
1022    }
1023
1024    access_token_write_buffer
1025        .finish(&mut mas)
1026        .await
1027        .into_mas("writing compat access tokens")?;
1028
1029    refresh_token_write_buffer
1030        .finish(&mut mas)
1031        .await
1032        .into_mas("writing compat refresh tokens")?;
1033
1034    info!(
1035        "refreshable token pairs migrated in {:.1}s",
1036        Instant::now().duration_since(start).as_secs_f64()
1037    );
1038
1039    Ok((mas, state))
1040}
1041
1042fn transform_user(
1043    user: &SynapseUser,
1044    server_name: &str,
1045    rng: &mut impl RngCore,
1046) -> Result<(MasNewUser, Option<MasNewUserPassword>), Error> {
1047    let username = user
1048        .name
1049        .extract_localpart(server_name)
1050        .into_extract_localpart(user.name.clone())?
1051        .to_owned();
1052
1053    let user_id = Uuid::from(Ulid::from_datetime_with_source(
1054        DateTime::<Utc>::from(user.creation_ts).into(),
1055        rng,
1056    ))
1057    .try_into()
1058    .expect("ULID generation lead to a nil UUID, this is a bug!");
1059
1060    let new_user = MasNewUser {
1061        user_id,
1062        username,
1063        created_at: user.creation_ts.into(),
1064        locked_at: user.locked.then_some(user.creation_ts.into()),
1065        deactivated_at: bool::from(user.deactivated).then_some(user.creation_ts.into()),
1066        can_request_admin: bool::from(user.admin),
1067        is_guest: bool::from(user.is_guest),
1068    };
1069
1070    let mas_password = user
1071        .password_hash
1072        .clone()
1073        .map(|password_hash| MasNewUserPassword {
1074            user_password_id: Uuid::from(Ulid::from_datetime_with_source(
1075                DateTime::<Utc>::from(user.creation_ts).into(),
1076                rng,
1077            )),
1078            user_id: new_user.user_id,
1079            hashed_password: password_hash,
1080            created_at: new_user.created_at,
1081        });
1082
1083    Ok((new_user, mas_password))
1084}