1
/*
2
 * This file is part of mailpot
3
 *
4
 * Copyright 2020 - Manos Pitsidianakis
5
 *
6
 * This program is free software: you can redistribute it and/or modify
7
 * it under the terms of the GNU Affero General Public License as
8
 * published by the Free Software Foundation, either version 3 of the
9
 * License, or (at your option) any later version.
10
 *
11
 * This program is distributed in the hope that it will be useful,
12
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14
 * GNU Affero General Public License for more details.
15
 *
16
 * You should have received a copy of the GNU Affero General Public License
17
 * along with this program. If not, see <https://www.gnu.org/licenses/>.
18
 */
19

            
20
//! # Queues
21

            
22
use std::borrow::Cow;
23

            
24
use melib::Envelope;
25

            
26
use crate::{errors::*, models::DbVal, Connection, DateTime};
27

            
28
/// In-database queues of mail.
29
32
#[derive(Copy, Clone, Eq, PartialEq, Debug, serde::Serialize, serde::Deserialize)]
30
#[serde(rename_all = "kebab-case")]
31
pub enum Queue {
32
    /// Messages that have been received but not yet processed, await
33
    /// processing in the `maildrop` queue. Messages can be added to the
34
    /// `maildrop` queue even when mailpot is not running.
35
    Maildrop,
36
    /// List administrators may introduce rules for emails to be placed
37
    /// indefinitely in the `hold` queue. Messages placed in the `hold`
38
    /// queue stay there until the administrator intervenes. No periodic
39
    /// delivery attempts are made for messages in the `hold` queue.
40
    Hold,
41
    /// When all the deliverable recipients for a message are delivered, and for
42
    /// some recipients delivery failed for a transient reason (it might
43
    /// succeed later), the message is placed in the `deferred` queue.
44
    Deferred,
45
    /// Invalid received or generated e-mail saved for debug and troubleshooting
46
    /// reasons.
47
    Corrupt,
48
    /// Emails that must be sent as soon as possible.
49
    Out,
50
    /// Error queue
51
    Error,
52
}
53

            
54
impl std::str::FromStr for Queue {
55
    type Err = Error;
56

            
57
    fn from_str(s: &str) -> Result<Self> {
58
        Ok(match s.trim() {
59
            s if s.eq_ignore_ascii_case(stringify!(Maildrop)) => Self::Maildrop,
60
            s if s.eq_ignore_ascii_case(stringify!(Hold)) => Self::Hold,
61
            s if s.eq_ignore_ascii_case(stringify!(Deferred)) => Self::Deferred,
62
            s if s.eq_ignore_ascii_case(stringify!(Corrupt)) => Self::Corrupt,
63
            s if s.eq_ignore_ascii_case(stringify!(Out)) => Self::Out,
64
            s if s.eq_ignore_ascii_case(stringify!(Error)) => Self::Error,
65
            other => return Err(Error::new_external(format!("Invalid Queue name: {other}."))),
66
        })
67
    }
68
}
69

            
70
impl Queue {
71
    /// Returns the name of the queue used in the database schema.
72
82
    pub const fn as_str(&self) -> &'static str {
73
82
        match self {
74
            Self::Maildrop => "maildrop",
75
16
            Self::Hold => "hold",
76
2
            Self::Deferred => "deferred",
77
            Self::Corrupt => "corrupt",
78
48
            Self::Out => "out",
79
16
            Self::Error => "error",
80
        }
81
82
    }
82

            
83
    /// Returns all possible variants as `&'static str`
84
2
    pub const fn possible_values() -> &'static [&'static str] {
85
        const VALUES: &[&str] = &[
86
            Queue::Maildrop.as_str(),
87
            Queue::Hold.as_str(),
88
            Queue::Deferred.as_str(),
89
            Queue::Corrupt.as_str(),
90
            Queue::Out.as_str(),
91
            Queue::Error.as_str(),
92
        ];
93
        VALUES
94
2
    }
95
}
96

            
97
impl std::fmt::Display for Queue {
98
    fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
99
        write!(fmt, "{}", self.as_str())
100
    }
101
}
102

            
103
/// A queue entry.
104
20
#[derive(Clone, Deserialize, Serialize, PartialEq, Eq)]
105
pub struct QueueEntry {
106
    /// Database primary key.
107
10
    pub pk: i64,
108
    /// Owner queue.
109
10
    pub queue: Queue,
110
    /// Related list foreign key, optional.
111
10
    pub list: Option<i64>,
112
    /// Entry comment, optional.
113
10
    pub comment: Option<String>,
114
    /// Entry recipients in rfc5322 format.
115
10
    pub to_addresses: String,
116
    /// Entry submitter in rfc5322 format.
117
10
    pub from_address: String,
118
    /// Entry subject.
119
10
    pub subject: String,
120
    /// Entry Message-ID in rfc5322 format.
121
10
    pub message_id: String,
122
    /// Message in rfc5322 format as bytes.
123
10
    pub message: Vec<u8>,
124
    /// Unix timestamp of date.
125
10
    pub timestamp: u64,
126
    /// Datetime as string.
127
10
    pub datetime: DateTime,
128
}
129

            
130
impl std::fmt::Display for QueueEntry {
131
22
    fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
132
22
        write!(fmt, "{:?}", self)
133
22
    }
