mas_tasks/
email.rs

1// Copyright 2024, 2025 New Vector Ltd.
2// Copyright 2023, 2024 The Matrix.org Foundation C.I.C.
3//
4// SPDX-License-Identifier: AGPL-3.0-only
5// Please see LICENSE in the repository root for full details.
6
7use async_trait::async_trait;
8use chrono::Duration;
9use mas_email::{Address, EmailVerificationContext, Mailbox};
10use mas_storage::queue::{SendEmailAuthenticationCodeJob, VerifyEmailJob};
11use mas_templates::TemplateContext as _;
12use rand::{Rng, distributions::Uniform};
13use tracing::info;
14
15use crate::{
16    State,
17    new_queue::{JobContext, JobError, RunnableJob},
18};
19
20#[async_trait]
21impl RunnableJob for VerifyEmailJob {
22    #[tracing::instrument(
23        name = "job.verify_email",
24        fields(user_email.id = %self.user_email_id()),
25        skip_all,
26        err,
27    )]
28    async fn run(&self, _state: &State, _context: JobContext) -> Result<(), JobError> {
29        // This job was for the old email verification flow, which has been replaced.
30        // We still want to consume existing jobs in the queue, so we just make them
31        // permanently fail.
32        Err(JobError::fail(anyhow::anyhow!("Not implemented")))
33    }
34}
35
36#[async_trait]
37impl RunnableJob for SendEmailAuthenticationCodeJob {
38    #[tracing::instrument(
39        name = "job.send_email_authentication_code",
40        fields(user_email_authentication.id = %self.user_email_authentication_id()),
41        skip_all,
42        err,
43    )]
44    async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> {
45        let clock = state.clock();
46        let mailer = state.mailer();
47        let mut rng = state.rng();
48        let mut repo = state.repository().await.map_err(JobError::retry)?;
49
50        let user_email_authentication = repo
51            .user_email()
52            .lookup_authentication(self.user_email_authentication_id())
53            .await
54            .map_err(JobError::retry)?
55            .ok_or(JobError::fail(anyhow::anyhow!(
56                "User email authentication not found"
57            )))?;
58
59        if user_email_authentication.completed_at.is_some() {
60            return Err(JobError::fail(anyhow::anyhow!(
61                "User email authentication already completed"
62            )));
63        }
64
65        // Load the browser session, if any
66        let browser_session =
67            if let Some(browser_session) = user_email_authentication.user_session_id {
68                Some(
69                    repo.browser_session()
70                        .lookup(browser_session)
71                        .await
72                        .map_err(JobError::retry)?
73                        .ok_or(JobError::fail(anyhow::anyhow!(
74                            "Failed to load browser session"
75                        )))?,
76                )
77            } else {
78                None
79            };
80
81        // Load the registration, if any
82        let registration =
83            if let Some(registration_id) = user_email_authentication.user_registration_id {
84                Some(
85                    repo.user_registration()
86                        .lookup(registration_id)
87                        .await
88                        .map_err(JobError::retry)?
89                        .ok_or(JobError::fail(anyhow::anyhow!(
90                            "Failed to load user registration"
91                        )))?,
92                )
93            } else {
94                None
95            };
96
97        // Generate a new 6-digit authentication code
98        let range = Uniform::<u32>::from(0..1_000_000);
99        let code = rng.sample(range);
100        let code = format!("{code:06}");
101        let code = repo
102            .user_email()
103            .add_authentication_code(
104                &mut rng,
105                &clock,
106                Duration::minutes(5), // TODO: make this configurable
107                &user_email_authentication,
108                code,
109            )
110            .await
111            .map_err(JobError::retry)?;
112
113        let address: Address = user_email_authentication
114            .email
115            .parse()
116            .map_err(JobError::fail)?;
117        let username_from_session = browser_session.as_ref().map(|s| s.user.username.clone());
118        let username_from_registration = registration.as_ref().map(|r| r.username.clone());
119        let username = username_from_registration.or(username_from_session);
120        let mailbox = Mailbox::new(username, address);
121
122        info!("Sending email verification code to {}", mailbox);
123
124        let language = self.language().parse().map_err(JobError::fail)?;
125
126        let context = EmailVerificationContext::new(code, browser_session, registration)
127            .with_language(language);
128        mailer
129            .send_verification_email(mailbox, &context)
130            .await
131            .map_err(JobError::fail)?;
132
133        repo.save().await.map_err(JobError::fail)?;
134
135        Ok(())
136    }
137}