syn2mas/mas_writer/
constraint_pausing.rs1use std::time::Instant;
7
8use sqlx::PgConnection;
9use tracing::{debug, info};
10
11use super::{Error, IntoDatabase};
12
/// A constraint read from the Postgres catalog, carrying what is needed to
/// drop it and to re-create it afterwards.
#[derive(Debug, Clone)]
pub struct ConstraintDescription {
    /// Name of the constraint (`pg_constraint.conname`).
    pub name: String,
    /// Table the constraint is defined on, rendered through `regclass::text`
    /// by the describe queries in this file.
    pub table_name: String,
    /// Definition of the constraint as returned by `pg_get_constraintdef`.
    pub definition: String,
}
19
/// An index read from the `pg_indexes` view, carrying what is needed to drop
/// it and to re-create it afterwards.
#[derive(Debug, Clone)]
pub struct IndexDescription {
    /// Name of the index (`pg_indexes.indexname`).
    pub name: String,
    /// Where the index lives, as reported by the catalog query that produced
    /// this description; within this file it is only used in error messages.
    pub table_name: String,
    /// Full definition of the index (`pg_indexes.indexdef`); executed
    /// verbatim to rebuild the index.
    pub definition: String,
}
25
26pub async fn describe_constraints_on_table(
28 conn: &mut PgConnection,
29 table_name: &str,
30) -> Result<Vec<ConstraintDescription>, Error> {
31 sqlx::query_as!(
32 ConstraintDescription,
33 r#"
34 SELECT conrelid::regclass::text AS "table_name!", conname AS "name!", pg_get_constraintdef(c.oid) AS "definition!"
35 FROM pg_constraint c
36 JOIN pg_namespace n ON n.oid = c.connamespace
37 WHERE contype IN ('f', 'p', 'u') AND conrelid::regclass::text = $1
38 AND n.nspname = current_schema;
39 "#,
40 table_name
41 ).fetch_all(&mut *conn).await.into_database_with(|| format!("could not read constraint definitions of {table_name}"))
42}
43
44pub async fn describe_foreign_key_constraints_to_table(
47 conn: &mut PgConnection,
48 target_table_name: &str,
49) -> Result<Vec<ConstraintDescription>, Error> {
50 sqlx::query_as!(
51 ConstraintDescription,
52 r#"
53 SELECT conrelid::regclass::text AS "table_name!", conname AS "name!", pg_get_constraintdef(c.oid) AS "definition!"
54 FROM pg_constraint c
55 JOIN pg_namespace n ON n.oid = c.connamespace
56 WHERE contype = 'f' AND confrelid::regclass::text = $1
57 AND n.nspname = current_schema;
58 "#,
59 target_table_name
60 ).fetch_all(&mut *conn).await.into_database_with(|| format!("could not read FK constraint definitions targetting {target_table_name}"))
61}
62
63pub async fn describe_indices_on_table(
65 conn: &mut PgConnection,
66 table_name: &str,
67) -> Result<Vec<IndexDescription>, Error> {
68 sqlx::query_as!(
69 IndexDescription,
70 r#"
71 SELECT indexname AS "name!", indexdef AS "definition!", schemaname AS "table_name!"
72 FROM pg_indexes
73 WHERE schemaname = current_schema AND tablename = $1 AND indexname IS NOT NULL AND indexdef IS NOT NULL
74 "#,
75 table_name
76 ).fetch_all(&mut *conn).await.into_database("cannot search for indices")
77}
78
79pub async fn drop_constraint(
83 conn: &mut PgConnection,
84 constraint: &ConstraintDescription,
85) -> Result<(), Error> {
86 let name = &constraint.name;
87 let table_name = &constraint.table_name;
88 debug!("dropping constraint {name} on table {table_name}");
89 sqlx::query(&format!("ALTER TABLE {table_name} DROP CONSTRAINT {name};"))
90 .execute(&mut *conn)
91 .await
92 .into_database_with(|| format!("failed to drop constraint {name} on {table_name}"))?;
93
94 Ok(())
95}
96
97pub async fn drop_index(conn: &mut PgConnection, index: &IndexDescription) -> Result<(), Error> {
101 let index_name = &index.name;
102 debug!("dropping index {index_name}");
103 sqlx::query(&format!("DROP INDEX {index_name};"))
104 .execute(&mut *conn)
105 .await
106 .into_database_with(|| format!("failed to temporarily drop {index_name}"))?;
107
108 Ok(())
109}
110
111#[tracing::instrument(name = "syn2mas.restore_constraint", skip_all, fields(constraint.name = constraint.name))]
115pub async fn restore_constraint(
116 conn: &mut PgConnection,
117 constraint: &ConstraintDescription,
118) -> Result<(), Error> {
119 let start = Instant::now();
120
121 let ConstraintDescription {
122 name,
123 table_name,
124 definition,
125 } = &constraint;
126 info!("rebuilding constraint {name}");
127
128 sqlx::query(&format!(
129 "ALTER TABLE {table_name} ADD CONSTRAINT {name} {definition};"
130 ))
131 .execute(conn)
132 .await
133 .into_database_with(|| {
134 format!("failed to recreate constraint {name} on {table_name} with {definition}")
135 })?;
136
137 info!(
138 "constraint {name} rebuilt in {:.1}s",
139 Instant::now().duration_since(start).as_secs_f64()
140 );
141
142 Ok(())
143}
144
145#[tracing::instrument(name = "syn2mas.restore_index", skip_all, fields(index.name = index.name))]
149pub async fn restore_index(conn: &mut PgConnection, index: &IndexDescription) -> Result<(), Error> {
150 let start = Instant::now();
151
152 let IndexDescription {
153 name,
154 table_name,
155 definition,
156 } = &index;
157
158 sqlx::query(&format!("{definition};"))
159 .execute(conn)
160 .await
161 .into_database_with(|| {
162 format!("failed to recreate index {name} on {table_name} with {definition}")
163 })?;
164
165 info!(
166 "index {name} rebuilt in {:.1}s",
167 Instant::now().duration_since(start).as_secs_f64()
168 );
169
170 Ok(())
171}