134
}
135

            
136
impl std::fmt::Debug for QueueEntry {
137
22
    fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
138
264
        fmt.debug_struct(stringify!(QueueEntry))
139
22
            .field("pk", &self.pk)
140
22
            .field("queue", &self.queue)
141
            .field("list", &self.list)
142
22
            .field("comment", &self.comment)
143
22
            .field("to_addresses", &self.to_addresses)
144
22
            .field("from_address", &self.from_address)
145
22
            .field("subject", &self.subject)
146
22
            .field("message_id", &self.message_id)
147
22
            .field("message length", &self.message.len())
148
            .field(
149
                "message",
150
22
                &format!("{:.15}", String::from_utf8_lossy(&self.message)),
151
            )
152
22
            .field("timestamp", &self.timestamp)
153
22
            .field("datetime", &self.datetime)
154
            .finish()
155
22
    }
156
}
157

            
158
impl QueueEntry {
159
    /// Create new entry.
160
29
    pub fn new(
161
        queue: Queue,
162
        list: Option<i64>,
163
        env: Option<Cow<'_, Envelope>>,
164
        raw: &[u8],
165
        comment: Option<String>,
166
    ) -> Result<Self> {
167
58
        let env = env
168
            .map(Ok)
169
51
            .unwrap_or_else(|| melib::Envelope::from_bytes(raw, None).map(Cow::Owned))?;
170
29
        let now = chrono::offset::Utc::now();
171
29
        Ok(Self {
172
            pk: -1,
173
            list,
174
            queue,
175
29
            comment,
176
29
            to_addresses: env.field_to_to_string(),
177
29
            from_address: env.field_from_to_string(),
178
29
            subject: env.subject().to_string(),
179
29
            message_id: env.message_id().to_string(),
180
29
            message: raw.to_vec(),
181
29
            timestamp: now.timestamp() as u64,
182
29
            datetime: now,
183
        })
184
29
    }
185
}
186

            
187
impl Connection {
188
    /// Insert a received email into a queue.
189
34
    pub fn insert_to_queue(&self, mut entry: QueueEntry) -> Result<DbVal<QueueEntry>> {
190
        log::trace!("Inserting to queue: {entry}");
191
34
        let mut stmt = self.connection.prepare(
192
            "INSERT INTO queue(which, list, comment, to_addresses, from_address, subject, \
193
             message_id, message, timestamp, datetime) VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?) \
194
             RETURNING pk;",
195
        )?;
196
34
        let pk = stmt.query_row(
197
34
            rusqlite::params![
198
34
                entry.queue.as_str(),
199
34
                &entry.list,
200
34
                &entry.comment,
201
34
                &entry.to_addresses,
202
34
                &entry.from_address,
203
34
                &entry.subject,
204
34
                &entry.message_id,
205
34
                &entry.message,
206
34
                &entry.timestamp,
207
34
                &entry.datetime,
208
            ],
209
34
            |row| {
210
34
                let pk: i64 = row.get("pk")?;
211
34
                Ok(pk)
212
34
            },
213
34
        )?;
214
34
        entry.pk = pk;
215
34
        Ok(DbVal(entry, pk))
216
34
    }
217

            
218
    /// Fetch all queue entries.
219
38
    pub fn queue(&self, queue: Queue) -> Result<Vec<DbVal<QueueEntry>>> {
220
38
        let mut stmt = self
221
            .connection
222
            .prepare("SELECT * FROM queue WHERE which = ?;")?;
223
84
        let iter = stmt.query_map([&queue.as_str()], |row| {
224
46
            let pk = row.get::<_, i64>("pk")?;
225
46
            Ok(DbVal(
226
46
                QueueEntry {
227
                    pk,
228
46
                    queue,
229
46
                    list: row.get::<_, Option<i64>>("list")?,
230
46
                    comment: row.get::<_, Option<String>>("comment")?,
231
46
                    to_addresses: row.get::<_, String>("to_addresses")?,
232
46
                    from_address: row.get::<_, String>("from_address")?,
233
46
                    subject: row.get::<_, String>("subject")?,
234
46
                    message_id: row.get::<_, String>("message_id")?,
235
46
                    message: row.get::<_, Vec<u8>>("message")?,
236
46
                    timestamp: row.get::<_, u64>("timestamp")?,
237
46
                    datetime: row.get::<_, DateTime>("datetime")?,
238
46
                },
239
                pk,
240
            ))
241
46
        })?;
242

            
243
38
        let mut ret = vec![];
244
84
        for item in iter {
245
46
            let item = item?;
246
46
            ret.push(item);
247
        }
248
38
        Ok(ret)
249
38
    }
250

            
251
    /// Delete queue entries returning the deleted values.
