mas_handlers/graphql/mutations/
user_email.rs

1// Copyright 2024 New Vector Ltd.
2// Copyright 2023, 2024 The Matrix.org Foundation C.I.C.
3//
4// SPDX-License-Identifier: AGPL-3.0-only
5// Please see LICENSE in the repository root for full details.
6
7use anyhow::Context as _;
8use async_graphql::{Context, Description, Enum, ID, InputObject, Object};
9use mas_i18n::DataLocale;
10use mas_storage::{
11    RepositoryAccess,
12    queue::{ProvisionUserJob, QueueJobRepositoryExt as _, SendEmailAuthenticationCodeJob},
13    user::{UserEmailFilter, UserEmailRepository, UserRepository},
14};
15
16use super::verify_password_if_needed;
17use crate::graphql::{
18    model::{NodeType, User, UserEmail, UserEmailAuthentication},
19    state::ContextExt,
20};
21
/// Marker type on which the user email GraphQL mutations are implemented.
///
/// It carries no data: its only field is a private unit value.
#[derive(Default)]
pub struct UserEmailMutations {
    // Private zero-sized field: code outside this module cannot build the
    // struct with a literal and has to go through `Default`.
    _private: (),
}
26
// Arguments of the deprecated `add_email` mutation.
//
// The `///` comments on this type and its fields are turned into GraphQL
// schema descriptions by async-graphql, so editing them changes the published
// schema.
/// The input for the `addEmail` mutation
#[derive(InputObject)]
struct AddEmailInput {
    /// The email address to add
    email: String,

    /// The ID of the user to add the email address to
    user_id: ID,

    // `add_email` reads this flag into an unused binding: it is accepted but
    // currently has no effect.
    /// Skip the email address verification. Only allowed for admins.
    skip_verification: Option<bool>,

    // When absent, `add_email` treats this as `false` and runs the policy check.
    /// Skip the email address policy check. Only allowed for admins.
    skip_policy_check: Option<bool>,
}
42
// GraphQL enum returned by `AddEmailPayload::status`; each variant mirrors the
// payload variant of the same name. The `///` comments are schema
// descriptions.
/// The status of the `addEmail` mutation
#[derive(Enum, Copy, Clone, Eq, PartialEq)]
pub enum AddEmailStatus {
    /// The email address was added
    Added,
    /// The email address already exists
    Exists,
    /// The email address is invalid
    Invalid,
    /// The email address is not allowed by the policy
    Denied,
}
55
// Outcome of `add_email`. The GraphQL fields are exposed by the `#[Object]`
// impl on this type; the `///` comment below is the schema description.
/// The payload of the `addEmail` mutation
#[derive(Description)]
enum AddEmailPayload {
    // A new email record was created for the user
    Added(mas_data_model::UserEmail),
    // The user already had this email; carries the existing record
    Exists(mas_data_model::UserEmail),
    // The address did not parse as a `lettre::Address`
    Invalid,
    // The policy evaluation did not consider the address valid
    Denied {
        // Violations reported by the policy; only their messages are exposed
        violations: Vec<mas_policy::Violation>,
    },
}
66
67#[Object(use_type_description)]
68impl AddEmailPayload {
69    /// Status of the operation
70    async fn status(&self) -> AddEmailStatus {
71        match self {
72            AddEmailPayload::Added(_) => AddEmailStatus::Added,
73            AddEmailPayload::Exists(_) => AddEmailStatus::Exists,
74            AddEmailPayload::Invalid => AddEmailStatus::Invalid,
75            AddEmailPayload::Denied { .. } => AddEmailStatus::Denied,
76        }
77    }
78
79    /// The email address that was added
80    async fn email(&self) -> Option<UserEmail> {
81        match self {
82            AddEmailPayload::Added(email) | AddEmailPayload::Exists(email) => {
83                Some(UserEmail(email.clone()))
84            }
85            AddEmailPayload::Invalid | AddEmailPayload::Denied { .. } => None,
86        }
87    }
88
89    /// The user to whom the email address was added
90    async fn user(&self, ctx: &Context<'_>) -> Result<Option<User>, async_graphql::Error> {
91        let state = ctx.state();
92        let mut repo = state.repository().await?;
93
94        let user_id = match self {
95            AddEmailPayload::Added(email) | AddEmailPayload::Exists(email) => email.user_id,
96            AddEmailPayload::Invalid | AddEmailPayload::Denied { .. } => return Ok(None),
97        };
98
99        let user = repo
100            .user()
101            .lookup(user_id)
102            .await?
103            .context("User not found")?;
104
105        Ok(Some(User(user)))
106    }
107
108    /// The list of policy violations if the email address was denied
109    async fn violations(&self) -> Option<Vec<String>> {
110        let AddEmailPayload::Denied { violations } = self else {
111            return None;
112        };
113
114        let messages = violations.iter().map(|v| v.msg.clone()).collect();
115        Some(messages)
116    }
117}
118
// Arguments of the `remove_email` mutation. The `///` comments are turned
// into GraphQL schema descriptions by async-graphql.
/// The input for the `removeEmail` mutation
#[derive(InputObject)]
struct RemoveEmailInput {
    /// The ID of the email address to remove
    user_email_id: ID,

