1use anyhow::Context;
8use async_trait::async_trait;
9use mas_email::{Address, Mailbox};
10use mas_i18n::DataLocale;
11use mas_storage::{
12 Pagination, RepositoryAccess,
13 queue::SendAccountRecoveryEmailsJob,
14 user::{UserEmailFilter, UserRecoveryRepository},
15};
16use mas_templates::{EmailRecoveryContext, TemplateContext};
17use rand::distributions::{Alphanumeric, DistString};
18use tracing::{error, info};
19
20use crate::{
21 State,
22 new_queue::{JobContext, JobError, RunnableJob},
23};
24
25#[async_trait]
27impl RunnableJob for SendAccountRecoveryEmailsJob {
28 #[tracing::instrument(
29 name = "job.send_account_recovery_email",
30 fields(
31 user_recovery_session.id = %self.user_recovery_session_id(),
32 user_recovery_session.email,
33 ),
34 skip_all,
35 err,
36 )]
37 async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> {
38 let clock = state.clock();
39 let mailer = state.mailer();
40 let url_builder = state.url_builder();
41 let mut rng = state.rng();
42 let mut repo = state.repository().await.map_err(JobError::retry)?;
43
44 let session = repo
45 .user_recovery()
46 .lookup_session(self.user_recovery_session_id())
47 .await
48 .map_err(JobError::retry)?
49 .context("User recovery session not found")
50 .map_err(JobError::fail)?;
51
52 tracing::Span::current().record("user_recovery_session.email", &session.email);
53
54 if session.consumed_at.is_some() {
55 info!("Recovery session already consumed, not sending email");
56 return Ok(());
57 }
58
59 let mut cursor = Pagination::first(50);
60
61 let lang: DataLocale = session
62 .locale
63 .parse()
64 .context("Invalid locale in database on recovery session")
65 .map_err(JobError::fail)?;
66
67 loop {
68 let page = repo
69 .user_email()
70 .list(UserEmailFilter::new().for_email(&session.email), cursor)
71 .await
72 .map_err(JobError::retry)?;
73
74 for email in page.edges {
75 let ticket = Alphanumeric.sample_string(&mut rng, 32);
76
77 let ticket = repo
78 .user_recovery()
79 .add_ticket(&mut rng, &clock, &session, &email, ticket)
80 .await
81 .map_err(JobError::retry)?;
82
83 let user_email = repo
84 .user_email()
85 .lookup(email.id)
86 .await
87 .map_err(JobError::retry)?
88 .context("User email not found")
89 .map_err(JobError::fail)?;
90
91 let user = repo
92 .user()
93 .lookup(user_email.user_id)
94 .await
95 .map_err(JobError::retry)?
96 .context("User not found")
97 .map_err(JobError::fail)?;
98
99 let url = url_builder.account_recovery_link(ticket.ticket);
100
101 let address: Address = user_email.email.parse().map_err(JobError::fail)?;
102 let mailbox = Mailbox::new(Some(user.username.clone()), address);
103
104 info!("Sending recovery email to {}", mailbox);
105 let context = EmailRecoveryContext::new(user, session.clone(), url)
106 .with_language(lang.clone());
107
108 if let Err(e) = mailer.send_recovery_email(mailbox, &context).await {
110 error!(
111 error = &e as &dyn std::error::Error,
112 "Failed to send recovery email"
113 );
114 }
115
116 cursor = cursor.after(email.id);
117 }
118
119 if !page.has_next_page {
120 break;
121 }
122 }
123
124 repo.save().await.map_err(JobError::fail)?;
125
126 Ok(())
127 }
128}