252
10
    pub fn delete_from_queue(&self, queue: Queue, index: Vec<i64>) -> Result<Vec<QueueEntry>> {
253
10
        let tx = self.savepoint(Some(stringify!(delete_from_queue)))?;
254

            
255
26
        let cl = |row: &rusqlite::Row<'_>| {
256
16
            Ok(QueueEntry {
257
                pk: -1,
258
16
                queue,
259
32
                list: row.get::<_, Option<i64>>("list")?,
260
16
                comment: row.get::<_, Option<String>>("comment")?,
261
16
                to_addresses: row.get::<_, String>("to_addresses")?,
262
16
                from_address: row.get::<_, String>("from_address")?,
263
16
                subject: row.get::<_, String>("subject")?,
264
16
                message_id: row.get::<_, String>("message_id")?,
265
16
                message: row.get::<_, Vec<u8>>("message")?,
266
16
                timestamp: row.get::<_, u64>("timestamp")?,
267
16
                datetime: row.get::<_, DateTime>("datetime")?,
268
16
            })
269
16
        };
270
20
        let mut stmt = if index.is_empty() {
271
9
            tx.connection
272
                .prepare("DELETE FROM queue WHERE which = ? RETURNING *;")?
273
        } else {
274
1
            tx.connection
275
                .prepare("DELETE FROM queue WHERE which = ? AND pk IN rarray(?) RETURNING *;")?
276
        };
277
19
        let iter = if index.is_empty() {
278
9
            stmt.query_map([&queue.as_str()], cl)?
279
        } else {
280
            // Note: A `Rc<Vec<Value>>` must be used as the parameter.
281
1
            let index = std::rc::Rc::new(
282
1
                index
283
                    .into_iter()
284
                    .map(rusqlite::types::Value::from)
285
                    .collect::<Vec<rusqlite::types::Value>>(),
286
            );
287
1
            stmt.query_map(rusqlite::params![queue.as_str(), index], cl)?
288
1
        };
289

            
290
10
        let mut ret = vec![];
291
26
        for item in iter {
292
16
            let item = item?;
293
16
            ret.push(item);
294
        }
295
10
        drop(stmt);
296
20
        tx.commit()?;
297
10
        Ok(ret)
298
10
    }
299
}
300

            
301
#[cfg(test)]
302
mod tests {
303
    use super::*;
304
    use crate::*;
305

            
306
    #[test]
307
2
    fn test_queue_delete_array() {
308
        use tempfile::TempDir;
309

            
310
1
        let tmp_dir = TempDir::new().unwrap();
311
1
        let db_path = tmp_dir.path().join("mpot.db");
312
1
        let config = Configuration {
313
1
            send_mail: SendMail::ShellCommand("/usr/bin/false".to_string()),
314
            db_path,
315
1
            data_path: tmp_dir.path().to_path_buf(),
316
1
            administrators: vec![],
317
        };
318

            
319
1
        let db = Connection::open_or_create_db(config).unwrap().trusted();
320
6
        for i in 0..5 {
321
5
            db.insert_to_queue(
322
5
                QueueEntry::new(
323
5
                    Queue::Hold,
324
5
                    None,
325
5
                    None,
326
10
                    format!("Subject: testing\r\nMessage-Id: {i}@localhost\r\n\r\nHello\r\n")
327
                        .as_bytes(),
328
5
                    None,
329
                )
330
                .unwrap(),
331
            )
332
5
            .unwrap();
333
        }
334
1
        let entries = db.queue(Queue::Hold).unwrap();
335
1
        assert_eq!(entries.len(), 5);
336
1
        let out_entries = db.delete_from_queue(Queue::Out, vec![]).unwrap();
337
1
        assert_eq!(db.queue(Queue::Hold).unwrap().len(), 5);
338
1
        assert!(out_entries.is_empty());
339
1
        let deleted_entries = db.delete_from_queue(Queue::Hold, vec![]).unwrap();
340
1
        assert_eq!(deleted_entries.len(), 5);
341
1
        assert_eq!(
342
1
            &entries
343
                .iter()
344
                .cloned()
345
                .map(DbVal::into_inner)
346
5
                .map(|mut e| {
347
5
                    e.pk = -1;
348
5
                    e
349
10
                })
350
                .collect::<Vec<_>>(),
351
1
            &deleted_entries
352
        );
353

            
354
6
        for e in deleted_entries {
355
10
            db.insert_to_queue(e).unwrap();
356
        }
357

            
358
1
        let index = db
359
1
            .queue(Queue::Hold)
360
            .unwrap()
361
            .into_iter()
362
            .skip(2)
363
2
            .map(|e| e.pk())
364
            .take(2)
365
            .collect::<Vec<i64>>();
366
1
        let deleted_entries = db.delete_from_queue(Queue::Hold, index).unwrap();
367
1
        assert_eq!(deleted_entries.len(), 2);
368
1
        assert_eq!(db.queue(Queue::Hold).unwrap().len(), 3);
369
2
    }
370
}