Lines
67.52 %
Functions
21.82 %
Branches
36.15 %
/*
* This file is part of mailpot
*
* Copyright 2020 - Manos Pitsidianakis
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
//! Processing new posts.
use std::borrow::Cow;
use log::{info, trace};
use melib::Envelope;
use rusqlite::OptionalExtension;
use crate::{
errors::*,
mail::{ListContext, ListRequest, PostAction, PostEntry},
models::{changesets::AccountChangeset, Account, DbVal, ListSubscription, MailingList, Post},
queue::{Queue, QueueEntry},
templates::Template,
Connection,
};
impl Connection {
/// Insert a mailing list post into the database.
pub fn insert_post(&self, list_pk: i64, message: &[u8], env: &Envelope) -> Result<i64> {
let from_ = env.from();
let address = if from_.is_empty() {
String::new()
} else {
from_[0].get_email()
let datetime: std::borrow::Cow<'_, str> = if !env.date.is_empty() {
env.date.as_str().into()
melib::utils::datetime::timestamp_to_string(
env.timestamp,
Some(melib::utils::datetime::formats::RFC822_DATE),
true,
)
.into()
let message_id = env.message_id_display();
let mut stmt = self.connection.prepare(
"INSERT OR REPLACE INTO post(list, address, message_id, message, datetime, timestamp) \
VALUES(?, ?, ?, ?, ?, ?) RETURNING pk;",
)?;
let pk = stmt.query_row(
rusqlite::params![
&list_pk,
&address,
&message_id,
&message,
&datetime,
&env.timestamp
],
|row| {
let pk: i64 = row.get("pk")?;
Ok(pk)
},
trace!(
"insert_post list_pk {}, from {:?} message_id {:?} post_pk {}.",
list_pk,
address,
message_id,
pk
);
}
/// Process a new mailing list post.
///
/// In case multiple processes can access the database at any time, use an
/// `EXCLUSIVE` transaction before calling this function.
/// See [`Connection::transaction`].
pub fn post(&self, env: &Envelope, raw: &[u8], _dry_run: bool) -> Result<()> {
let result = self.inner_post(env, raw, _dry_run);
if let Err(err) = result {
return match self.insert_to_queue(QueueEntry::new(
Queue::Error,
None,
Some(Cow::Borrowed(env)),
raw,
Some(err.to_string()),
)?) {
Ok(idx) => {
log::info!(
"Inserted mail from {:?} into error_queue at index {}",
env.from(),
idx
Err(err)
Err(err2) => {
log::error!(
"Could not insert mail from {:?} into error_queue: {err2}",
Err(err.chain_err(|| err2))
result
fn inner_post(&self, env: &Envelope, raw: &[u8], _dry_run: bool) -> Result<()> {
trace!("Received envelope to post: {:#?}", &env);
let tos = env.to().to_vec();
if tos.is_empty() {
return Err("Envelope To: field is empty!".into());
if env.from().is_empty() {
return Err("Envelope From: field is empty!".into());
let mut lists = self.lists()?;
let prev_list_len = lists.len();
for t in &tos {
if let Some((addr, subaddr)) = t.subaddress("+") {
lists.retain(|list| {
if !addr.contains_address(&list.address()) {
return true;
if let Err(err) = ListRequest::try_from((subaddr.as_str(), env))
.and_then(|req| self.request(list, req, env, raw))
{
info!("Processing request returned error: {}", err);
false
});
if lists.len() != prev_list_len {
// Was request, handled above.
return Ok(());
"Is post related to list {}? {}",
&list,
tos.iter().any(|a| a.contains_address(&list.address()))
if lists.is_empty() {
return Err(format!(
"No relevant mailing list found for these addresses: {:?}",
tos
.into());
trace!("Configuration is {:#?}", &self.conf);
for mut list in lists {
trace!("Examining list {}", list.display_name());
let filters = self.list_filters(&list);
let subscriptions = self.list_subscriptions(list.pk)?;
let owners = self.list_owners(list.pk)?;
trace!("List subscriptions {:#?}", &subscriptions);
let mut list_ctx = ListContext {
post_policy: self.list_post_policy(list.pk)?,
subscription_policy: self.list_subscription_policy(list.pk)?,
list_owners: &owners,
subscriptions: &subscriptions,
scheduled_jobs: vec![],
filter_settings: self.get_settings(list.pk)?,
list: &mut list,
let mut post = PostEntry {
message_id: env.message_id().clone(),
from: env.from()[0].clone(),
bytes: raw.to_vec(),
to: env.to().to_vec(),
action: PostAction::Hold,
let result = filters
.into_iter()
.try_fold((&mut post, &mut list_ctx), |(p, c), f| f.feed(p, c));
trace!("result {:#?}", result);
let PostEntry { bytes, action, .. } = post;
trace!("Action is {:#?}", action);
let post_env = melib::Envelope::from_bytes(&bytes, None)?;
match action {
PostAction::Accept => {
let _post_pk = self.insert_post(list_ctx.list.pk, &bytes, &post_env)?;
trace!("post_pk is {:#?}", _post_pk);
for job in list_ctx.scheduled_jobs.iter() {
trace!("job is {:#?}", &job);
if let crate::mail::MailJob::Send { recipients } = job {
trace!("recipients: {:?}", &recipients);
if recipients.is_empty() {
trace!("list has no recipients");
for recipient in recipients {
let mut env = post_env.clone();
env.set_to(melib::smallvec::smallvec![recipient.clone()]);
self.insert_to_queue(QueueEntry::new(
Queue::Out,
Some(list.pk),
Some(Cow::Owned(env)),
&bytes,
)?)?;
PostAction::Reject { reason } => {
log::info!("PostAction::Reject {{ reason: {} }}", reason);
for f in env.from() {
/* send error notice to e-mail sender */
self.send_reply_with_list_template(
TemplateRenderContext {
template: Template::GENERIC_FAILURE,
default_fn: Some(Template::default_generic_failure),
list: &list,
context: minijinja::context! {
list => &list,
subject => format!("Your post to {} was rejected.", list.id),
details => &reason,
queue: Queue::Out,
comment: format!("PostAction::Reject {{ reason: {} }}", reason)
.into(),
std::iter::once(Cow::Borrowed(f)),
/* error handled by notifying submitter */
PostAction::Defer { reason } => {
trace!("PostAction::Defer {{ reason: {} }}", reason);
subject => format!("Your post to {} was deferred.", list.id),
comment: format!("PostAction::Defer {{ reason: {} }}", reason)
Queue::Deferred,
Some(Cow::Borrowed(&post_env)),
Some(format!("PostAction::Defer {{ reason: {} }}", reason)),
PostAction::Hold => {
trace!("PostAction::Hold");
Queue::Hold,
Some("PostAction::Hold".to_string()),
Ok(())
/// Process a new mailing list request.
///
/// Dispatches on the `request` variant. The visible arms are `Help`,
/// `Subscribe`, `Unsubscribe`, `Other("owner")`, `Other("password")`
/// (compared case-insensitively after trimming), `RetrieveMessages`,
/// `RetrieveArchive`, `ChangeSetting`, and a catch-all `Other`.
///
/// `list` is the list the request was addressed to, `env` the parsed
/// request e-mail and `raw` its raw bytes.
///
/// NOTE(review): as it stands this body is incomplete — block closers,
/// macro/call openers (e.g. the call that owns each log format string) and
/// repeated statements are missing. The comments below describe only what
/// the remaining lines show; restore the body from version control before
/// relying on them.
pub fn request(
&self,
list: &DbVal<MailingList>,
request: ListRequest,
env: &Envelope,
raw: &[u8],
) -> Result<()> {
match request {
// Help: build a help text from the list's post and subscription policies
// and reply using the `GENERIC_HELP` template.
ListRequest::Help => {
"help action for addresses {:?} in list {}",
list
let subscription_policy = self.list_subscription_policy(list.pk)?;
let post_policy = self.list_post_policy(list.pk)?;
let subject = format!("Help for {}", list.name);
let details = list
.generate_help_email(post_policy.as_deref(), subscription_policy.as_deref());
template: Template::GENERIC_HELP,
default_fn: Some(Template::default_generic_help),
list,
subject => &subject,
details => &details,
comment: "Help request".into(),
// Subscribe: new subscriptions are created disabled when the list's
// subscription policy exists and is not `open`.
ListRequest::Subscribe => {
"subscribe action for addresses {:?} in list {}",
// No policy at all is treated as "no approval needed".
let approval_needed = subscription_policy
.as_ref()
.map(|p| !p.open)
.unwrap_or(false);
let email_from = f.get_email();
// An existing subscription for this address gets an "already subscribed"
// notice and is then skipped via `continue` (presumably inside a loop over
// the `From:` addresses — the loop header is not visible here).
if self
.list_subscription_by_address(list.pk, &email_from)
.is_ok()
subject => format!("You are already subscribed to {}.", list.id),
details => "No action has been taken since you are already subscribed to the list.",
comment: format!("Address {} is already subscribed to list {}", f, list.id).into(),
continue;
// `pk: 0` is presumably a placeholder replaced on insert — TODO confirm.
let subscription = ListSubscription {
pk: 0,
list: list.pk,
address: f.get_email(),
account: None,
name: f.get_display_name(),
digest: false,
hide_address: false,
receive_duplicates: true,
receive_own_posts: false,
receive_confirmation: true,
enabled: !approval_needed,
verified: true,
// With approval required, the subscription is stored as a candidate and
// the list owners are addressed with `SUBSCRIPTION_REQUEST_NOTICE_OWNER`.
if approval_needed {
match self.add_candidate_subscription(list.pk, subscription) {
Ok(v) => {
let list_owners = self.list_owners(list.pk)?;
template: Template::SUBSCRIPTION_REQUEST_NOTICE_OWNER,
default_fn: Some(
Template::default_subscription_request_owner,
),
candidate => &v,
comment: Template::SUBSCRIPTION_REQUEST_NOTICE_OWNER.into(),
list_owners.iter().map(|owner| Cow::Owned(owner.address())),
Err(err) => {
"Could not create candidate subscription for {f:?}: {err}"
comment: format!(
"Could not create candidate subscription for {f:?}: \
{err}"
/* send error details to list owners */
template: Template::ADMIN_NOTICE,
default_fn: Some(Template::default_admin_notice),
details => err.to_string(),
// Without approval, the subscription is added directly; failures are
// logged with `log::error!`.
} else if let Err(err) = self.add_subscription(list.pk, subscription) {
log::error!("Could not create subscription for {f:?}: {err}");
comment: format!("Could not create subscription for {f:?}: {err}")
// NOTE(review): presumably the success branch of the `else if` above —
// confirm against the complete source.
self.send_subscription_confirmation(list, f)?;
// Unsubscribe: remove the subscription for the sender's e-mail address.
ListRequest::Unsubscribe => {
"unsubscribe action for addresses {:?} in list {}",
if let Err(err) = self.remove_subscription(list.pk, &f.get_email()) {
log::error!("Could not unsubscribe {f:?}: {err}");
comment: format!("Could not unsubscribe {f:?}: {err}").into(),
self.send_unsubscription_confirmation(list, f)?;
// Mail to the list owners: not implemented, always returns an error.
ListRequest::Other(ref req) if req == "owner" => {
"list-owner mail action for addresses {:?} in list {}",
return Err("list-owner emails are not implemented yet.".into());
//FIXME: mail to list-owner
// NOTE(review): the lines below follow an unconditional `return`, so they
// cannot run; presumably they are commented-out or dead code — confirm.
for _owner in self.list_owners(list.pk)? {
self.insert_to_queue(
draft.finalise()?.as_bytes(),
"list-owner-forward".to_string(),
// Password: the text of the e-mail body is stored as the account password
// for an address that is subscribed to this list.
ListRequest::Other(ref req) if req.trim().eq_ignore_ascii_case("password") => {
"list-request password set action for addresses {:?} in list {list}",
let body = env.body_bytes(raw);
let password = body.text();
// NOTE(review): the TODO below mentions an SSH public key while the value
// is written to the `password` field — presumably that field carries the
// key; confirm.
// TODO: validate SSH public key with `ssh-keygen`.
if let Ok(sub) = self.list_subscription_by_address(list.pk, &email_from) {
match self.account_by_address(&email_from)? {
// Existing account: only the password is changed; the other changeset
// fields shown are `None`.
Some(_acc) => {
let changeset = AccountChangeset {
address: email_from.clone(),
name: None,
public_key: None,
password: Some(password.clone()),
enabled: None,
self.update_account(changeset)?;
None => {
// Create new account.
self.add_account(Account {
name: sub.name.clone(),
address: sub.address.clone(),
password: password.clone(),
enabled: sub.enabled,
})?;
// Message retrieval by Message-ID: not implemented, returns an error.
ListRequest::RetrieveMessages(ref message_ids) => {
"retrieve messages {message_ids:?} action for addresses {:?} in list {list}",
return Err("message retrievals are not implemented yet.".into());
// Archive retrieval: no handling beyond the log message is visible here.
ListRequest::RetrieveArchive(ref from, ref to) => {
"retrieve archive action from {from:?} to {to:?} for addresses {:?} in list \
{list}",
// Changing settings by e-mail: not implemented, returns an error.
ListRequest::ChangeSetting(ref setting, ref toggle) => {
"change setting {setting}, request with value {toggle:?} for addresses {:?} \
in list {list}",
return Err("setting digest options via e-mail is not implemented yet.".into());
// Any other request keyword is reported back as an error.
ListRequest::Other(ref req) => {
"unknown request action {req} for addresses {:?} in list {list}",
return Err(format!("Unknown request {req}.").into());
/// Fetch all year and month values for which at least one post exists in
/// `yyyy-mm` format.
pub fn months(&self, list_pk: i64) -> Result<Vec<String>> {
"SELECT DISTINCT strftime('%Y-%m', CAST(timestamp AS INTEGER), 'unixepoch') FROM post \
WHERE list = ?;",
let months_iter = stmt.query_map([list_pk], |row| {
let val: String = row.get(0)?;
Ok(val)
let mut ret = vec![];
for month in months_iter {
let month = month?;
ret.push(month);
Ok(ret)
/// Find a post by its `Message-ID` email header.
pub fn list_post_by_message_id(
list_pk: i64,
message_id: &str,
) -> Result<Option<DbVal<Post>>> {
"SELECT *, strftime('%Y-%m', CAST(timestamp AS INTEGER), 'unixepoch') AS month_year \
FROM post WHERE list = ? AND message_id = ?;",
let ret = stmt
.query_row(rusqlite::params![&list_pk, &message_id], |row| {
let pk = row.get("pk")?;
Ok(DbVal(
Post {
pk,
list: row.get("list")?,
envelope_from: row.get("envelope_from")?,
address: row.get("address")?,
message_id: row.get("message_id")?,
message: row.get("message")?,
timestamp: row.get("timestamp")?,
datetime: row.get("datetime")?,
month_year: row.get("month_year")?,
))
})
.optional()?;
/// Helper function to send a template reply.
pub fn send_reply_with_list_template<'ctx, F: Fn() -> Template>(
render_context: TemplateRenderContext<'ctx, F>,
recipients: impl Iterator<Item = Cow<'ctx, melib::Address>>,
let TemplateRenderContext {
template,
default_fn,
context,
queue,
comment,
} = render_context;
let templ = self
.fetch_template(template, Some(list.pk))?
.map(DbVal::into_inner)
.or_else(|| default_fn.map(|f| f()))
.ok_or_else(|| -> crate::Error {
format!("Template with name {template:?} was not found.").into()
let mut draft = templ.render(context)?;
draft
.headers
.insert(melib::HeaderName::FROM, list.request_subaddr());
for addr in recipients {
let mut draft = draft.clone();
.insert(melib::HeaderName::TO, addr.to_string());
list.insert_headers(
&mut draft,
post_policy.as_deref(),
subscription_policy.as_deref(),
Some(comment.to_string()),
/// Send subscription confirmation.
pub fn send_subscription_confirmation(
address: &melib::Address,
log::trace!(
"Added subscription to list {list:?} for address {address:?}, sending confirmation."
template: Template::SUBSCRIPTION_CONFIRMATION,
default_fn: Some(Template::default_subscription_confirmation),
comment: Template::SUBSCRIPTION_CONFIRMATION.into(),
std::iter::once(Cow::Borrowed(address)),
/// Send unsubscription confirmation.
pub fn send_unsubscription_confirmation(
"Removed subscription to list {list:?} for address {address:?}, sending confirmation."
template: Template::UNSUBSCRIPTION_CONFIRMATION,
default_fn: Some(Template::default_unsubscription_confirmation),
comment: Template::UNSUBSCRIPTION_CONFIRMATION.into(),
/// Helper type for [`Connection::send_reply_with_list_template`].
#[derive(Debug)]
pub struct TemplateRenderContext<'ctx, F: Fn() -> Template> {
/// Template name.
pub template: &'ctx str,
/// If template is not found, call a function that returns one.
pub default_fn: Option<F>,
/// The pertinent list.
pub list: &'ctx DbVal<MailingList>,
/// [`minijinja`]'s template context.
pub context: minijinja::value::Value,
/// Destination queue in the database.
pub queue: Queue,
/// Comment for the queue entry in the database.
pub comment: Cow<'static, str>,