mas_storage_pg/
app_session.rs

1// Copyright 2024, 2025 New Vector Ltd.
2// Copyright 2023, 2024 The Matrix.org Foundation C.I.C.
3//
4// SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
5// Please see LICENSE files in the repository root for full details.
6
7//! A module containing PostgreSQL implementation of repositories for sessions
8
9use async_trait::async_trait;
10use mas_data_model::{
11    Clock, CompatSession, CompatSessionState, Device, Session, SessionState, User,
12};
13use mas_storage::{
14    Page, Pagination,
15    app_session::{AppSession, AppSessionFilter, AppSessionRepository, AppSessionState},
16    compat::CompatSessionFilter,
17    oauth2::OAuth2SessionFilter,
18};
19use oauth2_types::scope::{Scope, ScopeToken};
20use opentelemetry_semantic_conventions::trace::DB_QUERY_TEXT;
21use sea_query::{
22    Alias, ColumnRef, CommonTableExpression, Expr, PostgresQueryBuilder, Query, UnionType,
23};
24use sea_query_binder::SqlxBinder;
25use sqlx::PgConnection;
26use tracing::Instrument;
27use ulid::Ulid;
28use uuid::Uuid;
29
30use crate::{
31    DatabaseError, ExecuteExt,
32    errors::DatabaseInconsistencyError,
33    filter::StatementExt,
34    iden::{CompatSessions, OAuth2Sessions},
35    pagination::QueryBuilderExt,
36};
37
/// An implementation of [`AppSessionRepository`] for a PostgreSQL connection
///
/// The repository mutably borrows the connection for the lifetime `'c` and
/// runs every one of its queries on it.
pub struct PgAppSessionRepository<'c> {
    // The borrowed connection on which all the queries are executed
    conn: &'c mut PgConnection,
}
42
43impl<'c> PgAppSessionRepository<'c> {
44    /// Create a new [`PgAppSessionRepository`] from an active PostgreSQL
45    /// connection
46    pub fn new(conn: &'c mut PgConnection) -> Self {
47        Self { conn }
48    }
49}
50
mod priv_ {
    // The enum_def macro generates a public enum, which we don't want, because it
    // triggers the missing docs warning

    use std::net::IpAddr;

    use chrono::{DateTime, Utc};
    use mas_storage::pagination::Node;
    use sea_query::enum_def;
    use ulid::Ulid;
    use uuid::Uuid;

    // A single row of the unified `sessions` query, which merges the rows of the
    // `oauth2_sessions` and `compat_sessions` tables. Columns which only exist
    // for one kind of session are selected as `NULL` for the other kind, hence
    // the many `Option`s.
    //
    // `#[enum_def]` generates the `AppSessionLookupIden` enum, used to alias the
    // selected columns when building the query.
    #[derive(sqlx::FromRow)]
    #[enum_def]
    pub(super) struct AppSessionLookup {
        // ID of the underlying session, whichever its kind; used as the
        // pagination cursor
        pub(super) cursor: Uuid,
        // Only set on compatibility sessions
        pub(super) compat_session_id: Option<Uuid>,
        // Only set on OAuth 2.0 sessions
        pub(super) oauth2_session_id: Option<Uuid>,
        // Only set on OAuth 2.0 sessions
        pub(super) oauth2_client_id: Option<Uuid>,
        pub(super) user_session_id: Option<Uuid>,
        // Required on compatibility sessions, optional on OAuth 2.0 sessions
        pub(super) user_id: Option<Uuid>,
        // Only set on OAuth 2.0 sessions
        pub(super) scope_list: Option<Vec<String>>,
        // Only selected for compatibility sessions, where it may still be NULL
        pub(super) device_id: Option<String>,
        pub(super) human_name: Option<String>,
        pub(super) created_at: DateTime<Utc>,
        // `None` as long as the session is still valid
        pub(super) finished_at: Option<DateTime<Utc>>,
        // Only set on compatibility sessions
        pub(super) is_synapse_admin: Option<bool>,
        pub(super) user_agent: Option<String>,
        pub(super) last_active_at: Option<DateTime<Utc>>,
        pub(super) last_active_ip: Option<IpAddr>,
    }