    // Handed to `verify_password_if_needed` by `remove_email`
    /// The user's current password. This is required if the user is not an
    /// admin and it has a password on its account.
    password: Option<String>,
}
129
// GraphQL enum returned by `RemoveEmailPayload::status`; each variant mirrors
// the payload variant of the same name. The `///` comments are schema
// descriptions.
/// The status of the `removeEmail` mutation
#[derive(Enum, Copy, Clone, Eq, PartialEq)]
enum RemoveEmailStatus {
    /// The email address was removed
    Removed,

    /// The email address was not found
    NotFound,

    /// The password provided is incorrect
    IncorrectPassword,
}
142
// Outcome of `remove_email`. The `///` comment below is the schema
// description of the payload type.
/// The payload of the `removeEmail` mutation
#[derive(Description)]
enum RemoveEmailPayload {
    // The email was removed; carries the record as it was before removal
    Removed(mas_data_model::UserEmail),
    // No such email, or the requester is neither its owner nor an admin
    NotFound,
    // `verify_password_if_needed` rejected the supplied password
    IncorrectPassword,
}
150
151#[Object(use_type_description)]
152impl RemoveEmailPayload {
153    /// Status of the operation
154    async fn status(&self) -> RemoveEmailStatus {
155        match self {
156            RemoveEmailPayload::Removed(_) => RemoveEmailStatus::Removed,
157            RemoveEmailPayload::NotFound => RemoveEmailStatus::NotFound,
158            RemoveEmailPayload::IncorrectPassword => RemoveEmailStatus::IncorrectPassword,
159        }
160    }
161
162    /// The email address that was removed
163    async fn email(&self) -> Option<UserEmail> {
164        match self {
165            RemoveEmailPayload::Removed(email) => Some(UserEmail(email.clone())),
166            RemoveEmailPayload::NotFound | RemoveEmailPayload::IncorrectPassword => None,
167        }
168    }
169
170    /// The user to whom the email address belonged
171    async fn user(&self, ctx: &Context<'_>) -> Result<Option<User>, async_graphql::Error> {
172        let state = ctx.state();
173
174        let user_id = match self {
175            RemoveEmailPayload::Removed(email) => email.user_id,
176            RemoveEmailPayload::NotFound | RemoveEmailPayload::IncorrectPassword => {
177                return Ok(None);
178            }
179        };
180
181        let mut repo = state.repository().await?;
182
183        let user = repo
184            .user()
185            .lookup(user_id)
186            .await?
187            .context("User not found")?;
188
189        Ok(Some(User(user)))
190    }
191}
192
// Arguments of the deprecated `set_primary_email` mutation. The `///`
// comments are turned into GraphQL schema descriptions by async-graphql.
/// The input for the `setPrimaryEmail` mutation
#[derive(InputObject)]
struct SetPrimaryEmailInput {
    /// The ID of the email address to set as primary
    user_email_id: ID,
}
199
// GraphQL enum returned by `SetPrimaryEmailPayload::status`. The `///`
// comments are schema descriptions.
/// The status of the `setPrimaryEmail` mutation
#[derive(Enum, Copy, Clone, Eq, PartialEq)]
enum SetPrimaryEmailStatus {
    /// The email address was set as primary
    Set,
    /// The email address was not found
    NotFound,
    // `SetPrimaryEmailPayload` has no matching variant, so the status resolver
    // never produces this value.
    /// Can't make an unverified email address primary
    Unverified,
}
210
// Outcome of `set_primary_email`. The `///` comment below is the schema
// description of the payload type.
/// The payload of the `setPrimaryEmail` mutation
#[derive(Description)]
enum SetPrimaryEmailPayload {
    // Carries the user who owns the email address
    Set(mas_data_model::User),
    // No email address with the given ID exists
    NotFound,
}
217
218#[Object(use_type_description)]
219impl SetPrimaryEmailPayload {
220    async fn status(&self) -> SetPrimaryEmailStatus {
221        match self {
222            SetPrimaryEmailPayload::Set(_) => SetPrimaryEmailStatus::Set,
223            SetPrimaryEmailPayload::NotFound => SetPrimaryEmailStatus::NotFound,
224        }
225    }
226
227    /// The user to whom the email address belongs
228    async fn user(&self) -> Option<User> {
229        match self {
230            SetPrimaryEmailPayload::Set(user) => Some(User(user.clone())),
231            SetPrimaryEmailPayload::NotFound => None,
232        }
233    }
234}
235
// Arguments of the `start_email_authentication` mutation. The `///` comments
// are turned into GraphQL schema descriptions by async-graphql.
/// The input for the `startEmailAuthentication` mutation
#[derive(InputObject)]
struct StartEmailAuthenticationInput {
    /// The email address to add to the account
    email: String,

