mas_storage_pg/
app_session.rs

1// Copyright 2024 New Vector Ltd.
2// Copyright 2023, 2024 The Matrix.org Foundation C.I.C.
3//
4// SPDX-License-Identifier: AGPL-3.0-only
5// Please see LICENSE in the repository root for full details.
6
//! A module containing the PostgreSQL implementation of the repository for
//! application sessions
8
9use async_trait::async_trait;
10use mas_data_model::{
11    CompatSession, CompatSessionState, Device, Session, SessionState, User, UserAgent,
12};
13use mas_storage::{
14    Clock, Page, Pagination,
15    app_session::{AppSession, AppSessionFilter, AppSessionRepository, AppSessionState},
16    compat::CompatSessionFilter,
17    oauth2::OAuth2SessionFilter,
18};
19use oauth2_types::scope::{Scope, ScopeToken};
20use opentelemetry_semantic_conventions::trace::DB_QUERY_TEXT;
21use sea_query::{
22    Alias, ColumnRef, CommonTableExpression, Expr, PostgresQueryBuilder, Query, UnionType,
23};
24use sea_query_binder::SqlxBinder;
25use sqlx::PgConnection;
26use tracing::Instrument;
27use ulid::Ulid;
28use uuid::Uuid;
29
30use crate::{
31    DatabaseError, ExecuteExt,
32    errors::DatabaseInconsistencyError,
33    filter::StatementExt,
34    iden::{CompatSessions, OAuth2Sessions},
35    pagination::QueryBuilderExt,
36};
37
/// An implementation of [`AppSessionRepository`] for a PostgreSQL connection
pub struct PgAppSessionRepository<'c> {
    // Mutable borrow of the connection on which every query of this
    // repository is executed
    conn: &'c mut PgConnection,
}
42
43impl<'c> PgAppSessionRepository<'c> {
44    /// Create a new [`PgAppSessionRepository`] from an active PostgreSQL
45    /// connection
46    pub fn new(conn: &'c mut PgConnection) -> Self {
47        Self { conn }
48    }
49}
50
mod priv_ {
    // The enum_def macro generates a public enum, which we don't want, because it
    // triggers the missing docs warning

    use std::net::IpAddr;

    use chrono::{DateTime, Utc};
    use sea_query::enum_def;
    use uuid::Uuid;