    // Lets the pagination code extract the cursor of each fetched row
    impl Node<Ulid> for AppSessionLookup {
        fn cursor(&self) -> Ulid {
            self.cursor.into()
        }
    }
}
89
90use priv_::{AppSessionLookup, AppSessionLookupIden};
91
92impl TryFrom<AppSessionLookup> for AppSession {
93    type Error = DatabaseError;
94
95    fn try_from(value: AppSessionLookup) -> Result<Self, Self::Error> {
96        // This is annoying to do, but we have to match on all the fields to determine
97        // whether it's a compat session or an oauth2 session
98        let AppSessionLookup {
99            cursor,
100            compat_session_id,
101            oauth2_session_id,
102            oauth2_client_id,
103            user_session_id,
104            user_id,
105            scope_list,
106            device_id,
107            human_name,
108            created_at,
109            finished_at,
110            is_synapse_admin,
111            user_agent,
112            last_active_at,
113            last_active_ip,
114        } = value;
115
116        let user_session_id = user_session_id.map(Ulid::from);
117
118        match (
119            compat_session_id,
120            oauth2_session_id,
121            oauth2_client_id,
122            user_id,
123            scope_list,
124            device_id,
125            is_synapse_admin,
126        ) {
127            (
128                Some(compat_session_id),
129                None,
130                None,
131                Some(user_id),
132                None,
133                device_id_opt,
134                Some(is_synapse_admin),
135            ) => {
136                let id = compat_session_id.into();
137                let device = device_id_opt
138                    .map(Device::try_from)
139                    .transpose()
140                    .map_err(|e| {
141                        DatabaseInconsistencyError::on("compat_sessions")
142                            .column("device_id")
143                            .row(id)
144                            .source(e)
145                    })?;
146
147                let state = match finished_at {
148                    None => CompatSessionState::Valid,
149                    Some(finished_at) => CompatSessionState::Finished { finished_at },
150                };
151
152                let session = CompatSession {
153                    id,
154                    state,
155                    user_id: user_id.into(),
156                    device,
157                    human_name,
158                    user_session_id,
159                    created_at,
160                    is_synapse_admin,
161                    user_agent,
162                    last_active_at,
163                    last_active_ip,
164                };
165
166                Ok(AppSession::Compat(Box::new(session)))
167            }
168
169            (
170                None,
171                Some(oauth2_session_id),
172                Some(oauth2_client_id),
173                user_id,
174                Some(scope_list),
175                None,
176                None,
177            ) => {
178                let id = oauth2_session_id.into();
179                let scope: Result<Scope, _> =
180                    scope_list.iter().map(|s| s.parse::<ScopeToken>()).collect();
181                let scope = scope.map_err(|e| {
182                    DatabaseInconsistencyError::on("oauth2_sessions")
183                        .column("scope")
184                        .row(id)
185                        .source(e)
186                })?;
187
188                let state = match value.finished_at {
189                    None => SessionState::Valid,
190                    Some(finished_at) => SessionState::Finished { finished_at },
191                };
192
193                let session = Session {
194                    id,
195                    state,
196                    created_at,
197                    client_id: oauth2_client_id.into(),
198                    user_id: user_id.map(Ulid::from),
199                    user_session_id,
200                    scope,
201                    user_agent,
202                    last_active_at,
203                    last_active_ip,
204                    human_name,
205                };
206
207                Ok(AppSession::OAuth2(Box::new(session)))
208            }
209
210            _ => Err(DatabaseInconsistencyError::on("sessions")
211                .row(cursor.into())
212                .into()),
213        }
214    }
215}
216
217/// Split a [`AppSessionFilter`] into two separate filters: a
218/// [`CompatSessionFilter`] and an [`OAuth2SessionFilter`].
219fn split_filter(
220    filter: AppSessionFilter<'_>,
221) -> (CompatSessionFilter<'_>, OAuth2SessionFilter<'_>) {
222    let mut compat_filter = CompatSessionFilter::new();
223    let mut oauth2_filter = OAuth2SessionFilter::new();
224
225    if let Some(user) = filter.user() {
226        compat_filter = compat_filter.for_user(user);
227        oauth2_filter = oauth2_filter.for_user(user);
228    }
229
230    match filter.state() {
231        Some(AppSessionState::Active) => {
232            compat_filter = compat_filter.active_only();
233            oauth2_filter = oauth2_filter.active_only();
234        }
235        Some(AppSessionState::Finished) => {
236            compat_filter = compat_filter.finished_only();
237            oauth2_filter = oauth2_filter.finished_only();
238        }
239        None => {}
240    }
241
242    if let Some(device) = filter.device() {
243        compat_filter = compat_filter.for_device(device);
244        oauth2_filter = oauth2_filter.for_device(device);
245    }
246
247    if let Some(browser_session) = filter.browser_session() {
248        compat_filter = compat_filter.for_browser_session(browser_session);
249        oauth2_filter = oauth2_filter.for_browser_session(browser_session);
250    }
251
252    if let Some(last_active_before) = filter.last_active_before() {
253        compat_filter = compat_filter.with_last_active_before(last_active_before);
254        oauth2_filter = oauth2_filter.with_last_active_before(last_active_before);
255    }
256
257    if let Some(last_active_after) = filter.last_active_after() {
258        compat_filter = compat_filter.with_last_active_after(last_active_after);
259        oauth2_filter = oauth2_filter.with_last_active_after(last_active_after);
260    }
261
262    (compat_filter, oauth2_filter)
263}
264
265#[async_trait]
266impl AppSessionRepository for PgAppSessionRepository<'_> {
267    type Error = DatabaseError;
268
    /// List the sessions matching the filter, across both the OAuth 2.0 and
    /// the compatibility session tables.
    ///
    /// A `SELECT` is built for each table, both projecting the same columns in
    /// the same order (aliased after [`AppSessionLookupIden`]), with `NULL` in
    /// place of the columns a kind of session does not have. They are merged
    /// with a `UNION ALL` in a `sessions` common table expression, which is
    /// then paginated on its cursor column.
    ///
    /// # Errors
    ///
    /// Returns an error if the query fails, or if a fetched row cannot be
    /// converted to an [`AppSession`].
    #[tracing::instrument(
        name = "db.app_session.list",
        fields(
            db.query.text,
        ),
        skip_all,
        err,
    )]
    async fn list(
        &mut self,
        filter: AppSessionFilter<'_>,
        pagination: Pagination,
    ) -> Result<Page<AppSession>, Self::Error> {
        let (compat_filter, oauth2_filter) = split_filter(filter);

        // OAuth 2.0 side of the union. The column order must stay in sync with
        // the compatibility session `SELECT` below, as a `UNION` matches the
        // columns by position.
        let mut oauth2_session_select = Query::select()
            .expr_as(
                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::OAuth2SessionId)),
                AppSessionLookupIden::Cursor,
            )
            .expr_as(Expr::cust("NULL"), AppSessionLookupIden::CompatSessionId)
            .expr_as(
                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::OAuth2SessionId)),
                AppSessionLookupIden::Oauth2SessionId,
            )
            .expr_as(
                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::OAuth2ClientId)),
                AppSessionLookupIden::Oauth2ClientId,
            )
            .expr_as(
                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::UserSessionId)),
                AppSessionLookupIden::UserSessionId,
            )
            .expr_as(
                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::UserId)),
                AppSessionLookupIden::UserId,
            )
            .expr_as(
                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::ScopeList)),
                AppSessionLookupIden::ScopeList,
            )
            .expr_as(Expr::cust("NULL"), AppSessionLookupIden::DeviceId)
            .expr_as(
                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::HumanName)),
                AppSessionLookupIden::HumanName,
            )
            .expr_as(
                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::CreatedAt)),
                AppSessionLookupIden::CreatedAt,
            )
            .expr_as(
                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::FinishedAt)),
                AppSessionLookupIden::FinishedAt,
            )
            .expr_as(Expr::cust("NULL"), AppSessionLookupIden::IsSynapseAdmin)
            .expr_as(
                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::UserAgent)),
                AppSessionLookupIden::UserAgent,
            )
            .expr_as(
                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::LastActiveAt)),
                AppSessionLookupIden::LastActiveAt,
            )
            .expr_as(
                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::LastActiveIp)),
                AppSessionLookupIden::LastActiveIp,
            )
            .from(OAuth2Sessions::Table)
            .apply_filter(oauth2_filter)
            .clone();

        // Compatibility session side of the union, with the same columns in
        // the same order as the OAuth 2.0 `SELECT` above
        let compat_session_select = Query::select()
            .expr_as(
                Expr::col((CompatSessions::Table, CompatSessions::CompatSessionId)),
                AppSessionLookupIden::Cursor,
            )
            .expr_as(
                Expr::col((CompatSessions::Table, CompatSessions::CompatSessionId)),
                AppSessionLookupIden::CompatSessionId,
            )
            .expr_as(Expr::cust("NULL"), AppSessionLookupIden::Oauth2SessionId)
            .expr_as(Expr::cust("NULL"), AppSessionLookupIden::Oauth2ClientId)
            .expr_as(
                Expr::col((CompatSessions::Table, CompatSessions::UserSessionId)),
                AppSessionLookupIden::UserSessionId,
            )
            .expr_as(
                Expr::col((CompatSessions::Table, CompatSessions::UserId)),
                AppSessionLookupIden::UserId,
            )
            .expr_as(Expr::cust("NULL"), AppSessionLookupIden::ScopeList)
            .expr_as(
                Expr::col((CompatSessions::Table, CompatSessions::DeviceId)),
                AppSessionLookupIden::DeviceId,
            )
            .expr_as(
                Expr::col((CompatSessions::Table, CompatSessions::HumanName)),
                AppSessionLookupIden::HumanName,
            )
            .expr_as(
                Expr::col((CompatSessions::Table, CompatSessions::CreatedAt)),
                AppSessionLookupIden::CreatedAt,
            )
            .expr_as(
                Expr::col((CompatSessions::Table, CompatSessions::FinishedAt)),
                AppSessionLookupIden::FinishedAt,
            )
            .expr_as(
                Expr::col((CompatSessions::Table, CompatSessions::IsSynapseAdmin)),
                AppSessionLookupIden::IsSynapseAdmin,
            )
            .expr_as(
                Expr::col((CompatSessions::Table, CompatSessions::UserAgent)),
                AppSessionLookupIden::UserAgent,
            )
            .expr_as(
                Expr::col((CompatSessions::Table, CompatSessions::LastActiveAt)),
                AppSessionLookupIden::LastActiveAt,
            )
            .expr_as(
                Expr::col((CompatSessions::Table, CompatSessions::LastActiveIp)),
                AppSessionLookupIden::LastActiveIp,
            )
            .from(CompatSessions::Table)
            .apply_filter(compat_filter)
            .clone();

        // Expose the union of both selects as a CTE named `sessions`
        let common_table_expression = CommonTableExpression::new()
            .query(
                oauth2_session_select
                    .union(UnionType::All, compat_session_select)
                    .clone(),
            )
            .table_name(Alias::new("sessions"))
            .clone();

        let with_clause = Query::with().cte(common_table_expression).clone();

        // The pagination is applied on the merged rows, using the session ID
        // as the cursor
        let select = Query::select()
            .column(ColumnRef::Asterisk)
            .from(Alias::new("sessions"))
            .generate_pagination(AppSessionLookupIden::Cursor, pagination)
            .clone();

        let (sql, arguments) = with_clause.query(select).build_sqlx(PostgresQueryBuilder);

        let edges: Vec<AppSessionLookup> = sqlx::query_as_with(&sql, arguments)
            .traced()
            .fetch_all(&mut *self.conn)
            .await?;

        // Build the page out of the raw rows, then convert each of them to an
        // `AppSession`, stopping at the first row which fails to convert
        let page = pagination.process(edges).try_map(TryFrom::try_from)?;

        Ok(page)
    }
