1use std::{pin::pin, time::Instant};
15
16use chrono::{DateTime, Utc};
17use compact_str::CompactString;
18use futures_util::{SinkExt, StreamExt as _, TryFutureExt, TryStreamExt as _};
19use mas_storage::Clock;
20use opentelemetry::{KeyValue, metrics::Counter};
21use rand::{RngCore, SeedableRng};
22use thiserror::Error;
23use thiserror_ext::ContextInto;
24use tokio_util::sync::PollSender;
25use tracing::{Instrument as _, Level, info};
26use ulid::Ulid;
27use uuid::{NonNilUuid, Uuid};
28
29use crate::{
30 HashMap, ProgressCounter, RandomState, SynapseReader,
31 mas_writer::{
32 self, MasNewCompatAccessToken, MasNewCompatRefreshToken, MasNewCompatSession,
33 MasNewEmailThreepid, MasNewUnsupportedThreepid, MasNewUpstreamOauthLink, MasNewUser,
34 MasNewUserPassword, MasWriteBuffer, MasWriter,
35 },
36 progress::Progress,
37 synapse_reader::{
38 self, ExtractLocalpartError, FullUserId, SynapseAccessToken, SynapseDevice,
39 SynapseExternalId, SynapseRefreshableTokenPair, SynapseThreepid, SynapseUser,
40 },
41 telemetry::{
42 K_ENTITY, METER, V_ENTITY_DEVICES, V_ENTITY_EXTERNAL_IDS,
43 V_ENTITY_NONREFRESHABLE_ACCESS_TOKENS, V_ENTITY_REFRESHABLE_TOKEN_PAIRS,
44 V_ENTITY_THREEPIDS, V_ENTITY_USERS,
45 },
46};
47
48#[derive(Debug, Error, ContextInto)]
49pub enum Error {
50 #[error("error when reading synapse DB ({context}): {source}")]
51 Synapse {
52 source: synapse_reader::Error,
53 context: String,
54 },
55 #[error("error when writing to MAS DB ({context}): {source}")]
56 Mas {
57 source: mas_writer::Error,
58 context: String,
59 },
60 #[error("failed to extract localpart of {user:?}: {source}")]
61 ExtractLocalpart {
62 source: ExtractLocalpartError,
63 user: FullUserId,
64 },
65 #[error("channel closed")]
66 ChannelClosed,
67
68 #[error("task failed ({context}): {source}")]
69 Join {
70 source: tokio::task::JoinError,
71 context: String,
72 },
73
74 #[error("user {user} was not found for migration but a row in {table} was found for them")]
75 MissingUserFromDependentTable { table: String, user: FullUserId },
76 #[error(
77 "missing a mapping for the auth provider with ID {synapse_id:?} (used by {user} and maybe other users)"
78 )]
79 MissingAuthProviderMapping {
80 synapse_id: String,
83 user: FullUserId,
85 },
86}
87
88bitflags::bitflags! {
89 #[derive(Debug, Clone, Copy)]
90 struct UserFlags: u8 {
91 const IS_SYNAPSE_ADMIN = 0b0000_0001;
92 const IS_DEACTIVATED = 0b0000_0010;
93 const IS_GUEST = 0b0000_0100;
94 const IS_APPSERVICE = 0b0000_1000;
95 }
96}
97
98impl UserFlags {
99 const fn is_deactivated(self) -> bool {
100 self.contains(UserFlags::IS_DEACTIVATED)
101 }
102
103 const fn is_guest(self) -> bool {
104 self.contains(UserFlags::IS_GUEST)
105 }
106
107 const fn is_synapse_admin(self) -> bool {
108 self.contains(UserFlags::IS_SYNAPSE_ADMIN)
109 }
110
111 const fn is_appservice(self) -> bool {
112 self.contains(UserFlags::IS_APPSERVICE)
113 }
114}
115
116#[derive(Debug, Clone, Copy)]
117struct UserInfo {
118 mas_user_id: Option<NonNilUuid>,
119 flags: UserFlags,
120}
121
122struct MigrationState {
123 server_name: String,
125
126 users: HashMap<CompactString, UserInfo>,
128
129 devices_to_compat_sessions: HashMap<(NonNilUuid, CompactString), Uuid>,
131
132 provider_id_mapping: std::collections::HashMap<String, Uuid>,
135}
136
137#[allow(clippy::implicit_hasher, clippy::too_many_lines)]
150pub async fn migrate(
151 mut synapse: SynapseReader<'_>,
152 mas: MasWriter,
153 server_name: String,
154 clock: &dyn Clock,
155 rng: &mut impl RngCore,
156 provider_id_mapping: std::collections::HashMap<String, Uuid>,
157 progress: &Progress,
158) -> Result<(), Error> {
159 let counts = synapse.count_rows().await.into_synapse("counting users")?;
160
161 let approx_total_counter = METER
162 .u64_counter("syn2mas.entity.approx_total")
163 .with_description("Approximate number of entities of this type to be migrated")
164 .build();
165 let migrated_otel_counter = METER
166 .u64_counter("syn2mas.entity.migrated")
167 .with_description("Number of entities of this type that have been migrated so far")
168 .build();
169 let skipped_otel_counter = METER
170 .u64_counter("syn2mas.entity.skipped")
171 .with_description("Number of entities of this type that have been skipped so far")
172 .build();
173
174 approx_total_counter.add(
175 counts.users as u64,
176 &[KeyValue::new(K_ENTITY, V_ENTITY_USERS)],
177 );
178 approx_total_counter.add(
179 counts.devices as u64,
180 &[KeyValue::new(K_ENTITY, V_ENTITY_DEVICES)],
181 );
182 approx_total_counter.add(
183 counts.threepids as u64,
184 &[KeyValue::new(K_ENTITY, V_ENTITY_THREEPIDS)],
185 );
186 approx_total_counter.add(
187 counts.external_ids as u64,
188 &[KeyValue::new(K_ENTITY, V_ENTITY_EXTERNAL_IDS)],
189 );
190 let approx_nonrefreshable_access_tokens = counts.access_tokens - counts.refresh_tokens;
192 approx_total_counter.add(
193 approx_nonrefreshable_access_tokens as u64,
194 &[KeyValue::new(
195 K_ENTITY,
196 V_ENTITY_NONREFRESHABLE_ACCESS_TOKENS,
197 )],
198 );
199 approx_total_counter.add(
200 counts.refresh_tokens as u64,
201 &[KeyValue::new(K_ENTITY, V_ENTITY_REFRESHABLE_TOKEN_PAIRS)],
202 );
203
204 let state = MigrationState {
205 server_name,
206 users: HashMap::with_capacity_and_hasher(counts.users * 9 / 8, RandomState::default()),
209 devices_to_compat_sessions: HashMap::with_capacity_and_hasher(
210 counts.devices * 9 / 8,
211 RandomState::default(),
212 ),
213 provider_id_mapping,
214 };
215
216 let progress_counter = progress.migrating_data(V_ENTITY_USERS, counts.users);
217 let (mas, state) = migrate_users(
218 &mut synapse,
219 mas,
220 state,
221 rng,
222 progress_counter,
223 migrated_otel_counter.clone(),
224 skipped_otel_counter.clone(),
225 )
226 .await?;
227
228 let progress_counter = progress.migrating_data(V_ENTITY_THREEPIDS, counts.threepids);
229 let (mas, state) = migrate_threepids(
230 &mut synapse,
231 mas,
232 rng,
233 state,
234 progress_counter,
235 migrated_otel_counter.clone(),
236 skipped_otel_counter.clone(),
237 )
238 .await?;
239
240 let progress_counter = progress.migrating_data(V_ENTITY_EXTERNAL_IDS, counts.external_ids);
241 let (mas, state) = migrate_external_ids(
242 &mut synapse,
243 mas,
244 rng,
245 state,
246 progress_counter,
247 migrated_otel_counter.clone(),
248 skipped_otel_counter.clone(),
249 )
250 .await?;
251
252 let progress_counter = progress.migrating_data(
253 V_ENTITY_NONREFRESHABLE_ACCESS_TOKENS,
254 counts.access_tokens - counts.refresh_tokens,
255 );
256 let (mas, state) = migrate_unrefreshable_access_tokens(
257 &mut synapse,
258 mas,
259 clock,
260 rng,
261 state,
262 progress_counter,
263 migrated_otel_counter.clone(),
264 skipped_otel_counter.clone(),
265 )
266 .await?;
267
268 let progress_counter =
269 progress.migrating_data(V_ENTITY_REFRESHABLE_TOKEN_PAIRS, counts.refresh_tokens);
270 let (mas, state) = migrate_refreshable_token_pairs(
271 &mut synapse,
272 mas,
273 clock,
274 rng,
275 state,
276 progress_counter,
277 migrated_otel_counter.clone(),
278 skipped_otel_counter.clone(),
279 )
280 .await?;
281
282 let progress_counter = progress.migrating_data("devices", counts.devices);
283 let (mas, _state) = migrate_devices(
284 &mut synapse,
285 mas,
286 rng,
287 state,
288 progress_counter,
289 migrated_otel_counter.clone(),
290 skipped_otel_counter.clone(),
291 )
292 .await?;
293
294 synapse
295 .finish()
296 .await
297 .into_synapse("failed to close Synapse reader")?;
298
299 mas.finish(progress)
300 .await
301 .into_mas("failed to finalise MAS database")?;
302
303 Ok(())
304}
305
306#[tracing::instrument(skip_all, level = Level::INFO)]
307async fn migrate_users(
308 synapse: &mut SynapseReader<'_>,
309 mut mas: MasWriter,
310 mut state: MigrationState,
311 rng: &mut impl RngCore,
312 progress_counter: ProgressCounter,
313 migrated_otel_counter: Counter<u64>,
314 skipped_otel_counter: Counter<u64>,
315) -> Result<(MasWriter, MigrationState), Error> {
316 let start = Instant::now();
317 let otel_kv = [KeyValue::new(K_ENTITY, V_ENTITY_USERS)];
318
319 let (tx, mut rx) = tokio::sync::mpsc::channel::<SynapseUser>(10 * 1024 * 1024);
320
321 let mut rng = rand_chacha::ChaChaRng::from_rng(rng).expect("failed to seed rng");
324 let task = tokio::spawn(
325 async move {
326 let mut user_buffer = MasWriteBuffer::new(&mas, MasWriter::write_users);
327 let mut password_buffer = MasWriteBuffer::new(&mas, MasWriter::write_passwords);
328
329 while let Some(user) = rx.recv().await {
330 if user.appservice_id.is_some()
333 && user
334 .name
335 .0
336 .strip_suffix(&format!(":{}", state.server_name))
337 .is_some_and(|localpart| localpart.contains(':'))
338 {
339 tracing::warn!("AS user {} has invalid localpart, ignoring!", user.name.0);
340 continue;
341 }
342
343 let (mas_user, mas_password_opt) =
344 transform_user(&user, &state.server_name, &mut rng)?;
345
346 let mut flags = UserFlags::empty();
347 if bool::from(user.admin) {
348 flags |= UserFlags::IS_SYNAPSE_ADMIN;
349 }
350 if bool::from(user.deactivated) {
351 flags |= UserFlags::IS_DEACTIVATED;
352 }
353 if bool::from(user.is_guest) {
354 flags |= UserFlags::IS_GUEST;
355 }
356 if user.appservice_id.is_some() {
357 flags |= UserFlags::IS_APPSERVICE;
358
359 skipped_otel_counter.add(1, &otel_kv);
360 progress_counter.increment_skipped();
361
362 state.users.insert(
365 CompactString::new(&mas_user.username),
366 UserInfo {
367 mas_user_id: None,
368 flags,
369 },
370 );
371 continue;
372 }
373
374 state.users.insert(
375 CompactString::new(&mas_user.username),
376 UserInfo {
377 mas_user_id: Some(mas_user.user_id),
378 flags,
379 },
380 );
381
382 user_buffer
383 .write(&mut mas, mas_user)
384 .await
385 .into_mas("writing user")?;
386
387 if let Some(mas_password) = mas_password_opt {
388 password_buffer
389 .write(&mut mas, mas_password)
390 .await
391 .into_mas("writing password")?;
392 }
393
394 migrated_otel_counter.add(1, &otel_kv);
395 progress_counter.increment_migrated();
396 }
397
398 user_buffer
399 .finish(&mut mas)
400 .await
401 .into_mas("writing users")?;
402 password_buffer
403 .finish(&mut mas)
404 .await
405 .into_mas("writing passwords")?;
406
407 Ok((mas, state))
408 }
409 .instrument(tracing::info_span!("ingest_task")),
410 );
411
412 let res = synapse
415 .read_users()
416 .map_err(|e| e.into_synapse("reading users"))
417 .forward(PollSender::new(tx).sink_map_err(|_| Error::ChannelClosed))
418 .inspect_err(|e| tracing::error!(error = e as &dyn std::error::Error))
419 .await;
420
421 let (mas, state) = task.await.into_join("user write task")??;
422
423 res?;
424
425 info!(
426 "users migrated in {:.1}s",
427 Instant::now().duration_since(start).as_secs_f64()
428 );
429
430 Ok((mas, state))
431}
432
433#[tracing::instrument(skip_all, level = Level::INFO)]
434async fn migrate_threepids(
435 synapse: &mut SynapseReader<'_>,
436 mut mas: MasWriter,
437 rng: &mut impl RngCore,
438 state: MigrationState,
439 progress_counter: ProgressCounter,
440 migrated_otel_counter: Counter<u64>,
441 skipped_otel_counter: Counter<u64>,
442) -> Result<(MasWriter, MigrationState), Error> {
443 let start = Instant::now();
444 let otel_kv = [KeyValue::new(K_ENTITY, V_ENTITY_THREEPIDS)];
445
446 let mut email_buffer = MasWriteBuffer::new(&mas, MasWriter::write_email_threepids);
447 let mut unsupported_buffer = MasWriteBuffer::new(&mas, MasWriter::write_unsupported_threepids);
448 let mut users_stream = pin!(synapse.read_threepids());
449
450 while let Some(threepid_res) = users_stream.next().await {
451 let SynapseThreepid {
452 user_id: synapse_user_id,
453 medium,
454 address,
455 added_at,
456 } = threepid_res.into_synapse("reading threepid")?;
457 let created_at: DateTime<Utc> = added_at.into();
458
459 let username = synapse_user_id
460 .extract_localpart(&state.server_name)
461 .into_extract_localpart(synapse_user_id.clone())?
462 .to_owned();
463 let Some(user_infos) = state.users.get(username.as_str()).copied() else {
464 return Err(Error::MissingUserFromDependentTable {
465 table: "user_threepids".to_owned(),
466 user: synapse_user_id,
467 });
468 };
469
470 let Some(mas_user_id) = user_infos.mas_user_id else {
471 progress_counter.increment_skipped();
472 skipped_otel_counter.add(1, &otel_kv);
473 continue;
474 };
475
476 if medium == "email" {
477 email_buffer
478 .write(
479 &mut mas,
480 MasNewEmailThreepid {
481 user_id: mas_user_id,
482 user_email_id: Uuid::from(Ulid::from_datetime_with_source(
483 created_at.into(),
484 rng,
485 )),
486 email: address,
487 created_at,
488 },
489 )
490 .await
491 .into_mas("writing email")?;
492 } else {
493 unsupported_buffer
494 .write(
495 &mut mas,
496 MasNewUnsupportedThreepid {
497 user_id: mas_user_id,
498 medium,
499 address,
500 created_at,
501 },
502 )
503 .await
504 .into_mas("writing unsupported threepid")?;
505 }
506
507 migrated_otel_counter.add(1, &otel_kv);
508 progress_counter.increment_migrated();
509 }
510
511 email_buffer
512 .finish(&mut mas)
513 .await
514 .into_mas("writing email threepids")?;
515 unsupported_buffer
516 .finish(&mut mas)
517 .await
518 .into_mas("writing unsupported threepids")?;
519
520 info!(
521 "third-party IDs migrated in {:.1}s",
522 Instant::now().duration_since(start).as_secs_f64()
523 );
524
525 Ok((mas, state))
526}
527
528#[tracing::instrument(skip_all, level = Level::INFO)]
533async fn migrate_external_ids(
534 synapse: &mut SynapseReader<'_>,
535 mut mas: MasWriter,
536 rng: &mut impl RngCore,
537 state: MigrationState,
538 progress_counter: ProgressCounter,
539 migrated_otel_counter: Counter<u64>,
540 skipped_otel_counter: Counter<u64>,
541) -> Result<(MasWriter, MigrationState), Error> {
542 let start = Instant::now();
543 let otel_kv = [KeyValue::new(K_ENTITY, V_ENTITY_EXTERNAL_IDS)];
544
545 let mut write_buffer = MasWriteBuffer::new(&mas, MasWriter::write_upstream_oauth_links);
546 let mut extids_stream = pin!(synapse.read_user_external_ids());
547
548 while let Some(extid_res) = extids_stream.next().await {
549 let SynapseExternalId {
550 user_id: synapse_user_id,
551 auth_provider,
552 external_id: subject,
553 } = extid_res.into_synapse("reading external ID")?;
554 let username = synapse_user_id
555 .extract_localpart(&state.server_name)
556 .into_extract_localpart(synapse_user_id.clone())?
557 .to_owned();
558 let Some(user_infos) = state.users.get(username.as_str()).copied() else {
559 return Err(Error::MissingUserFromDependentTable {
560 table: "user_external_ids".to_owned(),
561 user: synapse_user_id,
562 });
563 };
564
565 let Some(mas_user_id) = user_infos.mas_user_id else {
566 progress_counter.increment_skipped();
567 skipped_otel_counter.add(1, &otel_kv);
568 continue;
569 };
570
571 let Some(&upstream_provider_id) = state.provider_id_mapping.get(&auth_provider) else {
572 return Err(Error::MissingAuthProviderMapping {
573 synapse_id: auth_provider,
574 user: synapse_user_id,
575 });
576 };
577
578 let user_created_ts = Ulid::from(mas_user_id.get()).datetime();
581
582 let link_id: Uuid = Ulid::from_datetime_with_source(user_created_ts, rng).into();
583
584 write_buffer
585 .write(
586 &mut mas,
587 MasNewUpstreamOauthLink {
588 link_id,
589 user_id: mas_user_id,
590 upstream_provider_id,
591 subject,
592 created_at: user_created_ts.into(),
593 },
594 )
595 .await
596 .into_mas("failed to write upstream link")?;
597
598 migrated_otel_counter.add(1, &otel_kv);
599 progress_counter.increment_migrated();
600 }
601
602 write_buffer
603 .finish(&mut mas)
604 .await
605 .into_mas("writing upstream links")?;
606
607 info!(
608 "upstream links (external IDs) migrated in {:.1}s",
609 Instant::now().duration_since(start).as_secs_f64()
610 );
611
612 Ok((mas, state))
613}
614
615#[tracing::instrument(skip_all, level = Level::INFO)]
624async fn migrate_devices(
625 synapse: &mut SynapseReader<'_>,
626 mut mas: MasWriter,
627 rng: &mut impl RngCore,
628 mut state: MigrationState,
629 progress_counter: ProgressCounter,
630 migrated_otel_counter: Counter<u64>,
631 skipped_otel_counter: Counter<u64>,
632) -> Result<(MasWriter, MigrationState), Error> {
633 let start = Instant::now();
634 let otel_kv = [KeyValue::new(K_ENTITY, V_ENTITY_DEVICES)];
635
636 let (tx, mut rx) = tokio::sync::mpsc::channel(10 * 1024 * 1024);
637
638 let mut rng = rand_chacha::ChaChaRng::from_rng(rng).expect("failed to seed rng");
641 let task = tokio::spawn(
642 async move {
643 let mut write_buffer = MasWriteBuffer::new(&mas, MasWriter::write_compat_sessions);
644
645 while let Some(device) = rx.recv().await {
646 let SynapseDevice {
647 user_id: synapse_user_id,
648 device_id,
649 display_name,
650 last_seen,
651 ip,
652 user_agent,
653 } = device;
654 let username = synapse_user_id
655 .extract_localpart(&state.server_name)
656 .into_extract_localpart(synapse_user_id.clone())?
657 .to_owned();
658 let Some(user_infos) = state.users.get(username.as_str()).copied() else {
659 return Err(Error::MissingUserFromDependentTable {
660 table: "devices".to_owned(),
661 user: synapse_user_id,
662 });
663 };
664
665 let Some(mas_user_id) = user_infos.mas_user_id else {
666 progress_counter.increment_skipped();
667 skipped_otel_counter.add(1, &otel_kv);
668 continue;
669 };
670
671 if user_infos.flags.is_deactivated()
672 || user_infos.flags.is_guest()
673 || user_infos.flags.is_appservice()
674 {
675 continue;
676 }
677
678 let session_id = *state
679 .devices_to_compat_sessions
680 .entry((mas_user_id, CompactString::new(&device_id)))
681 .or_insert_with(||
682 Ulid::with_source(&mut rng).into());
685 let created_at = Ulid::from(session_id).datetime().into();
686
687 let last_active_ip = ip.filter(|ip| ip != "-").and_then(|ip| {
693 ip.parse()
694 .map_err(|e| {
695 tracing::warn!(
696 error = &e as &dyn std::error::Error,
697 mxid = %synapse_user_id,
698 %device_id,
699 %ip,
700 "Failed to parse device IP, ignoring"
701 );
702 })
703 .ok()
704 });
705
706 write_buffer
707 .write(
708 &mut mas,
709 MasNewCompatSession {
710 session_id,
711 user_id: mas_user_id,
712 device_id: Some(device_id),
713 human_name: display_name,
714 created_at,
715 is_synapse_admin: user_infos.flags.is_synapse_admin(),
716 last_active_at: last_seen.map(DateTime::from),
717 last_active_ip,
718 user_agent,
719 },
720 )
721 .await
722 .into_mas("writing compat sessions")?;
723
724 migrated_otel_counter.add(1, &otel_kv);
725 progress_counter.increment_migrated();
726 }
727
728 write_buffer
729 .finish(&mut mas)
730 .await
731 .into_mas("writing compat sessions")?;
732
733 Ok((mas, state))
734 }
735 .instrument(tracing::info_span!("ingest_task")),
736 );
737
738 let res = synapse
741 .read_devices()
742 .map_err(|e| e.into_synapse("reading devices"))
743 .forward(PollSender::new(tx).sink_map_err(|_| Error::ChannelClosed))
744 .inspect_err(|e| tracing::error!(error = e as &dyn std::error::Error))
745 .await;
746
747 let (mas, state) = task.await.into_join("device write task")??;
748
749 res?;
750
751 info!(
752 "devices migrated in {:.1}s",
753 Instant::now().duration_since(start).as_secs_f64()
754 );
755
756 Ok((mas, state))
757}
758
759#[tracing::instrument(skip_all, level = Level::INFO)]
762#[allow(clippy::too_many_arguments)]
763async fn migrate_unrefreshable_access_tokens(
764 synapse: &mut SynapseReader<'_>,
765 mut mas: MasWriter,
766 clock: &dyn Clock,
767 rng: &mut impl RngCore,
768 mut state: MigrationState,
769 progress_counter: ProgressCounter,
770 migrated_otel_counter: Counter<u64>,
771 skipped_otel_counter: Counter<u64>,
772) -> Result<(MasWriter, MigrationState), Error> {
773 let start = Instant::now();
774 let otel_kv = [KeyValue::new(
775 K_ENTITY,
776 V_ENTITY_NONREFRESHABLE_ACCESS_TOKENS,
777 )];
778
779 let (tx, mut rx) = tokio::sync::mpsc::channel(10 * 1024 * 1024);
780
781 let now = clock.now();
782 let mut rng = rand_chacha::ChaChaRng::from_rng(rng).expect("failed to seed rng");
785 let task = tokio::spawn(
786 async move {
787 let mut write_buffer = MasWriteBuffer::new(&mas, MasWriter::write_compat_access_tokens);
788 let mut deviceless_session_write_buffer =
789 MasWriteBuffer::new(&mas, MasWriter::write_compat_sessions);
790
791 while let Some(token) = rx.recv().await {
792 let SynapseAccessToken {
793 user_id: synapse_user_id,
794 device_id,
795 token,
796 valid_until_ms,
797 last_validated,
798 } = token;
799 let username = synapse_user_id
800 .extract_localpart(&state.server_name)
801 .into_extract_localpart(synapse_user_id.clone())?
802 .to_owned();
803 let Some(user_infos) = state.users.get(username.as_str()).copied() else {
804 return Err(Error::MissingUserFromDependentTable {
805 table: "access_tokens".to_owned(),
806 user: synapse_user_id,
807 });
808 };
809
810 let Some(mas_user_id) = user_infos.mas_user_id else {
811 progress_counter.increment_skipped();
812 skipped_otel_counter.add(1, &otel_kv);
813 continue;
814 };
815
816 if user_infos.flags.is_deactivated()
817 || user_infos.flags.is_guest()
818 || user_infos.flags.is_appservice()
819 {
820 progress_counter.increment_skipped();
821 skipped_otel_counter.add(1, &otel_kv);
822 continue;
823 }
824
825 let created_at = last_validated.map_or_else(|| now, DateTime::from);
829
830 let session_id = if let Some(device_id) = device_id {
831 *state
833 .devices_to_compat_sessions
834 .entry((mas_user_id, CompactString::new(&device_id)))
835 .or_insert_with(|| {
836 Uuid::from(Ulid::from_datetime_with_source(created_at.into(), &mut rng))
837 })
838 } else {
839 let deviceless_session_id =
842 Uuid::from(Ulid::from_datetime_with_source(created_at.into(), &mut rng));
843
844 deviceless_session_write_buffer
845 .write(
846 &mut mas,
847 MasNewCompatSession {
848 session_id: deviceless_session_id,
849 user_id: mas_user_id,
850 device_id: None,
851 human_name: None,
852 created_at,
853 is_synapse_admin: false,
854 last_active_at: None,
855 last_active_ip: None,
856 user_agent: None,
857 },
858 )
859 .await
860 .into_mas("failed to write deviceless compat sessions")?;
861
862 deviceless_session_id
863 };
864
865 let token_id =
866 Uuid::from(Ulid::from_datetime_with_source(created_at.into(), &mut rng));
867
868 write_buffer
869 .write(
870 &mut mas,
871 MasNewCompatAccessToken {
872 token_id,
873 session_id,
874 access_token: token,
875 created_at,
876 expires_at: valid_until_ms.map(DateTime::from),
877 },
878 )
879 .await
880 .into_mas("writing compat access tokens")?;
881
882 migrated_otel_counter.add(1, &otel_kv);
883 progress_counter.increment_migrated();
884 }
885 write_buffer
886 .finish(&mut mas)
887 .await
888 .into_mas("writing compat access tokens")?;
889 deviceless_session_write_buffer
890 .finish(&mut mas)
891 .await
892 .into_mas("writing deviceless compat sessions")?;
893
894 Ok((mas, state))
895 }
896 .instrument(tracing::info_span!("ingest_task")),
897 );
898
899 let res = synapse
902 .read_unrefreshable_access_tokens()
903 .map_err(|e| e.into_synapse("reading tokens"))
904 .forward(PollSender::new(tx).sink_map_err(|_| Error::ChannelClosed))
905 .inspect_err(|e| tracing::error!(error = e as &dyn std::error::Error))
906 .await;
907
908 let (mas, state) = task.await.into_join("token write task")??;
909
910 res?;
911
912 info!(
913 "non-refreshable access tokens migrated in {:.1}s",
914 Instant::now().duration_since(start).as_secs_f64()
915 );
916
917 Ok((mas, state))
918}
919
920#[tracing::instrument(skip_all, level = Level::INFO)]
923#[allow(clippy::too_many_arguments)]
924async fn migrate_refreshable_token_pairs(
925 synapse: &mut SynapseReader<'_>,
926 mut mas: MasWriter,
927 clock: &dyn Clock,
928 rng: &mut impl RngCore,
929 mut state: MigrationState,
930 progress_counter: ProgressCounter,
931 migrated_otel_counter: Counter<u64>,
932 skipped_otel_counter: Counter<u64>,
933) -> Result<(MasWriter, MigrationState), Error> {
934 let start = Instant::now();
935 let otel_kv = [KeyValue::new(K_ENTITY, V_ENTITY_REFRESHABLE_TOKEN_PAIRS)];
936
937 let mut token_stream = pin!(synapse.read_refreshable_token_pairs());
938 let mut access_token_write_buffer =
939 MasWriteBuffer::new(&mas, MasWriter::write_compat_access_tokens);
940 let mut refresh_token_write_buffer =
941 MasWriteBuffer::new(&mas, MasWriter::write_compat_refresh_tokens);
942
943 while let Some(token_res) = token_stream.next().await {
944 let SynapseRefreshableTokenPair {
945 user_id: synapse_user_id,
946 device_id,
947 access_token,
948 refresh_token,
949 valid_until_ms,
950 last_validated,
951 } = token_res.into_synapse("reading Synapse refresh token")?;
952
953 let username = synapse_user_id
954 .extract_localpart(&state.server_name)
955 .into_extract_localpart(synapse_user_id.clone())?
956 .to_owned();
957 let Some(user_infos) = state.users.get(username.as_str()).copied() else {
958 return Err(Error::MissingUserFromDependentTable {
959 table: "refresh_tokens".to_owned(),
960 user: synapse_user_id,
961 });
962 };
963
964 let Some(mas_user_id) = user_infos.mas_user_id else {
965 progress_counter.increment_skipped();
966 skipped_otel_counter.add(1, &otel_kv);
967 continue;
968 };
969
970 if user_infos.flags.is_deactivated()
971 || user_infos.flags.is_guest()
972 || user_infos.flags.is_appservice()
973 {
974 progress_counter.increment_skipped();
975 skipped_otel_counter.add(1, &otel_kv);
976 continue;
977 }
978
979 let created_at = last_validated.map_or_else(|| clock.now(), DateTime::from);
983
984 let session_id = *state
986 .devices_to_compat_sessions
987 .entry((mas_user_id, CompactString::new(&device_id)))
988 .or_insert_with(|| Uuid::from(Ulid::from_datetime_with_source(created_at.into(), rng)));
989
990 let access_token_id = Uuid::from(Ulid::from_datetime_with_source(created_at.into(), rng));
991 let refresh_token_id = Uuid::from(Ulid::from_datetime_with_source(created_at.into(), rng));
992
993 access_token_write_buffer
994 .write(
995 &mut mas,
996 MasNewCompatAccessToken {
997 token_id: access_token_id,
998 session_id,
999 access_token,
1000 created_at,
1001 expires_at: valid_until_ms.map(DateTime::from),
1002 },
1003 )
1004 .await
1005 .into_mas("writing compat access tokens")?;
1006 refresh_token_write_buffer
1007 .write(
1008 &mut mas,
1009 MasNewCompatRefreshToken {
1010 refresh_token_id,
1011 session_id,
1012 access_token_id,
1013 refresh_token,
1014 created_at,
1015 },
1016 )
1017 .await
1018 .into_mas("writing compat refresh tokens")?;
1019
1020 migrated_otel_counter.add(1, &otel_kv);
1021 progress_counter.increment_migrated();
1022 }
1023
1024 access_token_write_buffer
1025 .finish(&mut mas)
1026 .await
1027 .into_mas("writing compat access tokens")?;
1028
1029 refresh_token_write_buffer
1030 .finish(&mut mas)
1031 .await
1032 .into_mas("writing compat refresh tokens")?;
1033
1034 info!(
1035 "refreshable token pairs migrated in {:.1}s",
1036 Instant::now().duration_since(start).as_secs_f64()
1037 );
1038
1039 Ok((mas, state))
1040}
1041
1042fn transform_user(
1043 user: &SynapseUser,
1044 server_name: &str,
1045 rng: &mut impl RngCore,
1046) -> Result<(MasNewUser, Option<MasNewUserPassword>), Error> {
1047 let username = user
1048 .name
1049 .extract_localpart(server_name)
1050 .into_extract_localpart(user.name.clone())?
1051 .to_owned();
1052
1053 let user_id = Uuid::from(Ulid::from_datetime_with_source(
1054 DateTime::<Utc>::from(user.creation_ts).into(),
1055 rng,
1056 ))
1057 .try_into()
1058 .expect("ULID generation lead to a nil UUID, this is a bug!");
1059
1060 let new_user = MasNewUser {
1061 user_id,
1062 username,
1063 created_at: user.creation_ts.into(),
1064 locked_at: user.locked.then_some(user.creation_ts.into()),
1065 deactivated_at: bool::from(user.deactivated).then_some(user.creation_ts.into()),
1066 can_request_admin: bool::from(user.admin),
1067 is_guest: bool::from(user.is_guest),
1068 };
1069
1070 let mas_password = user
1071 .password_hash
1072 .clone()
1073 .map(|password_hash| MasNewUserPassword {
1074 user_password_id: Uuid::from(Ulid::from_datetime_with_source(
1075 DateTime::<Utc>::from(user.creation_ts).into(),
1076 rng,
1077 )),
1078 user_id: new_user.user_id,
1079 hashed_password: password_hash,
1080 created_at: new_user.created_at,
1081 });
1082
1083 Ok((new_user, mas_password))
1084}