mas_tasks/
recovery.rs

1// Copyright 2024 New Vector Ltd.
2// Copyright 2024 The Matrix.org Foundation C.I.C.
3//
4// SPDX-License-Identifier: AGPL-3.0-only
5// Please see LICENSE in the repository root for full details.
6
7use anyhow::Context;
8use async_trait::async_trait;
9use mas_email::{Address, Mailbox};
10use mas_i18n::DataLocale;
11use mas_storage::{
12    Pagination, RepositoryAccess,
13    queue::SendAccountRecoveryEmailsJob,
14    user::{UserEmailFilter, UserRecoveryRepository},
15};
16use mas_templates::{EmailRecoveryContext, TemplateContext};
17use rand::distributions::{Alphanumeric, DistString};
18use tracing::{error, info};
19
20use crate::{
21    State,
22    new_queue::{JobContext, JobError, RunnableJob},
23};
24
25/// Job to send account recovery emails for a given recovery session.
26#[async_trait]
27impl RunnableJob for SendAccountRecoveryEmailsJob {
28    #[tracing::instrument(
29        name = "job.send_account_recovery_email",
30        fields(
31            user_recovery_session.id = %self.user_recovery_session_id(),
32            user_recovery_session.email,
33        ),
34        skip_all,
35        err,
36    )]
37    async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> {
38        let clock = state.clock();
39        let mailer = state.mailer();
40        let url_builder = state.url_builder();
41        let mut rng = state.rng();
42        let mut repo = state.repository().await.map_err(JobError::retry)?;
43
44        let session = repo
45            .user_recovery()
46            .lookup_session(self.user_recovery_session_id())
47            .await
48            .map_err(JobError::retry)?
49            .context("User recovery session not found")
50            .map_err(JobError::fail)?;
51
52        tracing::Span::current().record("user_recovery_session.email", &session.email);
53
54        if session.consumed_at.is_some() {
55            info!("Recovery session already consumed, not sending email");
56            return Ok(());
57        }
58
59        let mut cursor = Pagination::first(50);
60
61        let lang: DataLocale = session
62            .locale
63            .parse()
64            .context("Invalid locale in database on recovery session")
65            .map_err(JobError::fail)?;
66
67        loop {
68            let page = repo
69                .user_email()
70                .list(UserEmailFilter::new().for_email(&session.email), cursor)
71                .await
72                .map_err(JobError::retry)?;
73
74            for email in page.edges {
75                let ticket = Alphanumeric.sample_string(&mut rng, 32);
76
77                let ticket = repo
78                    .user_recovery()
79                    .add_ticket(&mut rng, &clock, &session, &email, ticket)
80                    .await
81                    .map_err(JobError::retry)?;
82
83                let user_email = repo
84                    .user_email()
85                    .lookup(email.id)
86                    .await
87                    .map_err(JobError::retry)?
88                    .context("User email not found")
89                    .map_err(JobError::fail)?;
90
91                let user = repo
92                    .user()
93                    .lookup(user_email.user_id)
94                    .await
95                    .map_err(JobError::retry)?
96                    .context("User not found")
97                    .map_err(JobError::fail)?;
98
99                let url = url_builder.account_recovery_link(ticket.ticket);
100
101                let address: Address = user_email.email.parse().map_err(JobError::fail)?;
102                let mailbox = Mailbox::new(Some(user.username.clone()), address);
103
104                info!("Sending recovery email to {}", mailbox);
105                let context = EmailRecoveryContext::new(user, session.clone(), url)
106                    .with_language(lang.clone());
107
108                // XXX: we only log if the email fails to send, to avoid stopping the loop
109                if let Err(e) = mailer.send_recovery_email(mailbox, &context).await {
110                    error!(
111                        error = &e as &dyn std::error::Error,
112                        "Failed to send recovery email"
113                    );
114                }
115
116                cursor = cursor.after(email.id);
117            }
118
119            if !page.has_next_page {
120                break;
121            }
122        }
123
124        repo.save().await.map_err(JobError::fail)?;
125
126        Ok(())
127    }
128}