424
425    #[tracing::instrument(
426        name = "db.app_session.count",
427        fields(
428            db.query.text,
429        ),
430        skip_all,
431        err,
432    )]
433    async fn count(&mut self, filter: AppSessionFilter<'_>) -> Result<usize, Self::Error> {
434        let (compat_filter, oauth2_filter) = split_filter(filter);
435        let mut oauth2_session_select = Query::select()
436            .expr(Expr::cust("1"))
437            .from(OAuth2Sessions::Table)
438            .apply_filter(oauth2_filter)
439            .clone();
440
441        let compat_session_select = Query::select()
442            .expr(Expr::cust("1"))
443            .from(CompatSessions::Table)
444            .apply_filter(compat_filter)
445            .clone();
446
447        let common_table_expression = CommonTableExpression::new()
448            .query(
449                oauth2_session_select
450                    .union(UnionType::All, compat_session_select)
451                    .clone(),
452            )
453            .table_name(Alias::new("sessions"))
454            .clone();
455
456        let with_clause = Query::with().cte(common_table_expression).clone();
457
458        let select = Query::select()
459            .expr(Expr::cust("COUNT(*)"))
460            .from(Alias::new("sessions"))
461            .clone();
462
463        let (sql, arguments) = with_clause.query(select).build_sqlx(PostgresQueryBuilder);
464
465        let count: i64 = sqlx::query_scalar_with(&sql, arguments)
466            .traced()
467            .fetch_one(&mut *self.conn)
468            .await?;
469
470        count
471            .try_into()
472            .map_err(DatabaseError::to_invalid_operation)
473    }
474
475    #[tracing::instrument(
476        name = "db.app_session.finish_sessions_to_replace_device",
477        fields(
478            db.query.text,
479            %user.id,
480            %device_id = device.as_str()
481        ),
482        skip_all,
483        err,
484    )]
485    async fn finish_sessions_to_replace_device(
486        &mut self,
487        clock: &dyn Clock,
488        user: &User,
489        device: &Device,
490    ) -> Result<bool, Self::Error> {
491        let mut affected = false;
492        // TODO need to invoke this from all the oauth2 login sites
493        let span = tracing::info_span!(
494            "db.app_session.finish_sessions_to_replace_device.compat_sessions",
495            { DB_QUERY_TEXT } = tracing::field::Empty,
496        );
497        let finished_at = clock.now();
498        let compat_affected = sqlx::query!(
499            "
500                UPDATE compat_sessions SET finished_at = $3 WHERE user_id = $1 AND device_id = $2 AND finished_at IS NULL
501            ",
502            Uuid::from(user.id),
503            device.as_str(),
504            finished_at
505        )
506        .record(&span)
507        .execute(&mut *self.conn)
508        .instrument(span)
509        .await?
510        .rows_affected();
511        affected |= compat_affected > 0;
512
513        if let Ok([stable_device_as_scope_token, unstable_device_as_scope_token]) =
514            device.to_scope_token()
515        {
516            let span = tracing::info_span!(
517                "db.app_session.finish_sessions_to_replace_device.oauth2_sessions",
518                { DB_QUERY_TEXT } = tracing::field::Empty,
519            );
520            let oauth2_affected = sqlx::query!(
521                "
522                    UPDATE oauth2_sessions
523                    SET finished_at = $4
524                    WHERE user_id = $1
525                      AND ($2 = ANY(scope_list) OR $3 = ANY(scope_list))
526                      AND finished_at IS NULL
527                ",
528                Uuid::from(user.id),
529                stable_device_as_scope_token.as_str(),
530                unstable_device_as_scope_token.as_str(),
531                finished_at
532            )
533            .record(&span)
534            .execute(&mut *self.conn)
535            .instrument(span)
536            .await?
537            .rows_affected();
538            affected |= oauth2_affected > 0;
539        }
540
541        Ok(affected)
542    }
543}
544
545#[cfg(test)]
546mod tests {
547    use chrono::Duration;
548    use mas_data_model::{Device, clock::MockClock};
549    use mas_storage::{
550        Pagination, RepositoryAccess,
551        app_session::{AppSession, AppSessionFilter},
552        oauth2::OAuth2SessionRepository,
553    };
554    use oauth2_types::{
555        requests::GrantType,
556        scope::{OPENID, Scope},
557    };
558    use rand::SeedableRng;
559    use rand_chacha::ChaChaRng;
560    use sqlx::PgPool;
561
562    use crate::PgRepository;
563
564    #[sqlx::test(migrator = "crate::MIGRATOR")]
565    async fn test_app_repo(pool: PgPool) {
566        let mut rng = ChaChaRng::seed_from_u64(42);
567        let clock = MockClock::default();
568        let mut repo = PgRepository::from_pool(&pool).await.unwrap();
569
570        // Create a user
571        let user = repo
572            .user()
573            .add(&mut rng, &clock, "john".to_owned())
574            .await
575            .unwrap();
576
577        let all = AppSessionFilter::new().for_user(&user);
578        let active = all.active_only();
579        let finished = all.finished_only();
580        let pagination = Pagination::first(10);
581
582        assert_eq!(repo.app_session().count(all).await.unwrap(), 0);
583        assert_eq!(repo.app_session().count(active).await.unwrap(), 0);
584        assert_eq!(repo.app_session().count(finished).await.unwrap(), 0);
585
586        let full_list = repo.app_session().list(all, pagination).await.unwrap();
587        assert!(full_list.edges.is_empty());
588        let active_list = repo.app_session().list(active, pagination).await.unwrap();
589        assert!(active_list.edges.is_empty());
590        let finished_list = repo.app_session().list(finished, pagination).await.unwrap();
591        assert!(finished_list.edges.is_empty());
592
593        // Start a compat session for that user
594        let device = Device::generate(&mut rng);
595        let compat_session = repo
596            .compat_session()
597            .add(&mut rng, &clock, &user, device.clone(), None, false, None)
598            .await
599            .unwrap();
600
601        assert_eq!(repo.app_session().count(all).await.unwrap(), 1);
602        assert_eq!(repo.app_session().count(active).await.unwrap(), 1);
603        assert_eq!(repo.app_session().count(finished).await.unwrap(), 0);
604
605        let full_list = repo.app_session().list(all, pagination).await.unwrap();
606        assert_eq!(full_list.edges.len(), 1);
607        assert_eq!(
608            full_list.edges[0].node,
609            AppSession::Compat(Box::new(compat_session.clone()))
610        );
611        let active_list = repo.app_session().list(active, pagination).await.unwrap();
612        assert_eq!(active_list.edges.len(), 1);
613        assert_eq!(
614            active_list.edges[0].node,
615            AppSession::Compat(Box::new(compat_session.clone()))
616        );
617        let finished_list = repo.app_session().list(finished, pagination).await.unwrap();
618        assert!(finished_list.edges.is_empty());
619
620        // Finish the session
621        let compat_session = repo
622            .compat_session()
623            .finish(&clock, compat_session)
624            .await
625            .unwrap();
626
627        assert_eq!(repo.app_session().count(all).await.unwrap(), 1);
628        assert_eq!(repo.app_session().count(active).await.unwrap(), 0);
629        assert_eq!(repo.app_session().count(finished).await.unwrap(), 1);
630
631        let full_list = repo.app_session().list(all, pagination).await.unwrap();
632        assert_eq!(full_list.edges.len(), 1);
633        assert_eq!(
634            full_list.edges[0].node,
635            AppSession::Compat(Box::new(compat_session.clone()))
636        );
637        let active_list = repo.app_session().list(active, pagination).await.unwrap();
638        assert!(active_list.edges.is_empty());
639        let finished_list = repo.app_session().list(finished, pagination).await.unwrap();
640        assert_eq!(finished_list.edges.len(), 1);
641        assert_eq!(
642            finished_list.edges[0].node,
643            AppSession::Compat(Box::new(compat_session.clone()))
644        );
645
646        // Start an OAuth2 session
647        let client = repo
648            .oauth2_client()
649            .add(
650                &mut rng,
651                &clock,
652                vec!["https://example.com/redirect".parse().unwrap()],
653                None,
654                None,
655                None,
656                vec![GrantType::AuthorizationCode],
657                Some("First client".to_owned()),
658                Some("https://example.com/logo.png".parse().unwrap()),
659                Some("https://example.com/".parse().unwrap()),
660                Some("https://example.com/policy".parse().unwrap()),
661                Some("https://example.com/tos".parse().unwrap()),
662                Some("https://example.com/jwks.json".parse().unwrap()),
663                None,
664                None,
665                None,
666                None,
667                None,
668                Some("https://example.com/login".parse().unwrap()),
669            )
670            .await
671            .unwrap();
672
673        let device2 = Device::generate(&mut rng);
674        let scope: Scope = [OPENID]
675            .into_iter()
676            .chain(device2.to_scope_token().unwrap().into_iter())
677            .collect();
678
679        // We're moving the clock forward by 1 minute between each session to ensure
680        // we're getting consistent ordering in lists.
681        clock.advance(Duration::try_minutes(1).unwrap());
682
683        let oauth_session = repo
684            .oauth2_session()
685            .add(&mut rng, &clock, &client, Some(&user), None, scope)
686            .await
687            .unwrap();
688
689        assert_eq!(repo.app_session().count(all).await.unwrap(), 2);
690        assert_eq!(repo.app_session().count(active).await.unwrap(), 1);
691        assert_eq!(repo.app_session().count(finished).await.unwrap(), 1);
692
693        let full_list = repo.app_session().list(all, pagination).await.unwrap();
694        assert_eq!(full_list.edges.len(), 2);
695        assert_eq!(
696            full_list.edges[0].node,
697            AppSession::Compat(Box::new(compat_session.clone()))
698        );
699        assert_eq!(
700            full_list.edges[1].node,
701            AppSession::OAuth2(Box::new(oauth_session.clone()))
702        );
703
704        let active_list = repo.app_session().list(active, pagination).await.unwrap();
705        assert_eq!(active_list.edges.len(), 1);
706        assert_eq!(
707            active_list.edges[0].node,
708            AppSession::OAuth2(Box::new(oauth_session.clone()))
709        );
710
711        let finished_list = repo.app_session().list(finished, pagination).await.unwrap();
712        assert_eq!(finished_list.edges.len(), 1);
713        assert_eq!(
714            finished_list.edges[0].node,
715            AppSession::Compat(Box::new(compat_session.clone()))
716        );
717
718        // Finish the session
719        let oauth_session = repo
720            .oauth2_session()
721            .finish(&clock, oauth_session)
722            .await
723            .unwrap();
724
725        assert_eq!(repo.app_session().count(all).await.unwrap(), 2);
726        assert_eq!(repo.app_session().count(active).await.unwrap(), 0);
727        assert_eq!(repo.app_session().count(finished).await.unwrap(), 2);
728
729        let full_list = repo.app_session().list(all, pagination).await.unwrap();
730        assert_eq!(full_list.edges.len(), 2);
731        assert_eq!(
732            full_list.edges[0].node,
733            AppSession::Compat(Box::new(compat_session.clone()))
734        );
735        assert_eq!(
736            full_list.edges[1].node,
737            AppSession::OAuth2(Box::new(oauth_session.clone()))
738        );
739
740        let active_list = repo.app_session().list(active, pagination).await.unwrap();
741        assert!(active_list.edges.is_empty());
742
743        let finished_list = repo.app_session().list(finished, pagination).await.unwrap();
744        assert_eq!(finished_list.edges.len(), 2);
745        assert_eq!(
746            finished_list.edges[0].node,
747            AppSession::Compat(Box::new(compat_session.clone()))
748        );
749        assert_eq!(
750            full_list.edges[1].node,
751            AppSession::OAuth2(Box::new(oauth_session.clone()))
752        );
753
754        // Query by device
755        let filter = AppSessionFilter::new().for_device(&device);
756        assert_eq!(repo.app_session().count(filter).await.unwrap(), 1);
757        let list = repo.app_session().list(filter, pagination).await.unwrap();
758        assert_eq!(list.edges.len(), 1);
759        assert_eq!(
760            list.edges[0].node,
761            AppSession::Compat(Box::new(compat_session.clone()))
762        );
763
764        let filter = AppSessionFilter::new().for_device(&device2);
765        assert_eq!(repo.app_session().count(filter).await.unwrap(), 1);
766        let list = repo.app_session().list(filter, pagination).await.unwrap();
767        assert_eq!(list.edges.len(), 1);
768        assert_eq!(
769            list.edges[0].node,
770            AppSession::OAuth2(Box::new(oauth_session.clone()))
771        );
772
773        // Create a second user
774        let user2 = repo
775            .user()
776            .add(&mut rng, &clock, "alice".to_owned())
777            .await
778            .unwrap();
779
780        // If we list/count for this user, we should get nothing
781        let filter = AppSessionFilter::new().for_user(&user2);
782        assert_eq!(repo.app_session().count(filter).await.unwrap(), 0);
783        let list = repo.app_session().list(filter, pagination).await.unwrap();
784        assert!(list.edges.is_empty());
785    }
786}