// syn2mas/progress.rs

1use std::sync::{Arc, atomic::AtomicU32};
2
3use arc_swap::ArcSwap;
4
5/// Tracker for the progress of the migration
6///
7/// Cloning this struct intuitively gives a 'handle' to the same counters,
8/// which means it can be shared between tasks/threads.
9#[derive(Clone)]
10pub struct Progress {
11    current_stage: Arc<ArcSwap<ProgressStage>>,
12}
13
14#[derive(Clone, Default)]
15pub struct ProgressCounter {
16    inner: Arc<ProgressCounterInner>,
17}
18
/// Storage shared by every clone of a `ProgressCounter`.
///
/// Both fields are atomics so they can be updated through a shared
/// reference; both start at zero via `Default`.
#[derive(Default)]
struct ProgressCounterInner {
    /// Count of `increment_migrated` calls.
    migrated: AtomicU32,
    /// Count of `increment_skipped` calls.
    skipped: AtomicU32,
}
24
25impl ProgressCounter {
26    pub fn increment_migrated(&self) {
27        self.inner
28            .migrated
29            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
30    }
31
32    pub fn increment_skipped(&self) {
33        self.inner
34            .skipped
35            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
36    }
37
38    #[must_use]
39    pub fn migrated(&self) -> u32 {
40        self.inner
41            .migrated
42            .load(std::sync::atomic::Ordering::Relaxed)
43    }
44
45    #[must_use]
46    pub fn skipped(&self) -> u32 {
47        self.inner
48            .skipped
49            .load(std::sync::atomic::Ordering::Relaxed)
50    }
51}
52
53impl Progress {
54    #[must_use]
55    pub fn migrating_data(&self, entity: &'static str, approx_count: usize) -> ProgressCounter {
56        let counter = ProgressCounter::default();
57        self.set_current_stage(ProgressStage::MigratingData {
58            entity,
59            counter: counter.clone(),
60            approx_count: approx_count as u64,
61        });
62        counter
63    }
64
65    pub fn rebuild_index(&self, index_name: String) {
66        self.set_current_stage(ProgressStage::RebuildIndex { index_name });
67    }
68
69    pub fn rebuild_constraint(&self, constraint_name: String) {
70        self.set_current_stage(ProgressStage::RebuildConstraint { constraint_name });
71    }
72
73    /// Sets the current stage of progress.
74    ///
75    /// This is probably not cheap enough to use for every individual row,
76    /// so use of atomic integers for the fields that will be updated is
77    /// recommended.
78    #[inline]
79    fn set_current_stage(&self, stage: ProgressStage) {
80        self.current_stage.store(Arc::new(stage));
81    }
82
83    /// Returns the current stage of progress.
84    #[inline]
85    #[must_use]
86    pub fn get_current_stage(&self) -> arc_swap::Guard<Arc<ProgressStage>> {
87        self.current_stage.load()
88    }
89}
90
91impl Default for Progress {
92    fn default() -> Self {
93        Self {
94            current_stage: Arc::new(ArcSwap::new(Arc::new(ProgressStage::SettingUp))),
95        }
96    }
97}
98
99pub enum ProgressStage {
100    SettingUp,
101    MigratingData {
102        entity: &'static str,
103        counter: ProgressCounter,
104        approx_count: u64,
105    },
106    RebuildIndex {
107        index_name: String,
108    },
109    RebuildConstraint {
110        constraint_name: String,
111    },
112}