1use std::net::IpAddr;
9
10use async_trait::async_trait;
11use chrono::{DateTime, Utc};
12use mas_data_model::{
13 BrowserSession, Clock, CompatSession, CompatSessionState, CompatSsoLogin, CompatSsoLoginState,
14 Device, User,
15};
16use mas_storage::{
17 Page, Pagination,
18 compat::{CompatSessionFilter, CompatSessionRepository},
19 pagination::Node,
20};
21use rand::RngCore;
22use sea_query::{Expr, PostgresQueryBuilder, Query, enum_def};
23use sea_query_binder::SqlxBinder;
24use sqlx::PgConnection;
25use ulid::Ulid;
26use url::Url;
27use uuid::Uuid;
28
29use crate::{
30 DatabaseError, DatabaseInconsistencyError,
31 filter::{Filter, StatementExt, StatementWithJoinsExt},
32 iden::{CompatSessions, CompatSsoLogins, UserSessions},
33 pagination::QueryBuilderExt,
34 tracing::ExecuteExt,
35};
36
37pub struct PgCompatSessionRepository<'c> {
39 conn: &'c mut PgConnection,
40}
41
42impl<'c> PgCompatSessionRepository<'c> {
43 pub fn new(conn: &'c mut PgConnection) -> Self {
46 Self { conn }
47 }
48}
49
50struct CompatSessionLookup {
51 compat_session_id: Uuid,
52 device_id: Option<String>,
53 human_name: Option<String>,
54 user_id: Uuid,
55 user_session_id: Option<Uuid>,
56 created_at: DateTime<Utc>,
57 finished_at: Option<DateTime<Utc>>,
58 is_synapse_admin: bool,
59 user_agent: Option<String>,
60 last_active_at: Option<DateTime<Utc>>,
61 last_active_ip: Option<IpAddr>,
62}
63
64impl Node<Ulid> for CompatSessionLookup {
65 fn cursor(&self) -> Ulid {
66 self.compat_session_id.into()
67 }
68}
69
70impl From<CompatSessionLookup> for CompatSession {
71 fn from(value: CompatSessionLookup) -> Self {
72 let id = value.compat_session_id.into();
73
74 let state = match value.finished_at {
75 None => CompatSessionState::Valid,
76 Some(finished_at) => CompatSessionState::Finished { finished_at },
77 };
78
79 CompatSession {
80 id,
81 state,
82 user_id: value.user_id.into(),
83 user_session_id: value.user_session_id.map(Ulid::from),
84 device: value.device_id.map(Device::from),
85 human_name: value.human_name,
86 created_at: value.created_at,
87 is_synapse_admin: value.is_synapse_admin,
88 user_agent: value.user_agent,
89 last_active_at: value.last_active_at,
90 last_active_ip: value.last_active_ip,
91 }
92 }
93}
94
95#[derive(sqlx::FromRow)]
96#[enum_def]
97struct CompatSessionAndSsoLoginLookup {
98 compat_session_id: Uuid,
99 device_id: Option<String>,
100 human_name: Option<String>,
101 user_id: Uuid,
102 user_session_id: Option<Uuid>,
103 created_at: DateTime<Utc>,
104 finished_at: Option<DateTime<Utc>>,
105 is_synapse_admin: bool,
106 user_agent: Option<String>,
107 last_active_at: Option<DateTime<Utc>>,
108 last_active_ip: Option<IpAddr>,
109 compat_sso_login_id: Option<Uuid>,
110 compat_sso_login_token: Option<String>,
111 compat_sso_login_redirect_uri: Option<String>,
112 compat_sso_login_created_at: Option<DateTime<Utc>>,
113 compat_sso_login_fulfilled_at: Option<DateTime<Utc>>,
114 compat_sso_login_exchanged_at: Option<DateTime<Utc>>,
115}
116
117impl Node<Ulid> for CompatSessionAndSsoLoginLookup {
118 fn cursor(&self) -> Ulid {
119 self.compat_session_id.into()
120 }
121}
122
123impl TryFrom<CompatSessionAndSsoLoginLookup> for (CompatSession, Option<CompatSsoLogin>) {
124 type Error = DatabaseInconsistencyError;
125
126 fn try_from(value: CompatSessionAndSsoLoginLookup) -> Result<Self, Self::Error> {
127 let id = value.compat_session_id.into();
128
129 let state = match value.finished_at {
130 None => CompatSessionState::Valid,
131 Some(finished_at) => CompatSessionState::Finished { finished_at },
132 };
133
134 let session = CompatSession {
135 id,
136 state,
137 user_id: value.user_id.into(),
138 device: value.device_id.map(Device::from),
139 human_name: value.human_name,
140 user_session_id: value.user_session_id.map(Ulid::from),
141 created_at: value.created_at,
142 is_synapse_admin: value.is_synapse_admin,
143 user_agent: value.user_agent,
144 last_active_at: value.last_active_at,
145 last_active_ip: value.last_active_ip,
146 };
147
148 match (
149 value.compat_sso_login_id,
150 value.compat_sso_login_token,
151 value.compat_sso_login_redirect_uri,
152 value.compat_sso_login_created_at,
153 value.compat_sso_login_fulfilled_at,
154 value.compat_sso_login_exchanged_at,
155 ) {
156 (None, None, None, None, None, None) => Ok((session, None)),
157 (
158 Some(id),
159 Some(login_token),
160 Some(redirect_uri),
161 Some(created_at),
162 fulfilled_at,
163 exchanged_at,
164 ) => {
165 let id = id.into();
166 let redirect_uri = Url::parse(&redirect_uri).map_err(|e| {
167 DatabaseInconsistencyError::on("compat_sso_logins")
168 .column("redirect_uri")
169 .row(id)
170 .source(e)
171 })?;
172
173 let state = match (fulfilled_at, exchanged_at) {
174 (Some(fulfilled_at), Some(exchanged_at)) => CompatSsoLoginState::Exchanged {
175 fulfilled_at,
176 exchanged_at,
177 compat_session_id: session.id,
178 },
179 _ => return Err(DatabaseInconsistencyError::on("compat_sso_logins").row(id)),
180 };
181
182 let login = CompatSsoLogin {
183 id,
184 redirect_uri,
185 login_token,
186 created_at,
187 state,
188 };
189
190 Ok((session, Some(login)))
191 }
192 _ => Err(DatabaseInconsistencyError::on("compat_sso_logins").row(id)),
193 }
194 }
195}
196
197impl Filter for CompatSessionFilter<'_> {
198 fn generate_condition(&self, has_joins: bool) -> impl sea_query::IntoCondition {
199 sea_query::Condition::all()
200 .add_option(self.user().map(|user| {
201 Expr::col((CompatSessions::Table, CompatSessions::UserId)).eq(Uuid::from(user.id))
202 }))
203 .add_option(self.browser_session().map(|browser_session| {
204 Expr::col((CompatSessions::Table, CompatSessions::UserSessionId))
205 .eq(Uuid::from(browser_session.id))
206 }))
207 .add_option(self.browser_session_filter().map(|browser_session_filter| {
208 Expr::col((CompatSessions::Table, CompatSessions::UserSessionId)).in_subquery(
209 Query::select()
210 .expr(Expr::col((
211 UserSessions::Table,
212 UserSessions::UserSessionId,
213 )))
214 .apply_filter(browser_session_filter)
215 .from(UserSessions::Table)
216 .take(),
217 )
218 }))
219 .add_option(self.state().map(|state| {
220 if state.is_active() {
221 Expr::col((CompatSessions::Table, CompatSessions::FinishedAt)).is_null()
222 } else {
223 Expr::col((CompatSessions::Table, CompatSessions::FinishedAt)).is_not_null()
224 }
225 }))
226 .add_option(self.auth_type().map(|auth_type| {
227 if has_joins {
230 if auth_type.is_sso_login() {
231 Expr::col((CompatSsoLogins::Table, CompatSsoLogins::CompatSsoLoginId))
232 .is_not_null()
233 } else {
234 Expr::col((CompatSsoLogins::Table, CompatSsoLogins::CompatSsoLoginId))
235 .is_null()
236 }
237 } else {
238 let compat_sso_logins = Query::select()
242 .expr(Expr::col((
243 CompatSsoLogins::Table,
244 CompatSsoLogins::CompatSessionId,
245 )))
246 .from(CompatSsoLogins::Table)
247 .take();
248
249 if auth_type.is_sso_login() {
250 Expr::col((CompatSessions::Table, CompatSessions::CompatSessionId))
251 .eq(Expr::any(compat_sso_logins))
252 } else {
253 Expr::col((CompatSessions::Table, CompatSessions::CompatSessionId))
254 .ne(Expr::all(compat_sso_logins))
255 }
256 }
257 }))
258 .add_option(self.last_active_after().map(|last_active_after| {
259 Expr::col((CompatSessions::Table, CompatSessions::LastActiveAt))
260 .gt(last_active_after)
261 }))
262 .add_option(self.last_active_before().map(|last_active_before| {
263 Expr::col((CompatSessions::Table, CompatSessions::LastActiveAt))
264 .lt(last_active_before)
265 }))
266 .add_option(self.device().map(|device| {
267 Expr::col((CompatSessions::Table, CompatSessions::DeviceId)).eq(device.as_str())
268 }))
269 }
270}
271
272#[async_trait]
273impl CompatSessionRepository for PgCompatSessionRepository<'_> {
274 type Error = DatabaseError;
275
276 #[tracing::instrument(
277 name = "db.compat_session.lookup",
278 skip_all,
279 fields(
280 db.query.text,
281 compat_session.id = %id,
282 ),
283 err,
284 )]
285 async fn lookup(&mut self, id: Ulid) -> Result<Option<CompatSession>, Self::Error> {
286 let res = sqlx::query_as!(
287 CompatSessionLookup,
288 r#"
289 SELECT compat_session_id
290 , device_id
291 , human_name
292 , user_id
293 , user_session_id
294 , created_at
295 , finished_at
296 , is_synapse_admin
297 , user_agent
298 , last_active_at
299 , last_active_ip as "last_active_ip: IpAddr"
300 FROM compat_sessions
301 WHERE compat_session_id = $1
302 "#,
303 Uuid::from(id),
304 )
305 .traced()
306 .fetch_optional(&mut *self.conn)
307 .await?;
308
309 let Some(res) = res else { return Ok(None) };
310
311 Ok(Some(res.into()))
312 }
313
314 #[tracing::instrument(
315 name = "db.compat_session.add",
316 skip_all,
317 fields(
318 db.query.text,
319 compat_session.id,
320 %user.id,
321 %user.username,
322 compat_session.device.id = device.as_str(),
323 ),
324 err,
325 )]
326 async fn add(
327 &mut self,
328 rng: &mut (dyn RngCore + Send),
329 clock: &dyn Clock,
330 user: &User,
331 device: Device,
332 browser_session: Option<&BrowserSession>,
333 is_synapse_admin: bool,
334 human_name: Option<String>,
335 ) -> Result<CompatSession, Self::Error> {
336 let created_at = clock.now();
337 let id = Ulid::from_datetime_with_source(created_at.into(), rng);
338 tracing::Span::current().record("compat_session.id", tracing::field::display(id));
339
340 sqlx::query!(
341 r#"
342 INSERT INTO compat_sessions
343 (compat_session_id, user_id, device_id,
344 user_session_id, created_at, is_synapse_admin,
345 human_name)
346 VALUES ($1, $2, $3, $4, $5, $6, $7)
347 "#,
348 Uuid::from(id),
349 Uuid::from(user.id),
350 device.as_str(),
351 browser_session.map(|s| Uuid::from(s.id)),
352 created_at,
353 is_synapse_admin,
354 human_name.as_deref(),
355 )
356 .traced()
357 .execute(&mut *self.conn)
358 .await?;
359
360 Ok(CompatSession {
361 id,
362 state: CompatSessionState::default(),
363 user_id: user.id,
364 device: Some(device),
365 human_name,
366 user_session_id: browser_session.map(|s| s.id),
367 created_at,
368 is_synapse_admin,
369 user_agent: None,
370 last_active_at: None,
371 last_active_ip: None,
372 })
373 }
374
375 #[tracing::instrument(
376 name = "db.compat_session.finish",
377 skip_all,
378 fields(
379 db.query.text,
380 %compat_session.id,
381 user.id = %compat_session.user_id,
382 compat_session.device.id = compat_session.device.as_ref().map(mas_data_model::Device::as_str),
383 ),
384 err,
385 )]
386 async fn finish(
387 &mut self,
388 clock: &dyn Clock,
389 compat_session: CompatSession,
390 ) -> Result<CompatSession, Self::Error> {
391 let finished_at = clock.now();
392
393 let res = sqlx::query!(
394 r#"
395 UPDATE compat_sessions cs
396 SET finished_at = $2
397 WHERE compat_session_id = $1
398 "#,
399 Uuid::from(compat_session.id),
400 finished_at,
401 )
402 .traced()
403 .execute(&mut *self.conn)
404 .await?;
405
406 DatabaseError::ensure_affected_rows(&res, 1)?;
407
408 let compat_session = compat_session
409 .finish(finished_at)
410 .map_err(DatabaseError::to_invalid_operation)?;
411
412 Ok(compat_session)
413 }
414
415 #[tracing::instrument(
416 name = "db.compat_session.finish_bulk",
417 skip_all,
418 fields(db.query.text),
419 err,
420 )]
421 async fn finish_bulk(
422 &mut self,
423 clock: &dyn Clock,
424 filter: CompatSessionFilter<'_>,
425 ) -> Result<usize, Self::Error> {
426 let finished_at = clock.now();
427 let (sql, arguments) = Query::update()
428 .table(CompatSessions::Table)
429 .value(CompatSessions::FinishedAt, finished_at)
430 .apply_filter(filter)
431 .build_sqlx(PostgresQueryBuilder);
432
433 let res = sqlx::query_with(&sql, arguments)
434 .traced()
435 .execute(&mut *self.conn)
436 .await?;
437
438 Ok(res.rows_affected().try_into().unwrap_or(usize::MAX))
439 }
440
441 #[tracing::instrument(
442 name = "db.compat_session.list",
443 skip_all,
444 fields(
445 db.query.text,
446 ),
447 err,
448 )]
449 async fn list(
450 &mut self,
451 filter: CompatSessionFilter<'_>,
452 pagination: Pagination,
453 ) -> Result<Page<(CompatSession, Option<CompatSsoLogin>)>, Self::Error> {
454 let (sql, arguments) = Query::select()
455 .expr_as(
456 Expr::col((CompatSessions::Table, CompatSessions::CompatSessionId)),
457 CompatSessionAndSsoLoginLookupIden::CompatSessionId,
458 )
459 .expr_as(
460 Expr::col((CompatSessions::Table, CompatSessions::DeviceId)),
461 CompatSessionAndSsoLoginLookupIden::DeviceId,
462 )
463 .expr_as(
464 Expr::col((CompatSessions::Table, CompatSessions::HumanName)),
465 CompatSessionAndSsoLoginLookupIden::HumanName,
466 )
467 .expr_as(
468 Expr::col((CompatSessions::Table, CompatSessions::UserId)),
469 CompatSessionAndSsoLoginLookupIden::UserId,
470 )
471 .expr_as(
472 Expr::col((CompatSessions::Table, CompatSessions::UserSessionId)),
473 CompatSessionAndSsoLoginLookupIden::UserSessionId,
474 )
475 .expr_as(
476 Expr::col((CompatSessions::Table, CompatSessions::CreatedAt)),
477 CompatSessionAndSsoLoginLookupIden::CreatedAt,
478 )
479 .expr_as(
480 Expr::col((CompatSessions::Table, CompatSessions::FinishedAt)),
481 CompatSessionAndSsoLoginLookupIden::FinishedAt,
482 )
483 .expr_as(
484 Expr::col((CompatSessions::Table, CompatSessions::IsSynapseAdmin)),
485 CompatSessionAndSsoLoginLookupIden::IsSynapseAdmin,
486 )
487 .expr_as(
488 Expr::col((CompatSessions::Table, CompatSessions::UserAgent)),
489 CompatSessionAndSsoLoginLookupIden::UserAgent,
490 )
491 .expr_as(
492 Expr::col((CompatSessions::Table, CompatSessions::LastActiveAt)),
493 CompatSessionAndSsoLoginLookupIden::LastActiveAt,
494 )
495 .expr_as(
496 Expr::col((CompatSessions::Table, CompatSessions::LastActiveIp)),
497 CompatSessionAndSsoLoginLookupIden::LastActiveIp,
498 )
499 .expr_as(
500 Expr::col((CompatSsoLogins::Table, CompatSsoLogins::CompatSsoLoginId)),
501 CompatSessionAndSsoLoginLookupIden::CompatSsoLoginId,
502 )
503 .expr_as(
504 Expr::col((CompatSsoLogins::Table, CompatSsoLogins::LoginToken)),
505 CompatSessionAndSsoLoginLookupIden::CompatSsoLoginToken,
506 )
507 .expr_as(
508 Expr::col((CompatSsoLogins::Table, CompatSsoLogins::RedirectUri)),
509 CompatSessionAndSsoLoginLookupIden::CompatSsoLoginRedirectUri,
510 )
511 .expr_as(
512 Expr::col((CompatSsoLogins::Table, CompatSsoLogins::CreatedAt)),
513 CompatSessionAndSsoLoginLookupIden::CompatSsoLoginCreatedAt,
514 )
515 .expr_as(
516 Expr::col((CompatSsoLogins::Table, CompatSsoLogins::FulfilledAt)),
517 CompatSessionAndSsoLoginLookupIden::CompatSsoLoginFulfilledAt,
518 )
519 .expr_as(
520 Expr::col((CompatSsoLogins::Table, CompatSsoLogins::ExchangedAt)),
521 CompatSessionAndSsoLoginLookupIden::CompatSsoLoginExchangedAt,
522 )
523 .from(CompatSessions::Table)
524 .left_join(
525 CompatSsoLogins::Table,
526 Expr::col((CompatSessions::Table, CompatSessions::CompatSessionId))
527 .equals((CompatSsoLogins::Table, CompatSsoLogins::CompatSessionId)),
528 )
529 .apply_filter_with_joins(filter)
530 .generate_pagination(
531 (CompatSessions::Table, CompatSessions::CompatSessionId),
532 pagination,
533 )
534 .build_sqlx(PostgresQueryBuilder);
535
536 let edges: Vec<CompatSessionAndSsoLoginLookup> = sqlx::query_as_with(&sql, arguments)
537 .traced()
538 .fetch_all(&mut *self.conn)
539 .await?;
540
541 let page = pagination.process(edges).try_map(TryFrom::try_from)?;
542
543 Ok(page)
544 }
545
546 #[tracing::instrument(
547 name = "db.compat_session.count",
548 skip_all,
549 fields(
550 db.query.text,
551 ),
552 err,
553 )]
554 async fn count(&mut self, filter: CompatSessionFilter<'_>) -> Result<usize, Self::Error> {
555 let (sql, arguments) = sea_query::Query::select()
556 .expr(Expr::col((CompatSessions::Table, CompatSessions::CompatSessionId)).count())
557 .from(CompatSessions::Table)
558 .apply_filter(filter)
559 .build_sqlx(PostgresQueryBuilder);
560
561 let count: i64 = sqlx::query_scalar_with(&sql, arguments)
562 .traced()
563 .fetch_one(&mut *self.conn)
564 .await?;
565
566 count
567 .try_into()
568 .map_err(DatabaseError::to_invalid_operation)
569 }
570
571 #[tracing::instrument(
572 name = "db.compat_session.record_batch_activity",
573 skip_all,
574 fields(
575 db.query.text,
576 ),
577 err,
578 )]
579 async fn record_batch_activity(
580 &mut self,
581 mut activities: Vec<(Ulid, DateTime<Utc>, Option<IpAddr>)>,
582 ) -> Result<(), Self::Error> {
583 activities.sort_unstable();
586 let mut ids = Vec::with_capacity(activities.len());
587 let mut last_activities = Vec::with_capacity(activities.len());
588 let mut ips = Vec::with_capacity(activities.len());
589
590 for (id, last_activity, ip) in activities {
591 ids.push(Uuid::from(id));
592 last_activities.push(last_activity);
593 ips.push(ip);
594 }
595
596 let res = sqlx::query!(
597 r#"
598 UPDATE compat_sessions
599 SET last_active_at = GREATEST(t.last_active_at, compat_sessions.last_active_at)
600 , last_active_ip = COALESCE(t.last_active_ip, compat_sessions.last_active_ip)
601 FROM (
602 SELECT *
603 FROM UNNEST($1::uuid[], $2::timestamptz[], $3::inet[])
604 AS t(compat_session_id, last_active_at, last_active_ip)
605 ) AS t
606 WHERE compat_sessions.compat_session_id = t.compat_session_id
607 "#,
608 &ids,
609 &last_activities,
610 &ips as &[Option<IpAddr>],
611 )
612 .traced()
613 .execute(&mut *self.conn)
614 .await?;
615
616 DatabaseError::ensure_affected_rows(&res, ids.len().try_into().unwrap_or(u64::MAX))?;
617
618 Ok(())
619 }
620
621 #[tracing::instrument(
622 name = "db.compat_session.record_user_agent",
623 skip_all,
624 fields(
625 db.query.text,
626 %compat_session.id,
627 ),
628 err,
629 )]
630 async fn record_user_agent(
631 &mut self,
632 mut compat_session: CompatSession,
633 user_agent: String,
634 ) -> Result<CompatSession, Self::Error> {
635 let res = sqlx::query!(
636 r#"
637 UPDATE compat_sessions
638 SET user_agent = $2
639 WHERE compat_session_id = $1
640 "#,
641 Uuid::from(compat_session.id),
642 &*user_agent,
643 )
644 .traced()
645 .execute(&mut *self.conn)
646 .await?;
647
648 compat_session.user_agent = Some(user_agent);
649
650 DatabaseError::ensure_affected_rows(&res, 1)?;
651
652 Ok(compat_session)
653 }
654
655 #[tracing::instrument(
656 name = "repository.compat_session.set_human_name",
657 skip(self),
658 fields(
659 compat_session.id = %compat_session.id,
660 compat_session.human_name = ?human_name,
661 ),
662 err,
663 )]
664 async fn set_human_name(
665 &mut self,
666 mut compat_session: CompatSession,
667 human_name: Option<String>,
668 ) -> Result<CompatSession, Self::Error> {
669 let res = sqlx::query!(
670 r#"
671 UPDATE compat_sessions
672 SET human_name = $2
673 WHERE compat_session_id = $1
674 "#,
675 Uuid::from(compat_session.id),
676 human_name.as_deref(),
677 )
678 .traced()
679 .execute(&mut *self.conn)
680 .await?;
681
682 compat_session.human_name = human_name;
683
684 DatabaseError::ensure_affected_rows(&res, 1)?;
685
686 Ok(compat_session)
687 }
688
689 #[tracing::instrument(
690 name = "db.compat_session.cleanup_finished",
691 skip_all,
692 fields(
693 db.query.text,
694 ),
695 err,
696 )]
697 async fn cleanup_finished(
698 &mut self,
699 since: Option<DateTime<Utc>>,
700 until: DateTime<Utc>,
701 limit: usize,
702 ) -> Result<(usize, Option<DateTime<Utc>>), Self::Error> {
703 let res = sqlx::query!(
704 r#"
705 WITH
706 to_delete AS (
707 SELECT compat_session_id, finished_at
708 FROM compat_sessions
709 WHERE finished_at IS NOT NULL
710 AND ($1::timestamptz IS NULL OR finished_at >= $1)
711 AND finished_at < $2
712 ORDER BY finished_at ASC
713 LIMIT $3
714 FOR UPDATE
715 ),
716
717 -- Delete refresh tokens first because they reference access tokens
718 deleted_refresh_tokens AS (
719 DELETE FROM compat_refresh_tokens
720 USING to_delete
721 WHERE compat_refresh_tokens.compat_session_id = to_delete.compat_session_id
722 ),
723
724 deleted_access_tokens AS (
725 DELETE FROM compat_access_tokens
726 USING to_delete
727 WHERE compat_access_tokens.compat_session_id = to_delete.compat_session_id
728 ),
729
730 deleted_sso_logins AS (
731 DELETE FROM compat_sso_logins
732 USING to_delete
733 WHERE compat_sso_logins.compat_session_id = to_delete.compat_session_id
734 ),
735
736 deleted_sessions AS (
737 DELETE FROM compat_sessions
738 USING to_delete
739 WHERE compat_sessions.compat_session_id = to_delete.compat_session_id
740 RETURNING compat_sessions.finished_at
741 )
742
743 SELECT
744 COUNT(*) as "count!",
745 MAX(finished_at) as last_finished_at
746 FROM deleted_sessions
747 "#,
748 since,
749 until,
750 i64::try_from(limit).unwrap_or(i64::MAX),
751 )
752 .traced()
753 .fetch_one(&mut *self.conn)
754 .await?;
755
756 Ok((
757 res.count.try_into().unwrap_or(usize::MAX),
758 res.last_finished_at,
759 ))
760 }
761
762 #[tracing::instrument(
763 name = "db.compat_session.cleanup_inactive_ips",
764 skip_all,
765 fields(
766 db.query.text,
767 since = since.map(tracing::field::display),
768 threshold = %threshold,
769 limit = limit,
770 ),
771 err,
772 )]
773 async fn cleanup_inactive_ips(
774 &mut self,
775 since: Option<DateTime<Utc>>,
776 threshold: DateTime<Utc>,
777 limit: usize,
778 ) -> Result<(usize, Option<DateTime<Utc>>), Self::Error> {
779 let res = sqlx::query!(
780 r#"
781 WITH to_update AS (
782 SELECT compat_session_id, last_active_at
783 FROM compat_sessions
784 WHERE last_active_ip IS NOT NULL
785 AND last_active_at IS NOT NULL
786 AND ($1::timestamptz IS NULL OR last_active_at >= $1)
787 AND last_active_at < $2
788 ORDER BY last_active_at ASC
789 LIMIT $3
790 FOR UPDATE
791 ),
792 updated AS (
793 UPDATE compat_sessions
794 SET last_active_ip = NULL
795 FROM to_update
796 WHERE compat_sessions.compat_session_id = to_update.compat_session_id
797 RETURNING compat_sessions.last_active_at
798 )
799 SELECT COUNT(*) AS "count!", MAX(last_active_at) AS last_active_at FROM updated
800 "#,
801 since,
802 threshold,
803 i64::try_from(limit).unwrap_or(i64::MAX),
804 )
805 .traced()
806 .fetch_one(&mut *self.conn)
807 .await?;
808
809 Ok((
810 res.count.try_into().unwrap_or(usize::MAX),
811 res.last_active_at,
812 ))
813 }
814}