    // Handed to `verify_password_if_needed` by the mutation
    /// The user's current password. This is required if the user has a password
    /// on its account.
    password: Option<String>,

    // The mutation parses this as a `DataLocale` and fails if that is not
    // possible; it defaults to "en" when the field is omitted.
    /// The language to use for the email
    #[graphql(default = "en")]
    language: String,
}
250
// GraphQL enum returned by `StartEmailAuthenticationPayload::status`; each
// variant mirrors the payload variant of the same name. The `///` comments
// are schema descriptions.
/// The status of the `startEmailAuthentication` mutation
#[derive(Enum, Copy, Clone, Eq, PartialEq)]
enum StartEmailAuthenticationStatus {
    // i.e. a new email authentication session was created
    /// The email address was started
    Started,
    /// The email address is invalid
    InvalidEmailAddress,
    /// Too many attempts to start an email authentication
    RateLimited,
    /// The email address isn't allowed by the policy
    Denied,
    /// The email address is already in use on this account
    InUse,
    /// The password provided is incorrect
    IncorrectPassword,
}
267
// Outcome of `start_email_authentication`. The `///` comment below is the
// schema description of the payload type.
/// The payload of the `startEmailAuthentication` mutation
#[derive(Description)]
enum StartEmailAuthenticationPayload {
    // A new authentication session was created and a code email was queued
    Started(UserEmailAuthentication),
    // The address did not parse as a `lettre::Address`
    InvalidEmailAddress,
    // The rate limiter rejected the request
    RateLimited,
    // The policy evaluation did not consider the address valid
    Denied {
        // Violations reported by the policy; only their messages are exposed
        violations: Vec<mas_policy::Violation>,
    },
    // The requesting user already has this email address
    InUse,
    // `verify_password_if_needed` rejected the supplied password
    IncorrectPassword,
}
280
281#[Object(use_type_description)]
282impl StartEmailAuthenticationPayload {
283    /// Status of the operation
284    async fn status(&self) -> StartEmailAuthenticationStatus {
285        match self {
286            Self::Started(_) => StartEmailAuthenticationStatus::Started,
287            Self::InvalidEmailAddress => StartEmailAuthenticationStatus::InvalidEmailAddress,
288            Self::RateLimited => StartEmailAuthenticationStatus::RateLimited,
289            Self::Denied { .. } => StartEmailAuthenticationStatus::Denied,
290            Self::InUse => StartEmailAuthenticationStatus::InUse,
291            Self::IncorrectPassword => StartEmailAuthenticationStatus::IncorrectPassword,
292        }
293    }
294
295    /// The email authentication session that was started
296    async fn authentication(&self) -> Option<&UserEmailAuthentication> {
297        match self {
298            Self::Started(authentication) => Some(authentication),
299            Self::InvalidEmailAddress
300            | Self::RateLimited
301            | Self::Denied { .. }
302            | Self::InUse
303            | Self::IncorrectPassword => None,
304        }
305    }
306
307    /// The list of policy violations if the email address was denied
308    async fn violations(&self) -> Option<Vec<String>> {
309        let Self::Denied { violations } = self else {
310            return None;
311        };
312
313        let messages = violations.iter().map(|v| v.msg.clone()).collect();
314        Some(messages)
315    }
316}
317
// Arguments of the `complete_email_authentication` mutation. The `///`
// comments are turned into GraphQL schema descriptions by async-graphql.
/// The input for the `completeEmailAuthentication` mutation
#[derive(InputObject)]
struct CompleteEmailAuthenticationInput {
    /// The authentication code to use
    code: String,

