1use async_trait::async_trait;
10use mas_data_model::{
11 Clock, CompatSession, CompatSessionState, Device, Session, SessionState, User,
12};
13use mas_storage::{
14 Page, Pagination,
15 app_session::{AppSession, AppSessionFilter, AppSessionRepository, AppSessionState},
16 compat::CompatSessionFilter,
17 oauth2::OAuth2SessionFilter,
18};
19use oauth2_types::scope::{Scope, ScopeToken};
20use opentelemetry_semantic_conventions::trace::DB_QUERY_TEXT;
21use sea_query::{
22 Alias, ColumnRef, CommonTableExpression, Expr, PostgresQueryBuilder, Query, UnionType,
23};
24use sea_query_binder::SqlxBinder;
25use sqlx::PgConnection;
26use tracing::Instrument;
27use ulid::Ulid;
28use uuid::Uuid;
29
30use crate::{
31 DatabaseError, ExecuteExt,
32 errors::DatabaseInconsistencyError,
33 filter::StatementExt,
34 iden::{CompatSessions, OAuth2Sessions},
35 pagination::QueryBuilderExt,
36};
37
38pub struct PgAppSessionRepository<'c> {
40 conn: &'c mut PgConnection,
41}
42
43impl<'c> PgAppSessionRepository<'c> {
44 pub fn new(conn: &'c mut PgConnection) -> Self {
47 Self { conn }
48 }
49}
50
51mod priv_ {
52 use std::net::IpAddr;
56
57 use chrono::{DateTime, Utc};
58 use mas_storage::pagination::Node;
59 use sea_query::enum_def;
60 use ulid::Ulid;
61 use uuid::Uuid;
62
63 #[derive(sqlx::FromRow)]
64 #[enum_def]
65 pub(super) struct AppSessionLookup {
66 pub(super) cursor: Uuid,
67 pub(super) compat_session_id: Option<Uuid>,
68 pub(super) oauth2_session_id: Option<Uuid>,
69 pub(super) oauth2_client_id: Option<Uuid>,
70 pub(super) user_session_id: Option<Uuid>,
71 pub(super) user_id: Option<Uuid>,
72 pub(super) scope_list: Option<Vec<String>>,
73 pub(super) device_id: Option<String>,
74 pub(super) human_name: Option<String>,
75 pub(super) created_at: DateTime<Utc>,
76 pub(super) finished_at: Option<DateTime<Utc>>,
77 pub(super) is_synapse_admin: Option<bool>,
78 pub(super) user_agent: Option<String>,
79 pub(super) last_active_at: Option<DateTime<Utc>>,
80 pub(super) last_active_ip: Option<IpAddr>,
81 }
82
83 impl Node<Ulid> for AppSessionLookup {
84 fn cursor(&self) -> Ulid {
85 self.cursor.into()
86 }
87 }
88}
89
90use priv_::{AppSessionLookup, AppSessionLookupIden};
91
92impl TryFrom<AppSessionLookup> for AppSession {
93 type Error = DatabaseError;
94
95 fn try_from(value: AppSessionLookup) -> Result<Self, Self::Error> {
96 let AppSessionLookup {
99 cursor,
100 compat_session_id,
101 oauth2_session_id,
102 oauth2_client_id,
103 user_session_id,
104 user_id,
105 scope_list,
106 device_id,
107 human_name,
108 created_at,
109 finished_at,
110 is_synapse_admin,
111 user_agent,
112 last_active_at,
113 last_active_ip,
114 } = value;
115
116 let user_session_id = user_session_id.map(Ulid::from);
117
118 match (
119 compat_session_id,
120 oauth2_session_id,
121 oauth2_client_id,
122 user_id,
123 scope_list,
124 device_id,
125 is_synapse_admin,
126 ) {
127 (
128 Some(compat_session_id),
129 None,
130 None,
131 Some(user_id),
132 None,
133 device_id_opt,
134 Some(is_synapse_admin),
135 ) => {
136 let id = compat_session_id.into();
137 let device = device_id_opt
138 .map(Device::try_from)
139 .transpose()
140 .map_err(|e| {
141 DatabaseInconsistencyError::on("compat_sessions")
142 .column("device_id")
143 .row(id)
144 .source(e)
145 })?;
146
147 let state = match finished_at {
148 None => CompatSessionState::Valid,
149 Some(finished_at) => CompatSessionState::Finished { finished_at },
150 };
151
152 let session = CompatSession {
153 id,
154 state,
155 user_id: user_id.into(),
156 device,
157 human_name,
158 user_session_id,
159 created_at,
160 is_synapse_admin,
161 user_agent,
162 last_active_at,
163 last_active_ip,
164 };
165
166 Ok(AppSession::Compat(Box::new(session)))
167 }
168
169 (
170 None,
171 Some(oauth2_session_id),
172 Some(oauth2_client_id),
173 user_id,
174 Some(scope_list),
175 None,
176 None,
177 ) => {
178 let id = oauth2_session_id.into();
179 let scope: Result<Scope, _> =
180 scope_list.iter().map(|s| s.parse::<ScopeToken>()).collect();
181 let scope = scope.map_err(|e| {
182 DatabaseInconsistencyError::on("oauth2_sessions")
183 .column("scope")
184 .row(id)
185 .source(e)
186 })?;
187
188 let state = match value.finished_at {
189 None => SessionState::Valid,
190 Some(finished_at) => SessionState::Finished { finished_at },
191 };
192
193 let session = Session {
194 id,
195 state,
196 created_at,
197 client_id: oauth2_client_id.into(),
198 user_id: user_id.map(Ulid::from),
199 user_session_id,
200 scope,
201 user_agent,
202 last_active_at,
203 last_active_ip,
204 human_name,
205 };
206
207 Ok(AppSession::OAuth2(Box::new(session)))
208 }
209
210 _ => Err(DatabaseInconsistencyError::on("sessions")
211 .row(cursor.into())
212 .into()),
213 }
214 }
215}
216
217fn split_filter(
220 filter: AppSessionFilter<'_>,
221) -> (CompatSessionFilter<'_>, OAuth2SessionFilter<'_>) {
222 let mut compat_filter = CompatSessionFilter::new();
223 let mut oauth2_filter = OAuth2SessionFilter::new();
224
225 if let Some(user) = filter.user() {
226 compat_filter = compat_filter.for_user(user);
227 oauth2_filter = oauth2_filter.for_user(user);
228 }
229
230 match filter.state() {
231 Some(AppSessionState::Active) => {
232 compat_filter = compat_filter.active_only();
233 oauth2_filter = oauth2_filter.active_only();
234 }
235 Some(AppSessionState::Finished) => {
236 compat_filter = compat_filter.finished_only();
237 oauth2_filter = oauth2_filter.finished_only();
238 }
239 None => {}
240 }
241
242 if let Some(device) = filter.device() {
243 compat_filter = compat_filter.for_device(device);
244 oauth2_filter = oauth2_filter.for_device(device);
245 }
246
247 if let Some(browser_session) = filter.browser_session() {
248 compat_filter = compat_filter.for_browser_session(browser_session);
249 oauth2_filter = oauth2_filter.for_browser_session(browser_session);
250 }
251
252 if let Some(last_active_before) = filter.last_active_before() {
253 compat_filter = compat_filter.with_last_active_before(last_active_before);
254 oauth2_filter = oauth2_filter.with_last_active_before(last_active_before);
255 }
256
257 if let Some(last_active_after) = filter.last_active_after() {
258 compat_filter = compat_filter.with_last_active_after(last_active_after);
259 oauth2_filter = oauth2_filter.with_last_active_after(last_active_after);
260 }
261
262 (compat_filter, oauth2_filter)
263}
264
265#[async_trait]
266impl AppSessionRepository for PgAppSessionRepository<'_> {
267 type Error = DatabaseError;
268
269 #[tracing::instrument(
270 name = "db.app_session.list",
271 fields(
272 db.query.text,
273 ),
274 skip_all,
275 err,
276 )]
277 async fn list(
278 &mut self,
279 filter: AppSessionFilter<'_>,
280 pagination: Pagination,
281 ) -> Result<Page<AppSession>, Self::Error> {
282 let (compat_filter, oauth2_filter) = split_filter(filter);
283
284 let mut oauth2_session_select = Query::select()
285 .expr_as(
286 Expr::col((OAuth2Sessions::Table, OAuth2Sessions::OAuth2SessionId)),
287 AppSessionLookupIden::Cursor,
288 )
289 .expr_as(Expr::cust("NULL"), AppSessionLookupIden::CompatSessionId)
290 .expr_as(
291 Expr::col((OAuth2Sessions::Table, OAuth2Sessions::OAuth2SessionId)),
292 AppSessionLookupIden::Oauth2SessionId,
293 )
294 .expr_as(
295 Expr::col((OAuth2Sessions::Table, OAuth2Sessions::OAuth2ClientId)),
296 AppSessionLookupIden::Oauth2ClientId,
297 )
298 .expr_as(
299 Expr::col((OAuth2Sessions::Table, OAuth2Sessions::UserSessionId)),
300 AppSessionLookupIden::UserSessionId,
301 )
302 .expr_as(
303 Expr::col((OAuth2Sessions::Table, OAuth2Sessions::UserId)),
304 AppSessionLookupIden::UserId,
305 )
306 .expr_as(
307 Expr::col((OAuth2Sessions::Table, OAuth2Sessions::ScopeList)),
308 AppSessionLookupIden::ScopeList,
309 )
310 .expr_as(Expr::cust("NULL"), AppSessionLookupIden::DeviceId)
311 .expr_as(
312 Expr::col((OAuth2Sessions::Table, OAuth2Sessions::HumanName)),
313 AppSessionLookupIden::HumanName,
314 )
315 .expr_as(
316 Expr::col((OAuth2Sessions::Table, OAuth2Sessions::CreatedAt)),
317 AppSessionLookupIden::CreatedAt,
318 )
319 .expr_as(
320 Expr::col((OAuth2Sessions::Table, OAuth2Sessions::FinishedAt)),
321 AppSessionLookupIden::FinishedAt,
322 )
323 .expr_as(Expr::cust("NULL"), AppSessionLookupIden::IsSynapseAdmin)
324 .expr_as(
325 Expr::col((OAuth2Sessions::Table, OAuth2Sessions::UserAgent)),
326 AppSessionLookupIden::UserAgent,
327 )
328 .expr_as(
329 Expr::col((OAuth2Sessions::Table, OAuth2Sessions::LastActiveAt)),
330 AppSessionLookupIden::LastActiveAt,
331 )
332 .expr_as(
333 Expr::col((OAuth2Sessions::Table, OAuth2Sessions::LastActiveIp)),
334 AppSessionLookupIden::LastActiveIp,
335 )
336 .from(OAuth2Sessions::Table)
337 .apply_filter(oauth2_filter)
338 .clone();
339
340 let compat_session_select = Query::select()
341 .expr_as(
342 Expr::col((CompatSessions::Table, CompatSessions::CompatSessionId)),
343 AppSessionLookupIden::Cursor,
344 )
345 .expr_as(
346 Expr::col((CompatSessions::Table, CompatSessions::CompatSessionId)),
347 AppSessionLookupIden::CompatSessionId,
348 )
349 .expr_as(Expr::cust("NULL"), AppSessionLookupIden::Oauth2SessionId)
350 .expr_as(Expr::cust("NULL"), AppSessionLookupIden::Oauth2ClientId)
351 .expr_as(
352 Expr::col((CompatSessions::Table, CompatSessions::UserSessionId)),
353 AppSessionLookupIden::UserSessionId,
354 )
355 .expr_as(
356 Expr::col((CompatSessions::Table, CompatSessions::UserId)),
357 AppSessionLookupIden::UserId,
358 )
359 .expr_as(Expr::cust("NULL"), AppSessionLookupIden::ScopeList)
360 .expr_as(
361 Expr::col((CompatSessions::Table, CompatSessions::DeviceId)),
362 AppSessionLookupIden::DeviceId,
363 )
364 .expr_as(
365 Expr::col((CompatSessions::Table, CompatSessions::HumanName)),
366 AppSessionLookupIden::HumanName,
367 )
368 .expr_as(
369 Expr::col((CompatSessions::Table, CompatSessions::CreatedAt)),
370 AppSessionLookupIden::CreatedAt,
371 )
372 .expr_as(
373 Expr::col((CompatSessions::Table, CompatSessions::FinishedAt)),
374 AppSessionLookupIden::FinishedAt,
375 )
376 .expr_as(
377 Expr::col((CompatSessions::Table, CompatSessions::IsSynapseAdmin)),
378 AppSessionLookupIden::IsSynapseAdmin,
379 )
380 .expr_as(
381 Expr::col((CompatSessions::Table, CompatSessions::UserAgent)),
382 AppSessionLookupIden::UserAgent,
383 )
384 .expr_as(
385 Expr::col((CompatSessions::Table, CompatSessions::LastActiveAt)),
386 AppSessionLookupIden::LastActiveAt,
387 )
388 .expr_as(
389 Expr::col((CompatSessions::Table, CompatSessions::LastActiveIp)),
390 AppSessionLookupIden::LastActiveIp,
391 )
392 .from(CompatSessions::Table)
393 .apply_filter(compat_filter)
394 .clone();
395
396 let common_table_expression = CommonTableExpression::new()
397 .query(
398 oauth2_session_select
399 .union(UnionType::All, compat_session_select)
400 .clone(),
401 )
402 .table_name(Alias::new("sessions"))
403 .clone();
404
405 let with_clause = Query::with().cte(common_table_expression).clone();
406
407 let select = Query::select()
408 .column(ColumnRef::Asterisk)
409 .from(Alias::new("sessions"))
410 .generate_pagination(AppSessionLookupIden::Cursor, pagination)
411 .clone();
412
413 let (sql, arguments) = with_clause.query(select).build_sqlx(PostgresQueryBuilder);
414
415 let edges: Vec<AppSessionLookup> = sqlx::query_as_with(&sql, arguments)
416 .traced()
417 .fetch_all(&mut *self.conn)
418 .await?;
419
420 let page = pagination.process(edges).try_map(TryFrom::try_from)?;
421
422 Ok(page)
423 }
424
425 #[tracing::instrument(
426 name = "db.app_session.count",
427 fields(
428 db.query.text,
429 ),
430 skip_all,
431 err,
432 )]
433 async fn count(&mut self, filter: AppSessionFilter<'_>) -> Result<usize, Self::Error> {
434 let (compat_filter, oauth2_filter) = split_filter(filter);
435 let mut oauth2_session_select = Query::select()
436 .expr(Expr::cust("1"))
437 .from(OAuth2Sessions::Table)
438 .apply_filter(oauth2_filter)
439 .clone();
440
441 let compat_session_select = Query::select()
442 .expr(Expr::cust("1"))
443 .from(CompatSessions::Table)
444 .apply_filter(compat_filter)
445 .clone();
446
447 let common_table_expression = CommonTableExpression::new()
448 .query(
449 oauth2_session_select
450 .union(UnionType::All, compat_session_select)
451 .clone(),
452 )
453 .table_name(Alias::new("sessions"))
454 .clone();
455
456 let with_clause = Query::with().cte(common_table_expression).clone();
457
458 let select = Query::select()
459 .expr(Expr::cust("COUNT(*)"))
460 .from(Alias::new("sessions"))
461 .clone();
462
463 let (sql, arguments) = with_clause.query(select).build_sqlx(PostgresQueryBuilder);
464
465 let count: i64 = sqlx::query_scalar_with(&sql, arguments)
466 .traced()
467 .fetch_one(&mut *self.conn)
468 .await?;
469
470 count
471 .try_into()
472 .map_err(DatabaseError::to_invalid_operation)
473 }
474
475 #[tracing::instrument(
476 name = "db.app_session.finish_sessions_to_replace_device",
477 fields(
478 db.query.text,
479 %user.id,
480 %device_id = device.as_str()
481 ),
482 skip_all,
483 err,
484 )]
485 async fn finish_sessions_to_replace_device(
486 &mut self,
487 clock: &dyn Clock,
488 user: &User,
489 device: &Device,
490 ) -> Result<bool, Self::Error> {
491 let mut affected = false;
492 let span = tracing::info_span!(
494 "db.app_session.finish_sessions_to_replace_device.compat_sessions",
495 { DB_QUERY_TEXT } = tracing::field::Empty,
496 );
497 let finished_at = clock.now();
498 let compat_affected = sqlx::query!(
499 "
500 UPDATE compat_sessions SET finished_at = $3 WHERE user_id = $1 AND device_id = $2 AND finished_at IS NULL
501 ",
502 Uuid::from(user.id),
503 device.as_str(),
504 finished_at
505 )
506 .record(&span)
507 .execute(&mut *self.conn)
508 .instrument(span)
509 .await?
510 .rows_affected();
511 affected |= compat_affected > 0;
512
513 if let Ok([stable_device_as_scope_token, unstable_device_as_scope_token]) =
514 device.to_scope_token()
515 {
516 let span = tracing::info_span!(
517 "db.app_session.finish_sessions_to_replace_device.oauth2_sessions",
518 { DB_QUERY_TEXT } = tracing::field::Empty,
519 );
520 let oauth2_affected = sqlx::query!(
521 "
522 UPDATE oauth2_sessions
523 SET finished_at = $4
524 WHERE user_id = $1
525 AND ($2 = ANY(scope_list) OR $3 = ANY(scope_list))
526 AND finished_at IS NULL
527 ",
528 Uuid::from(user.id),
529 stable_device_as_scope_token.as_str(),
530 unstable_device_as_scope_token.as_str(),
531 finished_at
532 )
533 .record(&span)
534 .execute(&mut *self.conn)
535 .instrument(span)
536 .await?
537 .rows_affected();
538 affected |= oauth2_affected > 0;
539 }
540
541 Ok(affected)
542 }
543}
544
#[cfg(test)]
mod tests {
    use chrono::Duration;
    use mas_data_model::{Device, clock::MockClock};
    use mas_storage::{
        Pagination, RepositoryAccess,
        app_session::{AppSession, AppSessionFilter},
        oauth2::OAuth2SessionRepository,
    };
    use oauth2_types::{
        requests::GrantType,
        scope::{OPENID, Scope},
    };
    use rand::SeedableRng;
    use rand_chacha::ChaChaRng;
    use sqlx::PgPool;

