mas_storage_pg/compat/
session.rs

1// Copyright 2025, 2026 Element Creations Ltd.
2// Copyright 2024, 2025 New Vector Ltd.
3// Copyright 2023, 2024 The Matrix.org Foundation C.I.C.
4//
5// SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
6// Please see LICENSE files in the repository root for full details.
7
8use std::net::IpAddr;
9
10use async_trait::async_trait;
11use chrono::{DateTime, Utc};
12use mas_data_model::{
13    BrowserSession, Clock, CompatSession, CompatSessionState, CompatSsoLogin, CompatSsoLoginState,
14    Device, User,
15};
16use mas_storage::{
17    Page, Pagination,
18    compat::{CompatSessionFilter, CompatSessionRepository},
19    pagination::Node,
20};
21use rand::RngCore;
22use sea_query::{Expr, PostgresQueryBuilder, Query, enum_def};
23use sea_query_binder::SqlxBinder;
24use sqlx::PgConnection;
25use ulid::Ulid;
26use url::Url;
27use uuid::Uuid;
28
29use crate::{
30    DatabaseError, DatabaseInconsistencyError,
31    filter::{Filter, StatementExt, StatementWithJoinsExt},
32    iden::{CompatSessions, CompatSsoLogins, UserSessions},
33    pagination::QueryBuilderExt,
34    tracing::ExecuteExt,
35};
36
/// An implementation of [`CompatSessionRepository`] for a PostgreSQL connection
///
/// The repository mutably borrows the connection for the lifetime `'c`; every
/// query issued by its methods is executed on that connection.
pub struct PgCompatSessionRepository<'c> {
    /// The connection on which the queries are executed
    conn: &'c mut PgConnection,
}
41
42impl<'c> PgCompatSessionRepository<'c> {
43    /// Create a new [`PgCompatSessionRepository`] from an active PostgreSQL
44    /// connection
45    pub fn new(conn: &'c mut PgConnection) -> Self {
46        Self { conn }
47    }
48}
49
/// A row of the `compat_sessions` table, as selected by the single-session
/// `lookup` query
struct CompatSessionLookup {
    /// Identifier of the compat session; also used as pagination cursor
    compat_session_id: Uuid,
    /// Device ID attached to the session, if any
    device_id: Option<String>,
    /// Human-readable name of the session, if any
    human_name: Option<String>,
    /// Identifier of the user owning the session
    user_id: Uuid,
    /// Identifier of the browser session the compat session is linked to, if
    /// any
    user_session_id: Option<Uuid>,
    /// When the session was created
    created_at: DateTime<Utc>,
    /// When the session was finished; `None` means the session is still valid
    finished_at: Option<DateTime<Utc>>,
    /// Value of the `is_synapse_admin` column
    is_synapse_admin: bool,
    /// Last user agent recorded for the session, if any
    user_agent: Option<String>,
    /// Last recorded activity time, if any
    last_active_at: Option<DateTime<Utc>>,
    /// IP address of the last recorded activity, if any
    last_active_ip: Option<IpAddr>,
}
63
64impl Node<Ulid> for CompatSessionLookup {
65    fn cursor(&self) -> Ulid {
66        self.compat_session_id.into()
67    }
68}
69
70impl From<CompatSessionLookup> for CompatSession {
71    fn from(value: CompatSessionLookup) -> Self {
72        let id = value.compat_session_id.into();
73
74        let state = match value.finished_at {
75            None => CompatSessionState::Valid,
76            Some(finished_at) => CompatSessionState::Finished { finished_at },
77        };
78
79        CompatSession {
80            id,
81            state,
82            user_id: value.user_id.into(),
83            user_session_id: value.user_session_id.map(Ulid::from),
84            device: value.device_id.map(Device::from),
85            human_name: value.human_name,
86            created_at: value.created_at,
87            is_synapse_admin: value.is_synapse_admin,
88            user_agent: value.user_agent,
89            last_active_at: value.last_active_at,
90            last_active_ip: value.last_active_ip,
91        }
92    }
93}
94
/// A row of the `list` query: the columns of a compat session, followed by
/// the columns of the compat SSO login LEFT JOINed to it
///
/// All the `compat_sso_login_*` fields are optional because the join may not
/// match any login. The `enum_def` attribute generates the
/// `CompatSessionAndSsoLoginLookupIden` enum, which `list` uses to alias the
/// selected columns to the names of these fields.
#[derive(sqlx::FromRow)]
#[enum_def]
struct CompatSessionAndSsoLoginLookup {
    /// Identifier of the compat session; also used as pagination cursor
    compat_session_id: Uuid,
    /// Device ID attached to the session, if any
    device_id: Option<String>,
    /// Human-readable name of the session, if any
    human_name: Option<String>,
    /// Identifier of the user owning the session
    user_id: Uuid,
    /// Identifier of the browser session the compat session is linked to, if
    /// any
    user_session_id: Option<Uuid>,
    /// When the session was created
    created_at: DateTime<Utc>,
    /// When the session was finished; `None` means the session is still valid
    finished_at: Option<DateTime<Utc>>,
    /// Value of the `is_synapse_admin` column
    is_synapse_admin: bool,
    /// Last user agent recorded for the session, if any
    user_agent: Option<String>,
    /// Last recorded activity time, if any
    last_active_at: Option<DateTime<Utc>>,
    /// IP address of the last recorded activity, if any
    last_active_ip: Option<IpAddr>,
    /// Identifier of the joined SSO login
    compat_sso_login_id: Option<Uuid>,
    /// Login token of the joined SSO login
    compat_sso_login_token: Option<String>,
    /// Redirect URI of the joined SSO login, parsed as a URL on conversion
    compat_sso_login_redirect_uri: Option<String>,
    /// When the joined SSO login was created
    compat_sso_login_created_at: Option<DateTime<Utc>>,
    /// When the joined SSO login was fulfilled
    compat_sso_login_fulfilled_at: Option<DateTime<Utc>>,
    /// When the joined SSO login was exchanged
    compat_sso_login_exchanged_at: Option<DateTime<Utc>>,
}
116
117impl Node<Ulid> for CompatSessionAndSsoLoginLookup {
118    fn cursor(&self) -> Ulid {
119        self.compat_session_id.into()
120    }
121}
122
123impl TryFrom<CompatSessionAndSsoLoginLookup> for (CompatSession, Option<CompatSsoLogin>) {
124    type Error = DatabaseInconsistencyError;
125
126    fn try_from(value: CompatSessionAndSsoLoginLookup) -> Result<Self, Self::Error> {
127        let id = value.compat_session_id.into();
128
129        let state = match value.finished_at {
130            None => CompatSessionState::Valid,
131            Some(finished_at) => CompatSessionState::Finished { finished_at },
132        };
133
134        let session = CompatSession {
135            id,
136            state,
137            user_id: value.user_id.into(),
138            device: value.device_id.map(Device::from),
139            human_name: value.human_name,
140            user_session_id: value.user_session_id.map(Ulid::from),
141            created_at: value.created_at,
142            is_synapse_admin: value.is_synapse_admin,
143            user_agent: value.user_agent,
144            last_active_at: value.last_active_at,
145            last_active_ip: value.last_active_ip,
146        };
147
148        match (
149            value.compat_sso_login_id,
150            value.compat_sso_login_token,
151            value.compat_sso_login_redirect_uri,
152            value.compat_sso_login_created_at,
153            value.compat_sso_login_fulfilled_at,
154            value.compat_sso_login_exchanged_at,
155        ) {
156            (None, None, None, None, None, None) => Ok((session, None)),
157            (
158                Some(id),
159                Some(login_token),
160                Some(redirect_uri),
161                Some(created_at),
162                fulfilled_at,
163                exchanged_at,
164            ) => {
165                let id = id.into();
166                let redirect_uri = Url::parse(&redirect_uri).map_err(|e| {
167                    DatabaseInconsistencyError::on("compat_sso_logins")
168                        .column("redirect_uri")
169                        .row(id)
170                        .source(e)
171                })?;
172
173                let state = match (fulfilled_at, exchanged_at) {
174                    (Some(fulfilled_at), Some(exchanged_at)) => CompatSsoLoginState::Exchanged {
175                        fulfilled_at,
176                        exchanged_at,
177                        compat_session_id: session.id,
178                    },
179                    _ => return Err(DatabaseInconsistencyError::on("compat_sso_logins").row(id)),
180                };
181
182                let login = CompatSsoLogin {
183                    id,
184                    redirect_uri,
185                    login_token,
186                    created_at,
187                    state,
188                };
189
190                Ok((session, Some(login)))
191            }
192            _ => Err(DatabaseInconsistencyError::on("compat_sso_logins").row(id)),
193        }
194    }
195}
196
197impl Filter for CompatSessionFilter<'_> {
198    fn generate_condition(&self, has_joins: bool) -> impl sea_query::IntoCondition {
199        sea_query::Condition::all()
200            .add_option(self.user().map(|user| {
201                Expr::col((CompatSessions::Table, CompatSessions::UserId)).eq(Uuid::from(user.id))
202            }))
203            .add_option(self.browser_session().map(|browser_session| {
204                Expr::col((CompatSessions::Table, CompatSessions::UserSessionId))
205                    .eq(Uuid::from(browser_session.id))
206            }))
207            .add_option(self.browser_session_filter().map(|browser_session_filter| {
208                Expr::col((CompatSessions::Table, CompatSessions::UserSessionId)).in_subquery(
209                    Query::select()
210                        .expr(Expr::col((
211                            UserSessions::Table,
212                            UserSessions::UserSessionId,
213                        )))
214                        .apply_filter(browser_session_filter)
215                        .from(UserSessions::Table)
216                        .take(),
217                )
218            }))
219            .add_option(self.state().map(|state| {
220                if state.is_active() {
221                    Expr::col((CompatSessions::Table, CompatSessions::FinishedAt)).is_null()
222                } else {
223                    Expr::col((CompatSessions::Table, CompatSessions::FinishedAt)).is_not_null()
224                }
225            }))
226            .add_option(self.auth_type().map(|auth_type| {
227                // In in the SELECT to list sessions, we can rely on the JOINed table, whereas
228                // in other queries we need to do a subquery
229                if has_joins {
230                    if auth_type.is_sso_login() {
231                        Expr::col((CompatSsoLogins::Table, CompatSsoLogins::CompatSsoLoginId))
232                            .is_not_null()
233                    } else {
234                        Expr::col((CompatSsoLogins::Table, CompatSsoLogins::CompatSsoLoginId))
235                            .is_null()
236                    }
237                } else {
238                    // This builds either a:
239                    // `WHERE compat_session_id = ANY(...)`
240                    // or a `WHERE compat_session_id <> ALL(...)`
241                    let compat_sso_logins = Query::select()
242                        .expr(Expr::col((
243                            CompatSsoLogins::Table,
244                            CompatSsoLogins::CompatSessionId,
245                        )))
246                        .from(CompatSsoLogins::Table)
247                        .take();
248
249                    if auth_type.is_sso_login() {
250                        Expr::col((CompatSessions::Table, CompatSessions::CompatSessionId))
251                            .eq(Expr::any(compat_sso_logins))
252                    } else {
253                        Expr::col((CompatSessions::Table, CompatSessions::CompatSessionId))
254                            .ne(Expr::all(compat_sso_logins))
255                    }
256                }
257            }))
258            .add_option(self.last_active_after().map(|last_active_after| {
259                Expr::col((CompatSessions::Table, CompatSessions::LastActiveAt))
260                    .gt(last_active_after)
261            }))
262            .add_option(self.last_active_before().map(|last_active_before| {
263                Expr::col((CompatSessions::Table, CompatSessions::LastActiveAt))
264                    .lt(last_active_before)
265            }))
266            .add_option(self.device().map(|device| {
267                Expr::col((CompatSessions::Table, CompatSessions::DeviceId)).eq(device.as_str())
268            }))
269    }
270}
271
272#[async_trait]
273impl CompatSessionRepository for PgCompatSessionRepository<'_> {
274    type Error = DatabaseError;
275
276    #[tracing::instrument(
277        name = "db.compat_session.lookup",
278        skip_all,
279        fields(
280            db.query.text,
281            compat_session.id = %id,
282        ),
283        err,
284    )]
285    async fn lookup(&mut self, id: Ulid) -> Result<Option<CompatSession>, Self::Error> {
286        let res = sqlx::query_as!(
287            CompatSessionLookup,
288            r#"
289                SELECT compat_session_id
290                     , device_id
291                     , human_name
292                     , user_id
293                     , user_session_id
294                     , created_at
295                     , finished_at
296                     , is_synapse_admin
297                     , user_agent
298                     , last_active_at
299                     , last_active_ip as "last_active_ip: IpAddr"
300                FROM compat_sessions
301                WHERE compat_session_id = $1
302            "#,
303            Uuid::from(id),
304        )
305        .traced()
306        .fetch_optional(&mut *self.conn)
307        .await?;
308
309        let Some(res) = res else { return Ok(None) };
310
311        Ok(Some(res.into()))
312    }
313
314    #[tracing::instrument(
315        name = "db.compat_session.add",
316        skip_all,
317        fields(
318            db.query.text,
319            compat_session.id,
320            %user.id,
321            %user.username,
322            compat_session.device.id = device.as_str(),
323        ),
324        err,
325    )]
326    async fn add(
327        &mut self,
328        rng: &mut (dyn RngCore + Send),
329        clock: &dyn Clock,
330        user: &User,
331        device: Device,
332        browser_session: Option<&BrowserSession>,
333        is_synapse_admin: bool,
334        human_name: Option<String>,
335    ) -> Result<CompatSession, Self::Error> {
336        let created_at = clock.now();
337        let id = Ulid::from_datetime_with_source(created_at.into(), rng);
338        tracing::Span::current().record("compat_session.id", tracing::field::display(id));
339
340        sqlx::query!(
341            r#"
342                INSERT INTO compat_sessions
343                    (compat_session_id, user_id, device_id,
344                     user_session_id, created_at, is_synapse_admin,
345                     human_name)
346                VALUES ($1, $2, $3, $4, $5, $6, $7)
347            "#,
348            Uuid::from(id),
349            Uuid::from(user.id),
350            device.as_str(),
351            browser_session.map(|s| Uuid::from(s.id)),
352            created_at,
353            is_synapse_admin,
354            human_name.as_deref(),
355        )
356        .traced()
357        .execute(&mut *self.conn)
358        .await?;
359
360        Ok(CompatSession {
361            id,
362            state: CompatSessionState::default(),
363            user_id: user.id,
364            device: Some(device),
365            human_name,
366            user_session_id: browser_session.map(|s| s.id),
367            created_at,
368            is_synapse_admin,
369            user_agent: None,
370            last_active_at: None,
371            last_active_ip: None,
372        })
373    }
374
375    #[tracing::instrument(
376        name = "db.compat_session.finish",
377        skip_all,
378        fields(
379            db.query.text,
380            %compat_session.id,
381            user.id = %compat_session.user_id,
382            compat_session.device.id = compat_session.device.as_ref().map(mas_data_model::Device::as_str),
383        ),
384        err,
385    )]
386    async fn finish(
387        &mut self,
388        clock: &dyn Clock,
389        compat_session: CompatSession,
390    ) -> Result<CompatSession, Self::Error> {
391        let finished_at = clock.now();
392
393        let res = sqlx::query!(
394            r#"
395                UPDATE compat_sessions cs
396                SET finished_at = $2
397                WHERE compat_session_id = $1
398            "#,
399            Uuid::from(compat_session.id),
400            finished_at,
401        )
402        .traced()
403        .execute(&mut *self.conn)
404        .await?;
405
406        DatabaseError::ensure_affected_rows(&res, 1)?;
407
408        let compat_session = compat_session
409            .finish(finished_at)
410            .map_err(DatabaseError::to_invalid_operation)?;
411
412        Ok(compat_session)
413    }
414
415    #[tracing::instrument(
416        name = "db.compat_session.finish_bulk",
417        skip_all,
418        fields(db.query.text),
419        err,
420    )]
421    async fn finish_bulk(
422        &mut self,
423        clock: &dyn Clock,
424        filter: CompatSessionFilter<'_>,
425    ) -> Result<usize, Self::Error> {
426        let finished_at = clock.now();
427        let (sql, arguments) = Query::update()
428            .table(CompatSessions::Table)
429            .value(CompatSessions::FinishedAt, finished_at)
430            .apply_filter(filter)
431            .build_sqlx(PostgresQueryBuilder);
432
433        let res = sqlx::query_with(&sql, arguments)
434            .traced()
435            .execute(&mut *self.conn)
436            .await?;
437
438        Ok(res.rows_affected().try_into().unwrap_or(usize::MAX))
439    }
440
441    #[tracing::instrument(
442        name = "db.compat_session.list",
443        skip_all,
444        fields(
445            db.query.text,
446        ),
447        err,
448    )]
449    async fn list(
450        &mut self,
451        filter: CompatSessionFilter<'_>,
452        pagination: Pagination,
453    ) -> Result<Page<(CompatSession, Option<CompatSsoLogin>)>, Self::Error> {
454        let (sql, arguments) = Query::select()
455            .expr_as(
456                Expr::col((CompatSessions::Table, CompatSessions::CompatSessionId)),
457                CompatSessionAndSsoLoginLookupIden::CompatSessionId,
458            )
459            .expr_as(
460                Expr::col((CompatSessions::Table, CompatSessions::DeviceId)),
461                CompatSessionAndSsoLoginLookupIden::DeviceId,
462            )
463            .expr_as(
464                Expr::col((CompatSessions::Table, CompatSessions::HumanName)),
465                CompatSessionAndSsoLoginLookupIden::HumanName,
466            )
467            .expr_as(
468                Expr::col((CompatSessions::Table, CompatSessions::UserId)),
469                CompatSessionAndSsoLoginLookupIden::UserId,
470            )
471            .expr_as(
472                Expr::col((CompatSessions::Table, CompatSessions::UserSessionId)),
473                CompatSessionAndSsoLoginLookupIden::UserSessionId,
474            )
475            .expr_as(
476                Expr::col((CompatSessions::Table, CompatSessions::CreatedAt)),
477                CompatSessionAndSsoLoginLookupIden::CreatedAt,
478            )
479            .expr_as(
480                Expr::col((CompatSessions::Table, CompatSessions::FinishedAt)),
481                CompatSessionAndSsoLoginLookupIden::FinishedAt,
482            )
483            .expr_as(
484                Expr::col((CompatSessions::Table, CompatSessions::IsSynapseAdmin)),
485                CompatSessionAndSsoLoginLookupIden::IsSynapseAdmin,
486            )
487            .expr_as(
488                Expr::col((CompatSessions::Table, CompatSessions::UserAgent)),
489                CompatSessionAndSsoLoginLookupIden::UserAgent,
490            )
491            .expr_as(
492                Expr::col((CompatSessions::Table, CompatSessions::LastActiveAt)),
493                CompatSessionAndSsoLoginLookupIden::LastActiveAt,
494            )
495            .expr_as(
496                Expr::col((CompatSessions::Table, CompatSessions::LastActiveIp)),
497                CompatSessionAndSsoLoginLookupIden::LastActiveIp,
498            )
499            .expr_as(
500                Expr::col((CompatSsoLogins::Table, CompatSsoLogins::CompatSsoLoginId)),
501                CompatSessionAndSsoLoginLookupIden::CompatSsoLoginId,
502            )
503            .expr_as(
504                Expr::col((CompatSsoLogins::Table, CompatSsoLogins::LoginToken)),
505                CompatSessionAndSsoLoginLookupIden::CompatSsoLoginToken,
506            )
507            .expr_as(
508                Expr::col((CompatSsoLogins::Table, CompatSsoLogins::RedirectUri)),
509                CompatSessionAndSsoLoginLookupIden::CompatSsoLoginRedirectUri,
510            )
511            .expr_as(
512                Expr::col((CompatSsoLogins::Table, CompatSsoLogins::CreatedAt)),
513                CompatSessionAndSsoLoginLookupIden::CompatSsoLoginCreatedAt,
514            )
515            .expr_as(
516                Expr::col((CompatSsoLogins::Table, CompatSsoLogins::FulfilledAt)),
517                CompatSessionAndSsoLoginLookupIden::CompatSsoLoginFulfilledAt,
518            )
519            .expr_as(
520                Expr::col((CompatSsoLogins::Table, CompatSsoLogins::ExchangedAt)),
521                CompatSessionAndSsoLoginLookupIden::CompatSsoLoginExchangedAt,
522            )
523            .from(CompatSessions::Table)
524            .left_join(
525                CompatSsoLogins::Table,
526                Expr::col((CompatSessions::Table, CompatSessions::CompatSessionId))
527                    .equals((CompatSsoLogins::Table, CompatSsoLogins::CompatSessionId)),
528            )
529            .apply_filter_with_joins(filter)
530            .generate_pagination(
531                (CompatSessions::Table, CompatSessions::CompatSessionId),
532                pagination,
533            )
534            .build_sqlx(PostgresQueryBuilder);
535
536        let edges: Vec<CompatSessionAndSsoLoginLookup> = sqlx::query_as_with(&sql, arguments)
537            .traced()
538            .fetch_all(&mut *self.conn)
539            .await?;
540
541        let page = pagination.process(edges).try_map(TryFrom::try_from)?;
542
543        Ok(page)
544    }
545
546    #[tracing::instrument(
547        name = "db.compat_session.count",
548        skip_all,
549        fields(
550            db.query.text,
551        ),
552        err,
553    )]
554    async fn count(&mut self, filter: CompatSessionFilter<'_>) -> Result<usize, Self::Error> {
555        let (sql, arguments) = sea_query::Query::select()
556            .expr(Expr::col((CompatSessions::Table, CompatSessions::CompatSessionId)).count())
557            .from(CompatSessions::Table)
558            .apply_filter(filter)
559            .build_sqlx(PostgresQueryBuilder);
560
561        let count: i64 = sqlx::query_scalar_with(&sql, arguments)
562            .traced()
563            .fetch_one(&mut *self.conn)
564            .await?;
565
566        count
567            .try_into()
568            .map_err(DatabaseError::to_invalid_operation)
569    }
570
571    #[tracing::instrument(
572        name = "db.compat_session.record_batch_activity",
573        skip_all,
574        fields(
575            db.query.text,
576        ),
577        err,
578    )]
579    async fn record_batch_activity(
580        &mut self,
581        mut activities: Vec<(Ulid, DateTime<Utc>, Option<IpAddr>)>,
582    ) -> Result<(), Self::Error> {
583        // Sort the activity by ID, so that when batching the updates, Postgres
584        // locks the rows in a stable order, preventing deadlocks
585        activities.sort_unstable();
586        let mut ids = Vec::with_capacity(activities.len());
587        let mut last_activities = Vec::with_capacity(activities.len());
588        let mut ips = Vec::with_capacity(activities.len());
589
590        for (id, last_activity, ip) in activities {
591            ids.push(Uuid::from(id));
592            last_activities.push(last_activity);
593            ips.push(ip);
594        }
595
596        let res = sqlx::query!(
597            r#"
598                UPDATE compat_sessions
599                SET last_active_at = GREATEST(t.last_active_at, compat_sessions.last_active_at)
600                  , last_active_ip = COALESCE(t.last_active_ip, compat_sessions.last_active_ip)
601                FROM (
602                    SELECT *
603                    FROM UNNEST($1::uuid[], $2::timestamptz[], $3::inet[])
604                        AS t(compat_session_id, last_active_at, last_active_ip)
605                ) AS t
606                WHERE compat_sessions.compat_session_id = t.compat_session_id
607            "#,
608            &ids,
609            &last_activities,
610            &ips as &[Option<IpAddr>],
611        )
612        .traced()
613        .execute(&mut *self.conn)
614        .await?;
615
616        DatabaseError::ensure_affected_rows(&res, ids.len().try_into().unwrap_or(u64::MAX))?;
617
618        Ok(())
619    }
620
621    #[tracing::instrument(
622        name = "db.compat_session.record_user_agent",
623        skip_all,
624        fields(
625            db.query.text,
626            %compat_session.id,
627        ),
628        err,
629    )]
630    async fn record_user_agent(
631        &mut self,
632        mut compat_session: CompatSession,
633        user_agent: String,
634    ) -> Result<CompatSession, Self::Error> {
635        let res = sqlx::query!(
636            r#"
637            UPDATE compat_sessions
638            SET user_agent = $2
639            WHERE compat_session_id = $1
640        "#,
641            Uuid::from(compat_session.id),
642            &*user_agent,
643        )
644        .traced()
645        .execute(&mut *self.conn)
646        .await?;
647
648        compat_session.user_agent = Some(user_agent);
649
650        DatabaseError::ensure_affected_rows(&res, 1)?;
651
652        Ok(compat_session)
653    }
654
655    #[tracing::instrument(
656        name = "repository.compat_session.set_human_name",
657        skip(self),
658        fields(
659            compat_session.id = %compat_session.id,
660            compat_session.human_name = ?human_name,
661        ),
662        err,
663    )]
664    async fn set_human_name(
665        &mut self,
666        mut compat_session: CompatSession,
667        human_name: Option<String>,
668    ) -> Result<CompatSession, Self::Error> {
669        let res = sqlx::query!(
670            r#"
671            UPDATE compat_sessions
672            SET human_name = $2
673            WHERE compat_session_id = $1
674        "#,
675            Uuid::from(compat_session.id),
676            human_name.as_deref(),
677        )
678        .traced()
679        .execute(&mut *self.conn)
680        .await?;
681
682        compat_session.human_name = human_name;
683
684        DatabaseError::ensure_affected_rows(&res, 1)?;
685
686        Ok(compat_session)
687    }
688
689    #[tracing::instrument(
690        name = "db.compat_session.cleanup_finished",
691        skip_all,
692        fields(
693            db.query.text,
694        ),
695        err,
696    )]
697    async fn cleanup_finished(
698        &mut self,
699        since: Option<DateTime<Utc>>,
700        until: DateTime<Utc>,
701        limit: usize,
702    ) -> Result<(usize, Option<DateTime<Utc>>), Self::Error> {
703        let res = sqlx::query!(
704            r#"
705                WITH
706                    to_delete AS (
707                        SELECT compat_session_id, finished_at
708                        FROM compat_sessions
709                        WHERE finished_at IS NOT NULL
710                          AND ($1::timestamptz IS NULL OR finished_at >= $1)
711                          AND finished_at < $2
712                        ORDER BY finished_at ASC
713                        LIMIT $3
714                        FOR UPDATE
715                    ),
716
717                    -- Delete refresh tokens first because they reference access tokens
718                    deleted_refresh_tokens AS (
719                        DELETE FROM compat_refresh_tokens
720                        USING to_delete
721                        WHERE compat_refresh_tokens.compat_session_id = to_delete.compat_session_id
722                    ),
723
724                    deleted_access_tokens AS (
725                        DELETE FROM compat_access_tokens
726                        USING to_delete
727                        WHERE compat_access_tokens.compat_session_id = to_delete.compat_session_id
728                    ),
729
730                    deleted_sso_logins AS (
731                        DELETE FROM compat_sso_logins
732                        USING to_delete
733                        WHERE compat_sso_logins.compat_session_id = to_delete.compat_session_id
734                    ),
735
736                    deleted_sessions AS (
737                        DELETE FROM compat_sessions
738                        USING to_delete
739                        WHERE compat_sessions.compat_session_id = to_delete.compat_session_id
740                        RETURNING compat_sessions.finished_at
741                    )
742
743                SELECT
744                    COUNT(*) as "count!",
745                    MAX(finished_at) as last_finished_at
746                FROM deleted_sessions
747            "#,
748            since,
749            until,
750            i64::try_from(limit).unwrap_or(i64::MAX),
751        )
752        .traced()
753        .fetch_one(&mut *self.conn)
754        .await?;
755
756        Ok((
757            res.count.try_into().unwrap_or(usize::MAX),
758            res.last_finished_at,
759        ))
760    }
761
762    #[tracing::instrument(
763        name = "db.compat_session.cleanup_inactive_ips",
764        skip_all,
765        fields(
766            db.query.text,
767            since = since.map(tracing::field::display),
768            threshold = %threshold,
769            limit = limit,
770        ),
771        err,
772    )]
773    async fn cleanup_inactive_ips(
774        &mut self,
775        since: Option<DateTime<Utc>>,
776        threshold: DateTime<Utc>,
777        limit: usize,
778    ) -> Result<(usize, Option<DateTime<Utc>>), Self::Error> {
779        let res = sqlx::query!(
780            r#"
781                WITH to_update AS (
782                    SELECT compat_session_id, last_active_at
783                    FROM compat_sessions
784                    WHERE last_active_ip IS NOT NULL
785                      AND last_active_at IS NOT NULL
786                      AND ($1::timestamptz IS NULL OR last_active_at >= $1)
787                      AND last_active_at < $2
788                    ORDER BY last_active_at ASC
789                    LIMIT $3
790                    FOR UPDATE
791                ),
792                updated AS (
793                    UPDATE compat_sessions
794                    SET last_active_ip = NULL
795                    FROM to_update
796                    WHERE compat_sessions.compat_session_id = to_update.compat_session_id
797                    RETURNING compat_sessions.last_active_at
798                )
799                SELECT COUNT(*) AS "count!", MAX(last_active_at) AS last_active_at FROM updated
800            "#,
801            since,
802            threshold,
803            i64::try_from(limit).unwrap_or(i64::MAX),
804        )
805        .traced()
806        .fetch_one(&mut *self.conn)
807        .await?;
808
809        Ok((
810            res.count.try_into().unwrap_or(usize::MAX),
811            res.last_active_at,
812        ))
813    }
814}