    // Decoded by the mutation as a `UserEmailAuthentication` node ID
    /// The ID of the authentication session to complete
    id: ID,
}
327
// Outcome of `complete_email_authentication`. The `///` comment below is the
// schema description of the payload type.
/// The payload of the `completeEmailAuthentication` mutation
#[derive(Description)]
enum CompleteEmailAuthenticationPayload {
    // The code matched and the email address was added to the user
    Completed,
    // No matching code; also returned when the authentication session does
    // not exist or belongs to another browser session
    InvalidCode,
    // The code was found but its expiry time has passed
    CodeExpired,
    // The email address is already attached to a user; the authentication is
    // still marked as completed so the code cannot be reused
    InUse,
    // The rate limiter rejected the attempt
    RateLimited,
}
337
// GraphQL enum returned by `CompleteEmailAuthenticationPayload::status`; each
// variant mirrors the payload variant of the same name. The `///` comments
// are schema descriptions.
/// The status of the `completeEmailAuthentication` mutation
#[derive(Enum, Copy, Clone, Eq, PartialEq)]
enum CompleteEmailAuthenticationStatus {
    /// The authentication was completed
    Completed,
    /// The authentication code is invalid
    InvalidCode,
    /// The authentication code has expired
    CodeExpired,
    /// Too many attempts to complete an email authentication
    RateLimited,
    /// The email address is already in use
    InUse,
}
352
353#[Object(use_type_description)]
354impl CompleteEmailAuthenticationPayload {
355    /// Status of the operation
356    async fn status(&self) -> CompleteEmailAuthenticationStatus {
357        match self {
358            Self::Completed => CompleteEmailAuthenticationStatus::Completed,
359            Self::InvalidCode => CompleteEmailAuthenticationStatus::InvalidCode,
360            Self::CodeExpired => CompleteEmailAuthenticationStatus::CodeExpired,
361            Self::InUse => CompleteEmailAuthenticationStatus::InUse,
362            Self::RateLimited => CompleteEmailAuthenticationStatus::RateLimited,
363        }
364    }
365}
366
// Arguments of the `resend_email_authentication_code` mutation. The `///`
// comments are turned into GraphQL schema descriptions by async-graphql.
/// The input for the `resendEmailAuthenticationCode` mutation
#[derive(InputObject)]
struct ResendEmailAuthenticationCodeInput {
    // Decoded by the mutation as a `UserEmailAuthentication` node ID
    /// The ID of the authentication session to resend the code for
    id: ID,

