syn2mas/mas_writer/
constraint_pausing.rs

1// Copyright 2024 New Vector Ltd.
2//
3// SPDX-License-Identifier: AGPL-3.0-only
4// Please see LICENSE in the repository root for full details.
5
6use std::time::Instant;
7
8use sqlx::PgConnection;
9use tracing::{debug, info};
10
11use super::{Error, IntoDatabase};
12
13/// Description of a constraint, which allows recreating it later.
14pub struct ConstraintDescription {
15    pub name: String,
16    pub table_name: String,
17    pub definition: String,
18}
19
20pub struct IndexDescription {
21    pub name: String,
22    pub table_name: String,
23    pub definition: String,
24}
25
26/// Look up and return the definition of a constraint.
27pub async fn describe_constraints_on_table(
28    conn: &mut PgConnection,
29    table_name: &str,
30) -> Result<Vec<ConstraintDescription>, Error> {
31    sqlx::query_as!(
32        ConstraintDescription,
33        r#"
34            SELECT conrelid::regclass::text AS "table_name!", conname AS "name!", pg_get_constraintdef(c.oid) AS "definition!"
35            FROM pg_constraint c
36            JOIN pg_namespace n ON n.oid = c.connamespace
37            WHERE contype IN ('f', 'p', 'u') AND conrelid::regclass::text = $1
38            AND n.nspname = current_schema;
39        "#,
40        table_name
41    ).fetch_all(&mut *conn).await.into_database_with(|| format!("could not read constraint definitions of {table_name}"))
42}
43
44/// Look up and return the definitions of foreign-key constraints whose
45/// target table is the one specified.
46pub async fn describe_foreign_key_constraints_to_table(
47    conn: &mut PgConnection,
48    target_table_name: &str,
49) -> Result<Vec<ConstraintDescription>, Error> {
50    sqlx::query_as!(
51        ConstraintDescription,
52        r#"
53            SELECT conrelid::regclass::text AS "table_name!", conname AS "name!", pg_get_constraintdef(c.oid) AS "definition!"
54            FROM pg_constraint c
55            JOIN pg_namespace n ON n.oid = c.connamespace
56            WHERE contype = 'f' AND confrelid::regclass::text = $1
57            AND n.nspname = current_schema;
58        "#,
59        target_table_name
60    ).fetch_all(&mut *conn).await.into_database_with(|| format!("could not read FK constraint definitions targetting {target_table_name}"))
61}
62
63/// Look up and return the definitions of all indices on a given table.
64pub async fn describe_indices_on_table(
65    conn: &mut PgConnection,
66    table_name: &str,
67) -> Result<Vec<IndexDescription>, Error> {
68    sqlx::query_as!(
69        IndexDescription,
70        r#"
71            SELECT indexname AS "name!", indexdef AS "definition!", schemaname AS "table_name!"
72            FROM pg_indexes
73            WHERE schemaname = current_schema AND tablename = $1 AND indexname IS NOT NULL AND indexdef IS NOT NULL
74        "#,
75        table_name
76    ).fetch_all(&mut *conn).await.into_database("cannot search for indices")
77}
78
79/// Drops a constraint from the database.
80///
81/// The constraint must exist prior to this call.
82pub async fn drop_constraint(
83    conn: &mut PgConnection,
84    constraint: &ConstraintDescription,
85) -> Result<(), Error> {
86    let name = &constraint.name;
87    let table_name = &constraint.table_name;
88    debug!("dropping constraint {name} on table {table_name}");
89    sqlx::query(&format!("ALTER TABLE {table_name} DROP CONSTRAINT {name};"))
90        .execute(&mut *conn)
91        .await
92        .into_database_with(|| format!("failed to drop constraint {name} on {table_name}"))?;
93
94    Ok(())
95}
96
97/// Drops an index from the database.
98///
99/// The index must exist prior to this call.
100pub async fn drop_index(conn: &mut PgConnection, index: &IndexDescription) -> Result<(), Error> {
101    let index_name = &index.name;
102    debug!("dropping index {index_name}");
103    sqlx::query(&format!("DROP INDEX {index_name};"))
104        .execute(&mut *conn)
105        .await
106        .into_database_with(|| format!("failed to temporarily drop {index_name}"))?;
107
108    Ok(())
109}
110
111/// Restores (recreates) a constraint.
112///
113/// The constraint must not exist prior to this call.
114#[tracing::instrument(name = "syn2mas.restore_constraint", skip_all, fields(constraint.name = constraint.name))]
115pub async fn restore_constraint(
116    conn: &mut PgConnection,
117    constraint: &ConstraintDescription,
118) -> Result<(), Error> {
119    let start = Instant::now();
120
121    let ConstraintDescription {
122        name,
123        table_name,
124        definition,
125    } = &constraint;
126    info!("rebuilding constraint {name}");
127
128    sqlx::query(&format!(
129        "ALTER TABLE {table_name} ADD CONSTRAINT {name} {definition};"
130    ))
131    .execute(conn)
132    .await
133    .into_database_with(|| {
134        format!("failed to recreate constraint {name} on {table_name} with {definition}")
135    })?;
136
137    info!(
138        "constraint {name} rebuilt in {:.1}s",
139        Instant::now().duration_since(start).as_secs_f64()
140    );
141
142    Ok(())
143}
144
145/// Restores (recreates) a index.
146///
147/// The index must not exist prior to this call.
148#[tracing::instrument(name = "syn2mas.restore_index", skip_all, fields(index.name = index.name))]
149pub async fn restore_index(conn: &mut PgConnection, index: &IndexDescription) -> Result<(), Error> {
150    let start = Instant::now();
151
152    let IndexDescription {
153        name,
154        table_name,
155        definition,
156    } = &index;
157
158    sqlx::query(&format!("{definition};"))
159        .execute(conn)
160        .await
161        .into_database_with(|| {
162            format!("failed to recreate index {name} on {table_name} with {definition}")
163        })?;
164
165    info!(
166        "index {name} rebuilt in {:.1}s",
167        Instant::now().duration_since(start).as_secs_f64()
168    );
169
170    Ok(())
171}