    use crate::PgRepository;

    /// Walks through the lifecycle of one compatibility session and one OAuth
    /// 2.0 session, checking after each step that `count` and `list` agree
    /// for the "all", "active only" and "finished only" filters, then checks
    /// filtering by device and by user.
    #[sqlx::test(migrator = "crate::MIGRATOR")]
    async fn test_app_repo(pool: PgPool) {
        let mut rng = ChaChaRng::seed_from_u64(42);
        let clock = MockClock::default();
        let mut repo = PgRepository::from_pool(&pool).await.unwrap();

        // Create the user owning the sessions
        let user = repo
            .user()
            .add(&mut rng, &clock, "john".to_owned())
            .await
            .unwrap();

        let all = AppSessionFilter::new().for_user(&user);
        let active = all.active_only();
        let finished = all.finished_only();
        let pagination = Pagination::first(10);

        // Nothing exists yet
        assert_eq!(repo.app_session().count(all).await.unwrap(), 0);
        assert_eq!(repo.app_session().count(active).await.unwrap(), 0);
        assert_eq!(repo.app_session().count(finished).await.unwrap(), 0);

        let full_list = repo.app_session().list(all, pagination).await.unwrap();
        assert!(full_list.edges.is_empty());
        let active_list = repo.app_session().list(active, pagination).await.unwrap();
        assert!(active_list.edges.is_empty());
        let finished_list = repo.app_session().list(finished, pagination).await.unwrap();
        assert!(finished_list.edges.is_empty());

        // Start a compatibility session for that user
        let device = Device::generate(&mut rng);
        let compat_session = repo
            .compat_session()
            .add(&mut rng, &clock, &user, device.clone(), None, false, None)
            .await
            .unwrap();

        assert_eq!(repo.app_session().count(all).await.unwrap(), 1);
        assert_eq!(repo.app_session().count(active).await.unwrap(), 1);
        assert_eq!(repo.app_session().count(finished).await.unwrap(), 0);

        let full_list = repo.app_session().list(all, pagination).await.unwrap();
        assert_eq!(full_list.edges.len(), 1);
        assert_eq!(
            full_list.edges[0].node,
            AppSession::Compat(Box::new(compat_session.clone()))
        );
        let active_list = repo.app_session().list(active, pagination).await.unwrap();
        assert_eq!(active_list.edges.len(), 1);
        assert_eq!(
            active_list.edges[0].node,
            AppSession::Compat(Box::new(compat_session.clone()))
        );
        let finished_list = repo.app_session().list(finished, pagination).await.unwrap();
        assert!(finished_list.edges.is_empty());

        // Finish the compatibility session
        let compat_session = repo
            .compat_session()
            .finish(&clock, compat_session)
            .await
            .unwrap();

        assert_eq!(repo.app_session().count(all).await.unwrap(), 1);
        assert_eq!(repo.app_session().count(active).await.unwrap(), 0);
        assert_eq!(repo.app_session().count(finished).await.unwrap(), 1);

        let full_list = repo.app_session().list(all, pagination).await.unwrap();
        assert_eq!(full_list.edges.len(), 1);
        assert_eq!(
            full_list.edges[0].node,
            AppSession::Compat(Box::new(compat_session.clone()))
        );
        let active_list = repo.app_session().list(active, pagination).await.unwrap();
        assert!(active_list.edges.is_empty());
        let finished_list = repo.app_session().list(finished, pagination).await.unwrap();
        assert_eq!(finished_list.edges.len(), 1);
        assert_eq!(
            finished_list.edges[0].node,
            AppSession::Compat(Box::new(compat_session.clone()))
        );

        // Register an OAuth 2.0 client, needed to start an OAuth 2.0 session
        let client = repo
            .oauth2_client()
            .add(
                &mut rng,
                &clock,
                vec!["https://example.com/redirect".parse().unwrap()],
                None,
                None,
                None,
                vec![GrantType::AuthorizationCode],
                Some("First client".to_owned()),
                Some("https://example.com/logo.png".parse().unwrap()),
                Some("https://example.com/".parse().unwrap()),
                Some("https://example.com/policy".parse().unwrap()),
                Some("https://example.com/tos".parse().unwrap()),
                Some("https://example.com/jwks.json".parse().unwrap()),
                None,
                None,
                None,
                None,
                None,
                Some("https://example.com/login".parse().unwrap()),
            )
            .await
            .unwrap();

        // The OAuth 2.0 session is bound to a second device through its scope
        let device2 = Device::generate(&mut rng);
        let scope: Scope = [OPENID]
            .into_iter()
            .chain(device2.to_scope_token().unwrap().into_iter())
            .collect();

        // Move the clock forward before creating the second session; the
        // assertions below expect the compat session to be listed first.
        clock.advance(Duration::try_minutes(1).unwrap());

        let oauth_session = repo
            .oauth2_session()
            .add(&mut rng, &clock, &client, Some(&user), None, scope)
            .await
            .unwrap();

        assert_eq!(repo.app_session().count(all).await.unwrap(), 2);
        assert_eq!(repo.app_session().count(active).await.unwrap(), 1);
        assert_eq!(repo.app_session().count(finished).await.unwrap(), 1);

        let full_list = repo.app_session().list(all, pagination).await.unwrap();
        assert_eq!(full_list.edges.len(), 2);
        assert_eq!(
            full_list.edges[0].node,
            AppSession::Compat(Box::new(compat_session.clone()))
        );
        assert_eq!(
            full_list.edges[1].node,
            AppSession::OAuth2(Box::new(oauth_session.clone()))
        );

        let active_list = repo.app_session().list(active, pagination).await.unwrap();
        assert_eq!(active_list.edges.len(), 1);
        assert_eq!(
            active_list.edges[0].node,
            AppSession::OAuth2(Box::new(oauth_session.clone()))
        );

        let finished_list = repo.app_session().list(finished, pagination).await.unwrap();
        assert_eq!(finished_list.edges.len(), 1);
        assert_eq!(
            finished_list.edges[0].node,
            AppSession::Compat(Box::new(compat_session.clone()))
        );

        // Finish the OAuth 2.0 session
        let oauth_session = repo
            .oauth2_session()
            .finish(&clock, oauth_session)
            .await
            .unwrap();

        assert_eq!(repo.app_session().count(all).await.unwrap(), 2);
        assert_eq!(repo.app_session().count(active).await.unwrap(), 0);
        assert_eq!(repo.app_session().count(finished).await.unwrap(), 2);

        let full_list = repo.app_session().list(all, pagination).await.unwrap();
        assert_eq!(full_list.edges.len(), 2);
        assert_eq!(
            full_list.edges[0].node,
            AppSession::Compat(Box::new(compat_session.clone()))
        );
        assert_eq!(
            full_list.edges[1].node,
            AppSession::OAuth2(Box::new(oauth_session.clone()))
        );

        let active_list = repo.app_session().list(active, pagination).await.unwrap();
        assert!(active_list.edges.is_empty());

        let finished_list = repo.app_session().list(finished, pagination).await.unwrap();
        assert_eq!(finished_list.edges.len(), 2);
        assert_eq!(
            finished_list.edges[0].node,
            AppSession::Compat(Box::new(compat_session.clone()))
        );
        // Check the second entry of the *finished* list, not of the full list
        assert_eq!(
            finished_list.edges[1].node,
            AppSession::OAuth2(Box::new(oauth_session.clone()))
        );

        // Filtering by the first device only yields the compat session
        let filter = AppSessionFilter::new().for_device(&device);
        assert_eq!(repo.app_session().count(filter).await.unwrap(), 1);
        let list = repo.app_session().list(filter, pagination).await.unwrap();
        assert_eq!(list.edges.len(), 1);
        assert_eq!(
            list.edges[0].node,
            AppSession::Compat(Box::new(compat_session.clone()))
        );

        // Filtering by the second device only yields the OAuth 2.0 session
        let filter = AppSessionFilter::new().for_device(&device2);
        assert_eq!(repo.app_session().count(filter).await.unwrap(), 1);
        let list = repo.app_session().list(filter, pagination).await.unwrap();
        assert_eq!(list.edges.len(), 1);
        assert_eq!(
            list.edges[0].node,
            AppSession::OAuth2(Box::new(oauth_session.clone()))
        );

        // Create a second user, which has no session at all
        let user2 = repo
            .user()
            .add(&mut rng, &clock, "alice".to_owned())
            .await
            .unwrap();

        // Listing or counting for that user must yield nothing
        let filter = AppSessionFilter::new().for_user(&user2);
        assert_eq!(repo.app_session().count(filter).await.unwrap(), 0);
        let list = repo.app_session().list(filter, pagination).await.unwrap();
        assert!(list.edges.is_empty());
    }
}