    // The mutation parses this as a `DataLocale` and fails if that is not
    // possible; it defaults to "en" when the field is omitted.
    /// The language to use for the email
    #[graphql(default = "en")]
    language: String,
}
377
// Outcome of `resend_email_authentication_code`.
/// The payload of the `resendEmailAuthenticationCode` mutation
#[derive(Description)]
enum ResendEmailAuthenticationCodePayload {
    /// The email was resent
    Resent,
    // Also returned by the mutation when no authentication session with the
    // given ID exists
    /// The email authentication session is already completed
    Completed,
    /// Too many attempts to resend an email authentication code
    RateLimited,
}
388
// GraphQL enum returned by `ResendEmailAuthenticationCodePayload::status`;
// each variant mirrors the payload variant of the same name. The `///`
// comments are schema descriptions.
/// The status of the `resendEmailAuthenticationCode` mutation
#[derive(Enum, Copy, Clone, Eq, PartialEq)]
enum ResendEmailAuthenticationCodeStatus {
    /// The email was resent
    Resent,
    /// The email authentication session is already completed
    Completed,
    /// Too many attempts to resend an email authentication code
    RateLimited,
}
399
400#[Object(use_type_description)]
401impl ResendEmailAuthenticationCodePayload {
402    /// Status of the operation
403    async fn status(&self) -> ResendEmailAuthenticationCodeStatus {
404        match self {
405            Self::Resent => ResendEmailAuthenticationCodeStatus::Resent,
406            Self::Completed => ResendEmailAuthenticationCodeStatus::Completed,
407            Self::RateLimited => ResendEmailAuthenticationCodeStatus::RateLimited,
408        }
409    }
410}
411
412#[Object]
413impl UserEmailMutations {
414    /// Add an email address to the specified user
415    #[graphql(deprecation = "Use `startEmailAuthentication` instead.")]
416    async fn add_email(
417        &self,
418        ctx: &Context<'_>,
419        input: AddEmailInput,
420    ) -> Result<AddEmailPayload, async_graphql::Error> {
421        let state = ctx.state();
422        let id = NodeType::User.extract_ulid(&input.user_id)?;
423        let requester = ctx.requester();
424        let clock = state.clock();
425        let mut rng = state.rng();
426
427        // Only allow admin to call this mutation
428        if !requester.is_admin() {
429            return Err(async_graphql::Error::new("Unauthorized"));
430        }
431
432        let _skip_verification = input.skip_verification.unwrap_or(false);
433        let skip_policy_check = input.skip_policy_check.unwrap_or(false);
434
435        let mut repo = state.repository().await?;
436
437        let user = repo
438            .user()
439            .lookup(id)
440            .await?
441            .context("Failed to load user")?;
442
443        // Validate the email address
444        if input.email.parse::<lettre::Address>().is_err() {
445            return Ok(AddEmailPayload::Invalid);
446        }
447
448        if !skip_policy_check {
449            let mut policy = state.policy().await?;
450            let res = policy
451                .evaluate_email(mas_policy::EmailInput {
452                    email: &input.email,
453                    requester: requester.for_policy(),
454                })
455                .await?;
456            if !res.valid() {
457                return Ok(AddEmailPayload::Denied {
458                    violations: res.violations,
459                });
460            }
461        }
462
463        // Find an existing email address
464        let existing_user_email = repo.user_email().find(&user, &input.email).await?;
465        let (added, user_email) = if let Some(user_email) = existing_user_email {
466            (false, user_email)
467        } else {
468            let user_email = repo
469                .user_email()
470                .add(&mut rng, &clock, &user, input.email)
471                .await?;
472
473            (true, user_email)
474        };
475
476        repo.save().await?;
477
478        let payload = if added {
479            AddEmailPayload::Added(user_email)
480        } else {
481            AddEmailPayload::Exists(user_email)
482        };
483        Ok(payload)
484    }
485
486    /// Remove an email address
487    async fn remove_email(
488        &self,
489        ctx: &Context<'_>,
490        input: RemoveEmailInput,
491    ) -> Result<RemoveEmailPayload, async_graphql::Error> {
492        let state = ctx.state();
493        let user_email_id = NodeType::UserEmail.extract_ulid(&input.user_email_id)?;
494        let requester = ctx.requester();
495
496        let mut rng = state.rng();
497        let clock = state.clock();
498        let mut repo = state.repository().await?;
499
500        let user_email = repo.user_email().lookup(user_email_id).await?;
501        let Some(user_email) = user_email else {
502            return Ok(RemoveEmailPayload::NotFound);
503        };
504
505        if !requester.is_owner_or_admin(&user_email) {
506            return Ok(RemoveEmailPayload::NotFound);
507        }
508
509        // Allow non-admins to remove their email address if the site config allows it
510        if !requester.is_admin() && !state.site_config().email_change_allowed {
511            return Err(async_graphql::Error::new("Unauthorized"));
512        }
513
514        let user = repo
515            .user()
516            .lookup(user_email.user_id)
517            .await?
518            .context("Failed to load user")?;
519
520        // Validate the password input if needed
521        if !verify_password_if_needed(
522            requester,
523            state.site_config(),
524            &state.password_manager(),
525            input.password,
526            &user,
527            &mut repo,
528        )
529        .await?
530        {
531            return Ok(RemoveEmailPayload::IncorrectPassword);
532        }
533
534        // TODO: don't allow removing the last email address
535
536        repo.user_email().remove(user_email.clone()).await?;
537
538        // Schedule a job to update the user
539        repo.queue_job()
540            .schedule_job(&mut rng, &clock, ProvisionUserJob::new(&user))
541            .await?;
542
543        repo.save().await?;
544
545        Ok(RemoveEmailPayload::Removed(user_email))
546    }
547
548    /// Set an email address as primary
549    #[graphql(
550        deprecation = "This doesn't do anything anymore, but is kept to avoid breaking existing queries"
551    )]
552    async fn set_primary_email(
553        &self,
554        ctx: &Context<'_>,
555        input: SetPrimaryEmailInput,
556    ) -> Result<SetPrimaryEmailPayload, async_graphql::Error> {
557        let state = ctx.state();
558        let user_email_id = NodeType::UserEmail.extract_ulid(&input.user_email_id)?;
559        let requester = ctx.requester();
560
561        let mut repo = state.repository().await?;
562
563        let user_email = repo.user_email().lookup(user_email_id).await?;
564        let Some(user_email) = user_email else {
565            return Ok(SetPrimaryEmailPayload::NotFound);
566        };
567
568        if !requester.is_owner_or_admin(&user_email) {
569            return Err(async_graphql::Error::new("Unauthorized"));
570        }
571
572        // Allow non-admins to change their primary email address if the site config
573        // allows it
574        if !requester.is_admin() && !state.site_config().email_change_allowed {
575            return Err(async_graphql::Error::new("Unauthorized"));
576        }
577
578        // The user primary email should already be up to date
579        let user = repo
580            .user()
581            .lookup(user_email.user_id)
582            .await?
583            .context("Failed to load user")?;
584
585        repo.save().await?;
586
587        Ok(SetPrimaryEmailPayload::Set(user))
588    }
589
590    /// Start a new email authentication flow
591    async fn start_email_authentication(
592        &self,
593        ctx: &Context<'_>,
594        input: StartEmailAuthenticationInput,
595    ) -> Result<StartEmailAuthenticationPayload, async_graphql::Error> {
596        let state = ctx.state();
597        let mut rng = state.rng();
598        let clock = state.clock();
599        let requester = ctx.requester();
600        let limiter = state.limiter();
601
602        // Only allow calling this if the requester is a browser session
603        let Some(browser_session) = requester.browser_session() else {
604            return Err(async_graphql::Error::new("Unauthorized"));
605        };
606
607        // Allow to starting the email authentication flow if the site config allows it
608        if !state.site_config().email_change_allowed {
609            return Err(async_graphql::Error::new(
610                "Email changes are not allowed on this server",
611            ));
612        }
613
614        if !state.site_config().email_change_allowed {
615            return Err(async_graphql::Error::new(
616                "Email authentication is not allowed on this server",
617            ));
618        }
619
620        // Check if the locale is valid
621        let _: DataLocale = input.language.parse()?;
622
623        // Check if the email address is valid
624        if input.email.parse::<lettre::Address>().is_err() {
625            return Ok(StartEmailAuthenticationPayload::InvalidEmailAddress);
626        }
627
628        if let Err(e) =
629            limiter.check_email_authentication_email(requester.fingerprint(), &input.email)
630        {
631            tracing::warn!(error = &e as &dyn std::error::Error);
632            return Ok(StartEmailAuthenticationPayload::RateLimited);
633        }
634
635        let mut repo = state.repository().await?;
636
637        // Check if the email address is already in use by the same user
638        // We don't report here if the email address is already in use by another user,
639        // because we don't want to leak information about other users. We will do that
640        // only when the user enters the right verification code
641        let count = repo
642            .user_email()
643            .count(
644                UserEmailFilter::new()
645                    .for_email(&input.email)
646                    .for_user(&browser_session.user),
647            )
648            .await?;
649        if count > 0 {
650            return Ok(StartEmailAuthenticationPayload::InUse);
651        }
652
653        // Check if the email address is allowed by the policy
654        let mut policy = state.policy().await?;
655        let res = policy
656            .evaluate_email(mas_policy::EmailInput {
657                email: &input.email,
658                requester: requester.for_policy(),
659            })
660            .await?;
661        if !res.valid() {
662            return Ok(StartEmailAuthenticationPayload::Denied {
663                violations: res.violations,
664            });
665        }
666
667        // Validate the password input if needed
668        if !verify_password_if_needed(
669            requester,
670            state.site_config(),
671            &state.password_manager(),
672            input.password,
673            &browser_session.user,
674            &mut repo,
675        )
676        .await?
677        {
678            return Ok(StartEmailAuthenticationPayload::IncorrectPassword);
679        }
680
681        // Create a new authentication session
682        let authentication = repo
683            .user_email()
684            .add_authentication_for_session(&mut rng, &clock, input.email, browser_session)
685            .await?;
686
687        repo.queue_job()
688            .schedule_job(
689                &mut rng,
690                &clock,
691                SendEmailAuthenticationCodeJob::new(&authentication, input.language),
692            )
693            .await?;
694
695        repo.save().await?;
696
697        Ok(StartEmailAuthenticationPayload::Started(
698            UserEmailAuthentication(authentication),
699        ))
700    }
701
702    /// Resend the email authentication code
703    async fn resend_email_authentication_code(
704        &self,
705        ctx: &Context<'_>,
706        input: ResendEmailAuthenticationCodeInput,
707    ) -> Result<ResendEmailAuthenticationCodePayload, async_graphql::Error> {
708        let state = ctx.state();
709        let mut rng = state.rng();
710        let clock = state.clock();
711        let limiter = state.limiter();
712        let requester = ctx.requester();
713
714        let id = NodeType::UserEmailAuthentication.extract_ulid(&input.id)?;
715        let Some(browser_session) = requester.browser_session() else {
716            return Err(async_graphql::Error::new("Unauthorized"));
717        };
718
719        // Allow to completing the email authentication flow if the site config allows
720        // it
721        if !state.site_config().email_change_allowed {
722            return Err(async_graphql::Error::new(
723                "Email changes are not allowed on this server",
724            ));
725        }
726
727        // Check if the locale is valid
728        let _: DataLocale = input.language.parse()?;
729
730        let mut repo = state.repository().await?;
731
732        let Some(authentication) = repo.user_email().lookup_authentication(id).await? else {
733            return Ok(ResendEmailAuthenticationCodePayload::Completed);
734        };
735
736        // Make sure this authentication belongs to the requester
737        if authentication.user_session_id != Some(browser_session.id) {
738            return Err(async_graphql::Error::new("Unauthorized"));
739        }
740
741        if authentication.completed_at.is_some() {
742            return Ok(ResendEmailAuthenticationCodePayload::Completed);
743        }
744
745        if let Err(e) =
746            limiter.check_email_authentication_send_code(requester.fingerprint(), &authentication)
747        {
748            tracing::warn!(error = &e as &dyn std::error::Error);
749            return Ok(ResendEmailAuthenticationCodePayload::RateLimited);
750        }
751
752        repo.queue_job()
753            .schedule_job(
754                &mut rng,
755                &clock,
756                SendEmailAuthenticationCodeJob::new(&authentication, input.language),
757            )
758            .await?;
759
760        repo.save().await?;
761
762        Ok(ResendEmailAuthenticationCodePayload::Resent)
763    }
764
765    /// Complete the email authentication flow
766    async fn complete_email_authentication(
767        &self,
768        ctx: &Context<'_>,
769        input: CompleteEmailAuthenticationInput,
770    ) -> Result<CompleteEmailAuthenticationPayload, async_graphql::Error> {
771        let state = ctx.state();
772        let mut rng = state.rng();
773        let clock = state.clock();
774        let limiter = state.limiter();
775
776        let id = NodeType::UserEmailAuthentication.extract_ulid(&input.id)?;
777
778        let Some(browser_session) = ctx.requester().browser_session() else {
779            return Err(async_graphql::Error::new("Unauthorized"));
780        };
781
782        // Allow to completing the email authentication flow if the site config allows
783        // it
784        if !state.site_config().email_change_allowed {
785            return Err(async_graphql::Error::new(
786                "Email changes are not allowed on this server",
787            ));
788        }
789
790        let mut repo = state.repository().await?;
791
792        let Some(authentication) = repo.user_email().lookup_authentication(id).await? else {
793            return Ok(CompleteEmailAuthenticationPayload::InvalidCode);
794        };
795
796        // Make sure this authentication belongs to the requester
797        if authentication.user_session_id != Some(browser_session.id) {
798            return Ok(CompleteEmailAuthenticationPayload::InvalidCode);
799        }
800
801        if let Err(e) = limiter.check_email_authentication_attempt(&authentication) {
802            tracing::warn!(error = &e as &dyn std::error::Error);
803            return Ok(CompleteEmailAuthenticationPayload::RateLimited);
804        }
805
806        let Some(code) = repo
807            .user_email()
808            .find_authentication_code(&authentication, &input.code)
809            .await?
810        else {
811            return Ok(CompleteEmailAuthenticationPayload::InvalidCode);
812        };
813
814        if code.expires_at < state.clock().now() {
815            return Ok(CompleteEmailAuthenticationPayload::CodeExpired);
816        }
817
818        let authentication = repo
819            .user_email()
820            .complete_authentication(&clock, authentication, &code)
821            .await?;
822
823        // Check the email is not already in use by anyone, including the current user
824        let count = repo
825            .user_email()
826            .count(UserEmailFilter::new().for_email(&authentication.email))
827            .await?;
828
829        if count > 0 {
830            // We still want to consume the code so that it can't be reused
831            repo.save().await?;
832
833            return Ok(CompleteEmailAuthenticationPayload::InUse);
834        }
835
836        repo.user_email()
837            .add(
838                &mut rng,
839                &clock,
840                &browser_session.user,
841                authentication.email,
842            )
843            .await?;
844
845        repo.save().await?;
846
847        Ok(CompleteEmailAuthenticationPayload::Completed)
848    }
849}