    // One row of the `sessions` query built by `list`, which is the union of
    // the OAuth 2.0 sessions and the compatibility sessions. Columns which
    // only exist for one kind of session are selected as NULL for the other
    // kind, hence the many `Option`s.
    #[derive(sqlx::FromRow)]
    #[enum_def]
    pub(super) struct AppSessionLookup {
        // ID of the session in its own table, whichever kind it is; this is the
        // column `list` paginates on
        pub(super) cursor: Uuid,
        // Set for compatibility sessions, NULL for OAuth 2.0 sessions
        pub(super) compat_session_id: Option<Uuid>,
        // Set for OAuth 2.0 sessions, NULL for compatibility sessions
        pub(super) oauth2_session_id: Option<Uuid>,
        // Set for OAuth 2.0 sessions, NULL for compatibility sessions
        pub(super) oauth2_client_id: Option<Uuid>,
        // Selected from both tables
        pub(super) user_session_id: Option<Uuid>,
        // Selected from both tables; required for a compatibility session,
        // optional for an OAuth 2.0 session
        pub(super) user_id: Option<Uuid>,
        // Set for OAuth 2.0 sessions, NULL for compatibility sessions
        pub(super) scope_list: Option<Vec<String>>,
        // Selected from the compatibility sessions, NULL for OAuth 2.0 sessions
        pub(super) device_id: Option<String>,
        // Selected from the compatibility sessions, NULL for OAuth 2.0 sessions
        pub(super) human_name: Option<String>,
        // Selected from both tables
        pub(super) created_at: DateTime<Utc>,
        // Selected from both tables; a NULL value means the session is still
        // valid
        pub(super) finished_at: Option<DateTime<Utc>>,
        // Set for compatibility sessions, NULL for OAuth 2.0 sessions
        pub(super) is_synapse_admin: Option<bool>,
        // Selected from both tables
        pub(super) user_agent: Option<String>,
        // Selected from both tables
        pub(super) last_active_at: Option<DateTime<Utc>>,
        // Selected from both tables
        pub(super) last_active_ip: Option<IpAddr>,
    }
}
81
82use priv_::{AppSessionLookup, AppSessionLookupIden};
83
84impl TryFrom<AppSessionLookup> for AppSession {
85    type Error = DatabaseError;
86
87    #[allow(clippy::too_many_lines)]
88    fn try_from(value: AppSessionLookup) -> Result<Self, Self::Error> {
89        // This is annoying to do, but we have to match on all the fields to determine
90        // whether it's a compat session or an oauth2 session
91        let AppSessionLookup {
92            cursor,
93            compat_session_id,
94            oauth2_session_id,
95            oauth2_client_id,
96            user_session_id,
97            user_id,
98            scope_list,
99            device_id,
100            human_name,
101            created_at,
102            finished_at,
103            is_synapse_admin,
104            user_agent,
105            last_active_at,
106            last_active_ip,
107        } = value;
108
109        let user_agent = user_agent.map(UserAgent::parse);
110        let user_session_id = user_session_id.map(Ulid::from);
111
112        match (
113            compat_session_id,
114            oauth2_session_id,
115            oauth2_client_id,
116            user_id,
117            scope_list,
118            device_id,
119            is_synapse_admin,
120        ) {
121            (
122                Some(compat_session_id),
123                None,
124                None,
125                Some(user_id),
126                None,
127                device_id_opt,
128                Some(is_synapse_admin),
129            ) => {
130                let id = compat_session_id.into();
131                let device = device_id_opt
132                    .map(Device::try_from)
133                    .transpose()
134                    .map_err(|e| {
135                        DatabaseInconsistencyError::on("compat_sessions")
136                            .column("device_id")
137                            .row(id)
138                            .source(e)
139                    })?;
140
141                let state = match finished_at {
142                    None => CompatSessionState::Valid,
143                    Some(finished_at) => CompatSessionState::Finished { finished_at },
144                };
145
146                let session = CompatSession {
147                    id,
148                    state,
149                    user_id: user_id.into(),
150                    device,
151                    human_name,
152                    user_session_id,
153                    created_at,
154                    is_synapse_admin,
155                    user_agent,
156                    last_active_at,
157                    last_active_ip,
158                };
159
160                Ok(AppSession::Compat(Box::new(session)))
161            }
162
163            (
164                None,
165                Some(oauth2_session_id),
166                Some(oauth2_client_id),
167                user_id,
168                Some(scope_list),
169                None,
170                None,
171            ) => {
172                let id = oauth2_session_id.into();
173                let scope: Result<Scope, _> =
174                    scope_list.iter().map(|s| s.parse::<ScopeToken>()).collect();
175                let scope = scope.map_err(|e| {
176                    DatabaseInconsistencyError::on("oauth2_sessions")
177                        .column("scope")
178                        .row(id)
179                        .source(e)
180                })?;
181
182                let state = match value.finished_at {
183                    None => SessionState::Valid,
184                    Some(finished_at) => SessionState::Finished { finished_at },
185                };
186
187                let session = Session {
188                    id,
189                    state,
190                    created_at,
191                    client_id: oauth2_client_id.into(),
192                    user_id: user_id.map(Ulid::from),
193                    user_session_id,
194                    scope,
195                    user_agent,
196                    last_active_at,
197                    last_active_ip,
198                };
199
200                Ok(AppSession::OAuth2(Box::new(session)))
201            }
202
203            _ => Err(DatabaseInconsistencyError::on("sessions")
204                .row(cursor.into())
205                .into()),
206        }
207    }
208}
209
210/// Split a [`AppSessionFilter`] into two separate filters: a
211/// [`CompatSessionFilter`] and an [`OAuth2SessionFilter`].
212fn split_filter(
213    filter: AppSessionFilter<'_>,
214) -> (CompatSessionFilter<'_>, OAuth2SessionFilter<'_>) {
215    let mut compat_filter = CompatSessionFilter::new();
216    let mut oauth2_filter = OAuth2SessionFilter::new();
217
218    if let Some(user) = filter.user() {
219        compat_filter = compat_filter.for_user(user);
220        oauth2_filter = oauth2_filter.for_user(user);
221    }
222
223    match filter.state() {
224        Some(AppSessionState::Active) => {
225            compat_filter = compat_filter.active_only();
226            oauth2_filter = oauth2_filter.active_only();
227        }
228        Some(AppSessionState::Finished) => {
229            compat_filter = compat_filter.finished_only();
230            oauth2_filter = oauth2_filter.finished_only();
231        }
232        None => {}
233    }
234
235    if let Some(device) = filter.device() {
236        compat_filter = compat_filter.for_device(device);
237        oauth2_filter = oauth2_filter.for_device(device);
238    }
239
240    if let Some(browser_session) = filter.browser_session() {
241        compat_filter = compat_filter.for_browser_session(browser_session);
242        oauth2_filter = oauth2_filter.for_browser_session(browser_session);
243    }
244
245    if let Some(last_active_before) = filter.last_active_before() {
246        compat_filter = compat_filter.with_last_active_before(last_active_before);
247        oauth2_filter = oauth2_filter.with_last_active_before(last_active_before);
248    }
249
250    if let Some(last_active_after) = filter.last_active_after() {
251        compat_filter = compat_filter.with_last_active_after(last_active_after);
252        oauth2_filter = oauth2_filter.with_last_active_after(last_active_after);
253    }
254
255    (compat_filter, oauth2_filter)
256}
257
258#[async_trait]
259impl AppSessionRepository for PgAppSessionRepository<'_> {
260    type Error = DatabaseError;
261
262    #[allow(clippy::too_many_lines)]
263    #[tracing::instrument(
264        name = "db.app_session.list",
265        fields(
266            db.query.text,
267        ),
268        skip_all,
269        err,
270    )]
271    async fn list(
272        &mut self,
273        filter: AppSessionFilter<'_>,
274        pagination: Pagination,
275    ) -> Result<Page<AppSession>, Self::Error> {
276        let (compat_filter, oauth2_filter) = split_filter(filter);
277
278        let mut oauth2_session_select = Query::select()
279            .expr_as(
280                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::OAuth2SessionId)),
281                AppSessionLookupIden::Cursor,
282            )
283            .expr_as(Expr::cust("NULL"), AppSessionLookupIden::CompatSessionId)
284            .expr_as(
285                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::OAuth2SessionId)),
286                AppSessionLookupIden::Oauth2SessionId,
287            )
288            .expr_as(
289                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::OAuth2ClientId)),
290                AppSessionLookupIden::Oauth2ClientId,
291            )
292            .expr_as(
293                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::UserSessionId)),
294                AppSessionLookupIden::UserSessionId,
295            )
296            .expr_as(
297                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::UserId)),
298                AppSessionLookupIden::UserId,
299            )
300            .expr_as(
301                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::ScopeList)),
302                AppSessionLookupIden::ScopeList,
303            )
304            .expr_as(Expr::cust("NULL"), AppSessionLookupIden::DeviceId)
305            .expr_as(Expr::cust("NULL"), AppSessionLookupIden::HumanName)
306            .expr_as(
307                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::CreatedAt)),
308                AppSessionLookupIden::CreatedAt,
309            )
310            .expr_as(
311                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::FinishedAt)),
312                AppSessionLookupIden::FinishedAt,
313            )
314            .expr_as(Expr::cust("NULL"), AppSessionLookupIden::IsSynapseAdmin)
315            .expr_as(
316                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::UserAgent)),
317                AppSessionLookupIden::UserAgent,
318            )
319            .expr_as(
320                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::LastActiveAt)),
321                AppSessionLookupIden::LastActiveAt,
322            )
323            .expr_as(
324                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::LastActiveIp)),
325                AppSessionLookupIden::LastActiveIp,
326            )
327            .from(OAuth2Sessions::Table)
328            .apply_filter(oauth2_filter)
329            .clone();
330
331        let compat_session_select = Query::select()
332            .expr_as(
333                Expr::col((CompatSessions::Table, CompatSessions::CompatSessionId)),
334                AppSessionLookupIden::Cursor,
335            )
336            .expr_as(
337                Expr::col((CompatSessions::Table, CompatSessions::CompatSessionId)),
338                AppSessionLookupIden::CompatSessionId,
339            )
340            .expr_as(Expr::cust("NULL"), AppSessionLookupIden::Oauth2SessionId)
341            .expr_as(Expr::cust("NULL"), AppSessionLookupIden::Oauth2ClientId)
342            .expr_as(
343                Expr::col((CompatSessions::Table, CompatSessions::UserSessionId)),
344                AppSessionLookupIden::UserSessionId,
345            )
346            .expr_as(
347                Expr::col((CompatSessions::Table, CompatSessions::UserId)),
348                AppSessionLookupIden::UserId,
349            )
350            .expr_as(Expr::cust("NULL"), AppSessionLookupIden::ScopeList)
351            .expr_as(
352                Expr::col((CompatSessions::Table, CompatSessions::DeviceId)),
353                AppSessionLookupIden::DeviceId,
354            )
355            .expr_as(
356                Expr::col((CompatSessions::Table, CompatSessions::HumanName)),
357                AppSessionLookupIden::HumanName,
358            )
359            .expr_as(
360                Expr::col((CompatSessions::Table, CompatSessions::CreatedAt)),
361                AppSessionLookupIden::CreatedAt,
362            )
363            .expr_as(
364                Expr::col((CompatSessions::Table, CompatSessions::FinishedAt)),
365                AppSessionLookupIden::FinishedAt,
366            )
367            .expr_as(
368                Expr::col((CompatSessions::Table, CompatSessions::IsSynapseAdmin)),
369                AppSessionLookupIden::IsSynapseAdmin,
370            )
371            .expr_as(
372                Expr::col((CompatSessions::Table, CompatSessions::UserAgent)),
373                AppSessionLookupIden::UserAgent,
374            )
375            .expr_as(
376                Expr::col((CompatSessions::Table, CompatSessions::LastActiveAt)),
377                AppSessionLookupIden::LastActiveAt,
378            )
379            .expr_as(
380                Expr::col((CompatSessions::Table, CompatSessions::LastActiveIp)),
381                AppSessionLookupIden::LastActiveIp,
382            )
383            .from(CompatSessions::Table)
384            .apply_filter(compat_filter)
385            .clone();
386
387        let common_table_expression = CommonTableExpression::new()
388            .query(
389                oauth2_session_select
390                    .union(UnionType::All, compat_session_select)
391                    .clone(),
392            )
393            .table_name(Alias::new("sessions"))
394            .clone();
395
396        let with_clause = Query::with().cte(common_table_expression).clone();
397
398        let select = Query::select()
399            .column(ColumnRef::Asterisk)
400            .from(Alias::new("sessions"))
401            .generate_pagination(AppSessionLookupIden::Cursor, pagination)
402            .clone();
403
404        let (sql, arguments) = with_clause.query(select).build_sqlx(PostgresQueryBuilder);
405
406        let edges: Vec<AppSessionLookup> = sqlx::query_as_with(&sql, arguments)
407            .traced()
408            .fetch_all(&mut *self.conn)
409            .await?;
410
411        let page = pagination.process(edges).try_map(TryFrom::try_from)?;
412
413        Ok(page)
414    }
415
416    #[tracing::instrument(
417        name = "db.app_session.count",
418        fields(
419            db.query.text,
420        ),
421        skip_all,
422        err,
423    )]
424    async fn count(&mut self, filter: AppSessionFilter<'_>) -> Result<usize, Self::Error> {
425        let (compat_filter, oauth2_filter) = split_filter(filter);
426        let mut oauth2_session_select = Query::select()
427            .expr(Expr::cust("1"))
428            .from(OAuth2Sessions::Table)
429            .apply_filter(oauth2_filter)
430            .clone();
431
432        let compat_session_select = Query::select()
433            .expr(Expr::cust("1"))
434            .from(CompatSessions::Table)
435            .apply_filter(compat_filter)
436            .clone();
437
438        let common_table_expression = CommonTableExpression::new()
439            .query(
440                oauth2_session_select
441                    .union(UnionType::All, compat_session_select)
442                    .clone(),
443            )
444            .table_name(Alias::new("sessions"))
445            .clone();
446
447        let with_clause = Query::with().cte(common_table_expression).clone();
448
449        let select = Query::select()
450            .expr(Expr::cust("COUNT(*)"))
451            .from(Alias::new("sessions"))
452            .clone();
453
454        let (sql, arguments) = with_clause.query(select).build_sqlx(PostgresQueryBuilder);
455
456        let count: i64 = sqlx::query_scalar_with(&sql, arguments)
457            .traced()
458            .fetch_one(&mut *self.conn)
459            .await?;
460
461        count
462            .try_into()
463            .map_err(DatabaseError::to_invalid_operation)
464    }
465
466    #[tracing::instrument(
467        name = "db.app_session.finish_sessions_to_replace_device",
468        fields(
469            db.query.text,
470            %user.id,
471            %device_id = device.as_str()
472        ),
473        skip_all,
474        err,
475    )]
476    async fn finish_sessions_to_replace_device(
477        &mut self,
478        clock: &dyn Clock,
479        user: &User,
480        device: &Device,
481    ) -> Result<(), Self::Error> {
482        // TODO need to invoke this from all the oauth2 login sites
483        let span = tracing::info_span!(
484            "db.app_session.finish_sessions_to_replace_device.compat_sessions",
485            { DB_QUERY_TEXT } = tracing::field::Empty,
486        );
487        let finished_at = clock.now();
488        sqlx::query!(
489            "
490                UPDATE compat_sessions SET finished_at = $3 WHERE user_id = $1 AND device_id = $2 AND finished_at IS NULL
491            ",
492            Uuid::from(user.id),
493            device.as_str(),
494            finished_at
495        )
496        .record(&span)
497        .execute(&mut *self.conn)
498        .instrument(span)
499        .await?;
500
501        if let Ok(device_as_scope_token) = device.to_scope_token() {
502            let span = tracing::info_span!(
503                "db.app_session.finish_sessions_to_replace_device.oauth2_sessions",
504                { DB_QUERY_TEXT } = tracing::field::Empty,
505            );
506            sqlx::query!(
507                "
508                    UPDATE oauth2_sessions SET finished_at = $3 WHERE user_id = $1 AND $2 = ANY(scope_list) AND finished_at IS NULL
509                ",
510                Uuid::from(user.id),
511                device_as_scope_token.as_str(),
512                finished_at
513            )
514            .record(&span)
515            .execute(&mut *self.conn)
516            .instrument(span)
517            .await?;
518        }
519
520        Ok(())
521    }
522}
523
524#[cfg(test)]
525mod tests {
526    use chrono::Duration;
527    use mas_data_model::Device;
528    use mas_storage::{
529        Pagination, RepositoryAccess,
530        app_session::{AppSession, AppSessionFilter},
531        clock::MockClock,
532        oauth2::OAuth2SessionRepository,
533    };
534    use oauth2_types::{
535        requests::GrantType,
536        scope::{OPENID, Scope},
537    };
538    use rand::SeedableRng;
539    use rand_chacha::ChaChaRng;
540    use sqlx::PgPool;
541
542    use crate::PgRepository;
543
544    #[sqlx::test(migrator = "crate::MIGRATOR")]
545    async fn test_app_repo(pool: PgPool) {
546        let mut rng = ChaChaRng::seed_from_u64(42);
547        let clock = MockClock::default();
548        let mut repo = PgRepository::from_pool(&pool).await.unwrap();
549
550        // Create a user
551        let user = repo
552            .user()
553            .add(&mut rng, &clock, "john".to_owned())
554            .await
555            .unwrap();
556
557        let all = AppSessionFilter::new().for_user(&user);
558        let active = all.active_only();
559        let finished = all.finished_only();
560        let pagination = Pagination::first(10);
561
562        assert_eq!(repo.app_session().count(all).await.unwrap(), 0);
563        assert_eq!(repo.app_session().count(active).await.unwrap(), 0);
564        assert_eq!(repo.app_session().count(finished).await.unwrap(), 0);
565
566        let full_list = repo.app_session().list(all, pagination).await.unwrap();
567        assert!(full_list.edges.is_empty());
568        let active_list = repo.app_session().list(active, pagination).await.unwrap();
569        assert!(active_list.edges.is_empty());
570        let finished_list = repo.app_session().list(finished, pagination).await.unwrap();
571        assert!(finished_list.edges.is_empty());
572
573        // Start a compat session for that user
574        let device = Device::generate(&mut rng);
575        let compat_session = repo
576            .compat_session()
577            .add(&mut rng, &clock, &user, device.clone(), None, false)
578            .await
579            .unwrap();
580
581        assert_eq!(repo.app_session().count(all).await.unwrap(), 1);
582        assert_eq!(repo.app_session().count(active).await.unwrap(), 1);
583        assert_eq!(repo.app_session().count(finished).await.unwrap(), 0);
584
585        let full_list = repo.app_session().list(all, pagination).await.unwrap();
586        assert_eq!(full_list.edges.len(), 1);
587        assert_eq!(
588            full_list.edges[0],
589            AppSession::Compat(Box::new(compat_session.clone()))
590        );
591        let active_list = repo.app_session().list(active, pagination).await.unwrap();
592        assert_eq!(active_list.edges.len(), 1);
593        assert_eq!(
594            active_list.edges[0],
595            AppSession::Compat(Box::new(compat_session.clone()))
596        );
597        let finished_list = repo.app_session().list(finished, pagination).await.unwrap();
598        assert!(finished_list.edges.is_empty());
599
600        // Finish the session
601        let compat_session = repo
602            .compat_session()
603            .finish(&clock, compat_session)
604            .await
605            .unwrap();
606
607        assert_eq!(repo.app_session().count(all).await.unwrap(), 1);
608        assert_eq!(repo.app_session().count(active).await.unwrap(), 0);
609        assert_eq!(repo.app_session().count(finished).await.unwrap(), 1);
610
611        let full_list = repo.app_session().list(all, pagination).await.unwrap();
612        assert_eq!(full_list.edges.len(), 1);
613        assert_eq!(
614            full_list.edges[0],
615            AppSession::Compat(Box::new(compat_session.clone()))
616        );
617        let active_list = repo.app_session().list(active, pagination).await.unwrap();
618        assert!(active_list.edges.is_empty());
619        let finished_list = repo.app_session().list(finished, pagination).await.unwrap();
620        assert_eq!(finished_list.edges.len(), 1);
621        assert_eq!(
622            finished_list.edges[0],
623            AppSession::Compat(Box::new(compat_session.clone()))
624        );
625
626        // Start an OAuth2 session
627        let client = repo
628            .oauth2_client()
629            .add(
630                &mut rng,
631                &clock,
632                vec!["https://example.com/redirect".parse().unwrap()],
633                None,
634                None,
635                None,
636                vec![GrantType::AuthorizationCode],
637                Some("First client".to_owned()),
638                Some("https://example.com/logo.png".parse().unwrap()),
639                Some("https://example.com/".parse().unwrap()),
640                Some("https://example.com/policy".parse().unwrap()),
641                Some("https://example.com/tos".parse().unwrap()),
642                Some("https://example.com/jwks.json".parse().unwrap()),
643                None,
644                None,
645                None,
646                None,
647                None,
648                Some("https://example.com/login".parse().unwrap()),
649            )
650            .await
651            .unwrap();
652
653        let device2 = Device::generate(&mut rng);
654        let scope = Scope::from_iter([OPENID, device2.to_scope_token().unwrap()]);
655
656        // We're moving the clock forward by 1 minute between each session to ensure
657        // we're getting consistent ordering in lists.
658        clock.advance(Duration::try_minutes(1).unwrap());
659
660        let oauth_session = repo
661            .oauth2_session()
662            .add(&mut rng, &clock, &client, Some(&user), None, scope)
663            .await
664            .unwrap();
665
666        assert_eq!(repo.app_session().count(all).await.unwrap(), 2);
667        assert_eq!(repo.app_session().count(active).await.unwrap(), 1);
668        assert_eq!(repo.app_session().count(finished).await.unwrap(), 1);
669
670        let full_list = repo.app_session().list(all, pagination).await.unwrap();
671        assert_eq!(full_list.edges.len(), 2);
672        assert_eq!(
673            full_list.edges[0],
674            AppSession::Compat(Box::new(compat_session.clone()))
675        );
676        assert_eq!(
677            full_list.edges[1],
678            AppSession::OAuth2(Box::new(oauth_session.clone()))
679        );
680
681        let active_list = repo.app_session().list(active, pagination).await.unwrap();
682        assert_eq!(active_list.edges.len(), 1);
683        assert_eq!(
684            active_list.edges[0],
685            AppSession::OAuth2(Box::new(oauth_session.clone()))
686        );
687
688        let finished_list = repo.app_session().list(finished, pagination).await.unwrap();
689        assert_eq!(finished_list.edges.len(), 1);
690        assert_eq!(
691            finished_list.edges[0],
692            AppSession::Compat(Box::new(compat_session.clone()))
693        );
694
695        // Finish the session
696        let oauth_session = repo
697            .oauth2_session()
698            .finish(&clock, oauth_session)
699            .await
700            .unwrap();
701
702        assert_eq!(repo.app_session().count(all).await.unwrap(), 2);
703        assert_eq!(repo.app_session().count(active).await.unwrap(), 0);
704        assert_eq!(repo.app_session().count(finished).await.unwrap(), 2);
705
706        let full_list = repo.app_session().list(all, pagination).await.unwrap();
707        assert_eq!(full_list.edges.len(), 2);
708        assert_eq!(
709            full_list.edges[0],
710            AppSession::Compat(Box::new(compat_session.clone()))
711        );
712        assert_eq!(
713            full_list.edges[1],
714            AppSession::OAuth2(Box::new(oauth_session.clone()))
715        );
716
717        let active_list = repo.app_session().list(active, pagination).await.unwrap();
718        assert!(active_list.edges.is_empty());
719
720        let finished_list = repo.app_session().list(finished, pagination).await.unwrap();
721        assert_eq!(finished_list.edges.len(), 2);
722        assert_eq!(
723            finished_list.edges[0],
724            AppSession::Compat(Box::new(compat_session.clone()))
725        );
726        assert_eq!(
727            full_list.edges[1],
728            AppSession::OAuth2(Box::new(oauth_session.clone()))
729        );
730
731        // Query by device
732        let filter = AppSessionFilter::new().for_device(&device);
733        assert_eq!(repo.app_session().count(filter).await.unwrap(), 1);
734        let list = repo.app_session().list(filter, pagination).await.unwrap();
735        assert_eq!(list.edges.len(), 1);
736        assert_eq!(
737            list.edges[0],
738            AppSession::Compat(Box::new(compat_session.clone()))
739        );
740
741        let filter = AppSessionFilter::new().for_device(&device2);
742        assert_eq!(repo.app_session().count(filter).await.unwrap(), 1);
743        let list = repo.app_session().list(filter, pagination).await.unwrap();
744        assert_eq!(list.edges.len(), 1);
745        assert_eq!(
746            list.edges[0],
747            AppSession::OAuth2(Box::new(oauth_session.clone()))
748        );
749
750        // Create a second user
751        let user2 = repo
752            .user()
753            .add(&mut rng, &clock, "alice".to_owned())
754            .await
755            .unwrap();
756
757        // If we list/count for this user, we should get nothing
758        let filter = AppSessionFilter::new().for_user(&user2);
759        assert_eq!(repo.app_session().count(filter).await.unwrap(), 0);
760        let list = repo.app_session().list(filter, pagination).await.unwrap();
761        assert!(list.edges.is_empty());
762    }
763}