1
/*
2
 * This file is part of mailpot
3
 *
4
 * Copyright 2020 - Manos Pitsidianakis
5
 *
6
 * This program is free software: you can redistribute it and/or modify
7
 * it under the terms of the GNU Affero General Public License as
8
 * published by the Free Software Foundation, either version 3 of the
9
 * License, or (at your option) any later version.
10
 *
11
 * This program is distributed in the hope that it will be useful,
12
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14
 * GNU Affero General Public License for more details.
15
 *
16
 * You should have received a copy of the GNU Affero General Public License
17
 * along with this program. If not, see <https://www.gnu.org/licenses/>.
18
 */
19

            
20
//! Mailpot database and methods.
21

            
22
use std::{
23
    io::Write,
24
    process::{Command, Stdio},
25
};
26

            
27
use jsonschema::JSONSchema;
28
use log::{info, trace};
29
use rusqlite::{functions::FunctionFlags, Connection as DbConnection, OptionalExtension};
30

            
31
use crate::{
32
    config::Configuration,
33
    errors::{ErrorKind::*, *},
34
    models::{changesets::MailingListChangeset, DbVal, ListOwner, MailingList, Post},
35
    StripCarets,
36
};
37

            
38
/// A connection to a `mailpot` database.
39
pub struct Connection {
40
    /// The `rusqlite` connection handle.
41
    pub connection: DbConnection,
42
    pub(crate) conf: Configuration,
43
}
44

            
45
impl std::fmt::Debug for Connection {
46
    fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
47
        fmt.debug_struct("Connection")
48
            .field("conf", &self.conf)
49
            .finish()
50
    }
51
}
52

            
53
impl Drop for Connection {
54
72
    fn drop(&mut self) {
55
144
        self.connection
56
            .authorizer::<fn(rusqlite::hooks::AuthContext<'_>) -> rusqlite::hooks::Authorization>(
57
72
                None,
58
            );
59
        // make sure pragma optimize does not take too long
60
72
        _ = self.connection.pragma_update(None, "analysis_limit", "400");
61
        // gather statistics to improve query optimization
62
144
        _ = self
63
            .connection
64
144
            .pragma(None, "optimize", 0xfffe_i64, |_| Ok(()));
65
72
    }
66
}
67

            
68
16
fn log_callback(error_code: std::ffi::c_int, message: &str) {
69
16
    match error_code {
70
8
        rusqlite::ffi::SQLITE_NOTICE => log::trace!("{}", message),
71
        rusqlite::ffi::SQLITE_OK
72
        | rusqlite::ffi::SQLITE_DONE
73
        | rusqlite::ffi::SQLITE_NOTICE_RECOVER_WAL
74
        | rusqlite::ffi::SQLITE_NOTICE_RECOVER_ROLLBACK => log::info!("{}", message),
75
2
        rusqlite::ffi::SQLITE_WARNING | rusqlite::ffi::SQLITE_WARNING_AUTOINDEX => {
76
2
            log::warn!("{}", message)
77
        }
78
        _ => log::error!("{error_code} {}", message),
79
    }
80
16
}
81

            
82
2457
fn user_authorizer_callback(
83
    auth_context: rusqlite::hooks::AuthContext<'_>,
84
) -> rusqlite::hooks::Authorization {
85
    use rusqlite::hooks::{AuthAction, Authorization};
86

            
87
    // [ref:sync_auth_doc] sync with `untrusted()` rustdoc when changing this.
88
2457
    match auth_context.action {
89
        AuthAction::Delete {
90
2
            table_name: "queue" | "candidate_subscription" | "subscription",
91
        }
92
        | AuthAction::Insert {
93
19
            table_name: "post" | "queue" | "candidate_subscription" | "subscription" | "account",
94
        }
95
        | AuthAction::Update {
96
37
            table_name: "candidate_subscription" | "template",
97
12
            column_name: "accepted" | "last_modified" | "verified" | "address",
98
        }
99
        | AuthAction::Update {
100
25
            table_name: "account",
101
2
            column_name: "last_modified" | "name" | "public_key" | "password",
102
        }
103
        | AuthAction::Update {
104
23
            table_name: "subscription",
105
            column_name:
106
23
                "last_modified"
107
9
                | "account"
108
4
                | "digest"
109
4
                | "verified"
110
                | "hide_address"
111
                | "receive_duplicates"
112
                | "receive_own_posts"
113
                | "receive_confirmation",
114
        }
115
186
        | AuthAction::Select
116
2
        | AuthAction::Savepoint { .. }
117
        | AuthAction::Transaction { .. }
118
2171
        | AuthAction::Read { .. }
119
        | AuthAction::Function {
120
40
            function_name: "count" | "strftime" | "unixepoch" | "datetime",
121
2453
        } => Authorization::Allow,
122
4
        _ => Authorization::Deny,
123
    }
124
2457
}
125

            
126
impl Connection {
127
    /// The database schema.
128
    ///
129
    /// ```sql
130
    #[doc = include_str!("./schema.sql")]
131
    /// ```
132
    pub const SCHEMA: &'static str = include_str!("./schema.sql");
133

            
134
    /// Database migrations.
135
    pub const MIGRATIONS: &'static [(u32, &'static str, &'static str)] =
136
        include!("./migrations.rs.inc");
137

            
138
    /// Creates a new database connection.
139
    ///
140
    /// `Connection` supports a limited subset of operations by default (see
141
    /// [`Connection::untrusted`]).
142
    /// Use [`Connection::trusted`] to remove these limits.
143
    ///
144
    /// # Example
145
    ///
146
    /// ```rust,no_run
147
    /// use mailpot::{Connection, Configuration};
148
    /// use melib::smtp::{SmtpServerConf, SmtpAuth, SmtpSecurity};
149
    /// #
150
    /// # fn main() -> mailpot::Result<()> {
151
    /// # use tempfile::TempDir;
152
    /// #
153
    /// # let tmp_dir = TempDir::new()?;
154
    /// # let db_path = tmp_dir.path().join("mpot.db");
155
    /// # let data_path = tmp_dir.path().to_path_buf();
156
    /// let config = Configuration {
157
    ///     send_mail: mailpot::SendMail::Smtp(
158
    ///         SmtpServerConf {
159
    ///             hostname: "127.0.0.1".into(),
160
    ///             port: 25,
161
    ///             envelope_from: "foo-chat@example.com".into(),
162
    ///             auth: SmtpAuth::None,
163
    ///             security: SmtpSecurity::None,
164
    ///             extensions: Default::default(),
165
    ///         }
166
    ///     ),
167
    ///     db_path,
168
    ///     data_path,
169
    ///     administrators: vec![],
170
    /// };
171
    /// # assert_eq!(&Connection::open_db(config.clone()).unwrap_err().to_string(), "Database doesn't exist");
172
    ///
173
    /// let db = Connection::open_or_create_db(config)?;
174
    /// # _ = db;
175
    /// # Ok(())
176
    /// # }
177
    /// ```
178
73
    pub fn open_db(conf: Configuration) -> Result<Self> {
179
        use std::sync::Once;
180

            
181
        use rusqlite::config::DbConfig;
182

            
183
        static INIT_SQLITE_LOGGING: Once = Once::new();
184

            
185
73
        if !conf.db_path.exists() {
186
1
            return Err("Database doesn't exist".into());
187
        }
188
121
        INIT_SQLITE_LOGGING.call_once(|| {
189
49
            _ = unsafe { rusqlite::trace::config_log(Some(log_callback)) };
190
49
        });
191
72
        let conn = DbConnection::open(conf.db_path.to_str().unwrap()).with_context(|| {
192
            format!("sqlite3 library could not open {}.", conf.db_path.display())
193
        })?;
194
72
        rusqlite::vtab::array::load_module(&conn)?;
195
72
        conn.pragma_update(None, "journal_mode", "WAL")?;
196
72
        conn.pragma_update(None, "foreign_keys", "on")?;
197
        // synchronise less often to the filesystem
198
72
        conn.pragma_update(None, "synchronous", "normal")?;
199
72
        conn.set_db_config(DbConfig::SQLITE_DBCONFIG_ENABLE_FKEY, true)?;
200
72
        conn.set_db_config(DbConfig::SQLITE_DBCONFIG_ENABLE_TRIGGER, true)?;
201
72
        conn.set_db_config(DbConfig::SQLITE_DBCONFIG_DEFENSIVE, true)?;
202
72
        conn.set_db_config(DbConfig::SQLITE_DBCONFIG_TRUSTED_SCHEMA, true)?;
203
72
        conn.busy_timeout(core::time::Duration::from_millis(500))?;
204
72
        conn.busy_handler(Some(|times: i32| -> bool { times < 5 }))?;
205
72
        conn.create_scalar_function(
206
            "validate_json_schema",
207
            2,
208
72
            FunctionFlags::SQLITE_INNOCUOUS
209
                | FunctionFlags::SQLITE_UTF8
210
                | FunctionFlags::SQLITE_DETERMINISTIC,
211
8
            |ctx| {
212
8
                if log::log_enabled!(log::Level::Trace) {
213
8
                    rusqlite::trace::log(
214
                        rusqlite::ffi::SQLITE_NOTICE,
215
                        "validate_json_schema RUNNING",
216
                    );
217
                }
218
8
                let map_err = rusqlite::Error::UserFunctionError;
219
8
                let schema = ctx.get::<String>(0)?;
220
8
                let value = ctx.get::<String>(1)?;
221
8
                let schema_val: serde_json::Value = serde_json::from_str(&schema)
222
                    .map_err(Into::into)
223
                    .map_err(map_err)?;
224
8
                let value: serde_json::Value = serde_json::from_str(&value)
225
                    .map_err(Into::into)
226
                    .map_err(map_err)?;
227
8
                let compiled = JSONSchema::compile(&schema_val)
228
                    .map_err(|err| err.to_string())
229
                    .map_err(Into::into)
230
                    .map_err(map_err)?;
231
10
                let x = if let Err(errors) = compiled.validate(&value) {
232
4
                    for err in errors {
233
4
                        rusqlite::trace::log(rusqlite::ffi::SQLITE_WARNING, &err.to_string());
234
2
                        drop(err);
235
                    }
236
2
                    Ok(false)
237
                } else {
238
6
                    Ok(true)
239
8
                };
240
8
                x
241
8
            },
242
        )?;
243

            
244
72
        let ret = Self {
245
72
            conf,
246
72
            connection: conn,
247
        };
248
144
        if let Some(&(latest, _, _)) = Self::MIGRATIONS.last() {
249
72
            let version = ret.schema_version()?;
250
72
            trace!(
251
                "SQLITE user_version PRAGMA returned {version}. Most recent migration is {latest}."
252
            );
253
72
            if version < latest {
254
                info!("Updating database schema from version {version} to {latest}...");
255
            }
256
72
            ret.migrate(version, latest)?;
257
        }
258

            
259
72
        ret.connection.authorizer(Some(user_authorizer_callback));
260
72
        Ok(ret)
261
73
    }
262

            
263
    /// The version of the current schema.
264
73
    pub fn schema_version(&self) -> Result<u32> {
265
73
        Ok(self
266
            .connection
267
            .prepare("SELECT user_version FROM pragma_user_version;")?
268
73
            .query_row([], |row| {
269
73
                let v: u32 = row.get(0)?;
270
73
                Ok(v)
271
73
            })?)
272
146
    }
273

            
274
    /// Migrate from version `from` to `to`.
275
    ///
276
    /// See [Self::MIGRATIONS].
277
74
    pub fn migrate(&self, mut from: u32, to: u32) -> Result<()> {
278
74
        if from == to {
279
68
            return Ok(());
280
        }
281

            
282
6
        let undo = from > to;
283
6
        let tx = self.savepoint(Some(stringify!(migrate)))?;
284

            
285
42
        while from != to {
286
36
            log::trace!(
287
                "exec migration from {from} to {to}, type: {}do",
288
                if undo { "un " } else { "re" }
289
            );
290
72
            if undo {
291
                trace!("{}", Self::MIGRATIONS[from as usize - 1].2);
292
6
                tx.connection
293
6
                    .execute_batch(Self::MIGRATIONS[from as usize - 1].2)?;
294
6
                from -= 1;
295
            } else {
296
30
                trace!("{}", Self::MIGRATIONS[from as usize].1);
297
30
                tx.connection
298
30
                    .execute_batch(Self::MIGRATIONS[from as usize].1)?;
299
30
                from += 1;
300
            }
301
        }
302
6
        tx.connection
303
6
            .pragma_update(None, "user_version", Self::MIGRATIONS[to as usize - 1].0)?;
304

            
305
80
        tx.commit()?;
306

            
307
6
        Ok(())
308
74
    }
309

            
310
    /// Removes operational limits from this connection. (see
311
    /// [`Connection::untrusted`])
312
    #[must_use]
313
57
    pub fn trusted(self) -> Self {
314
114
        self.connection
315
            .authorizer::<fn(rusqlite::hooks::AuthContext<'_>) -> rusqlite::hooks::Authorization>(
316
57
                None,
317
            );
318
57
        self
319
57
    }
320

            
321
    // [tag:sync_auth_doc]
322
    /// Sets operational limits for this connection.
323
    ///
324
    /// - Allow `INSERT`, `DELETE` only for "queue", "candidate_subscription",
325
    ///   "subscription".
326
    /// - Allow `UPDATE` only for "subscription" user facing settings.
327
    /// - Allow `INSERT` only for "post".
328
    /// - Allow read access to all tables.
329
    /// - Allow `SELECT`, `TRANSACTION`, `SAVEPOINT`, and the `strftime`
330
    ///   function.
331
    /// - Deny everything else.
332
7
    pub fn untrusted(self) -> Self {
333
7
        self.connection.authorizer(Some(user_authorizer_callback));
334
7
        self
335
7
    }
336

            
337
    /// Create a database if it doesn't exist and then open it.
338
52
    pub fn open_or_create_db(conf: Configuration) -> Result<Self> {
339
52
        if !conf.db_path.exists() {
340
33
            let db_path = &conf.db_path;
341
            use std::os::unix::fs::PermissionsExt;
342

            
343
33
            info!("Creating database in {}", db_path.display());
344
33
            std::fs::File::create(db_path).context("Could not create db path")?;
345

            
346
            let mut child =
347
165
                Command::new(std::env::var("SQLITE_BIN").unwrap_or_else(|_| "sqlite3".into()))
348
                    .arg(db_path)
349
33
                    .stdin(Stdio::piped())
350
33
                    .stdout(Stdio::piped())
351
33
                    .stderr(Stdio::piped())
352
                    .spawn()
353
33
                    .with_context(|| {
354
                        format!(
355
                            "Could not launch {} {}.",
356
                            std::env::var("SQLITE_BIN").unwrap_or_else(|_| "sqlite3".into()),
357
                            db_path.display()
358
                        )
359
33
                    })?;
360
33
            let mut stdin = child.stdin.take().unwrap();
361
66
            std::thread::spawn(move || {
362
33
                stdin
363
33
                    .write_all(Self::SCHEMA.as_bytes())
364
                    .expect("failed to write to stdin");
365
33
                if !Self::MIGRATIONS.is_empty() {
366
33
                    stdin
367
                        .write_all(b"\nPRAGMA user_version = ")
368
                        .expect("failed to write to stdin");
369
33
                    stdin
370
                        .write_all(
371
33
                            Self::MIGRATIONS[Self::MIGRATIONS.len() - 1]
372
                                .0
373
                                .to_string()
374
                                .as_bytes(),
375
                        )
376
33
                        .expect("failed to write to stdin");
377
33
                    stdin.write_all(b";").expect("failed to write to stdin");
378
                }
379
33
                stdin.flush().expect("could not flush stdin");
380
66
            });
381
33
            let output = child.wait_with_output()?;
382
33
            if !output.status.success() {
383
                return Err(format!(
384
                    "Could not initialize sqlite3 database at {}: sqlite3 returned exit code {} \
385
                     and stderr {} {}",
386
                    db_path.display(),
387
                    output.status.code().unwrap_or_default(),
388
                    String::from_utf8_lossy(&output.stderr),
389
                    String::from_utf8_lossy(&output.stdout)
390
                )
391
                .into());
392
            }
393

            
394
66
            let file = std::fs::File::open(db_path)
395
33
                .with_context(|| format!("Could not open database {}.", db_path.display()))?;
396
66
            let metadata = file
397
                .metadata()
398
33
                .with_context(|| format!("Could not fstat database {}.", db_path.display()))?;
399
33
            let mut permissions = metadata.permissions();
400

            
401
33
            permissions.set_mode(0o600); // Read/write for owner only.
402
66
            file.set_permissions(permissions)
403
33
                .with_context(|| format!("Could not chmod 600 database {}.", db_path.display()))?;
404
33
        }
405
52
        Self::open_db(conf)
406
52
    }
407

            
408
    /// Returns a connection's configuration.
409
17
    pub fn conf(&self) -> &Configuration {
410
        &self.conf
411
17
    }
412

            
413
    /// Loads archive databases from [`Configuration::data_path`], if any.
414
    pub fn load_archives(&self) -> Result<()> {
415
        let tx = self.savepoint(Some(stringify!(load_archives)))?;
416
        {
417
            let mut stmt = tx.connection.prepare("ATTACH ? AS ?;")?;
418
            for archive in std::fs::read_dir(&self.conf.data_path)? {
419
                let archive = archive?;
420
                let path = archive.path();
421
                let name = path.file_name().unwrap_or_default();
422
                if path == self.conf.db_path {
423
                    continue;
424
                }
425
                stmt.execute(rusqlite::params![
426
                    path.to_str().unwrap(),
427
                    name.to_str().unwrap()
428
                ])?;
429
            }
430
        }
431
        tx.commit()?;
432

            
433
        Ok(())
434
    }
435

            
436
    /// Returns a vector of existing mailing lists.
437
67
    pub fn lists(&self) -> Result<Vec<DbVal<MailingList>>> {
438
67
        let mut stmt = self.connection.prepare("SELECT * FROM list;")?;
439
130
        let list_iter = stmt.query_map([], |row| {
440
63
            let pk = row.get("pk")?;
441
63
            let topics: serde_json::Value = row.get("topics")?;
442
63
            let topics = MailingList::topics_from_json_value(topics)?;
443
63
            Ok(DbVal(
444
63
                MailingList {
445
                    pk,
446
63
                    name: row.get("name")?,
447
63
                    id: row.get("id")?,
448
63
                    address: row.get("address")?,
449
63
                    description: row.get("description")?,
450
63
                    topics,
451
63
                    archive_url: row.get("archive_url")?,
452
                },
453
                pk,
454
            ))
455
63
        })?;
456

            
457
67
        let mut ret = vec![];
458
130
        for list in list_iter {
459
63
            let list = list?;
460
63
            ret.push(list);
461
        }
462
67
        Ok(ret)
463
67
    }
464

            
465
    /// Fetch a mailing list by primary key.
466
3
    pub fn list(&self, pk: i64) -> Result<Option<DbVal<MailingList>>> {
467
3
        let mut stmt = self
468
            .connection
469
            .prepare("SELECT * FROM list WHERE pk = ?;")?;
470
3
        let ret = stmt
471
6
            .query_row([&pk], |row| {
472
3
                let pk = row.get("pk")?;
473
3
                let topics: serde_json::Value = row.get("topics")?;
474
3
                let topics = MailingList::topics_from_json_value(topics)?;
475
3
                Ok(DbVal(
476
3
                    MailingList {
477
                        pk,
478
3
                        name: row.get("name")?,
479
3
                        id: row.get("id")?,
480
3
                        address: row.get("address")?,
481
3
                        description: row.get("description")?,
482
3
                        topics,
483
3
                        archive_url: row.get("archive_url")?,
484
                    },
485
                    pk,
486
                ))
487
3
            })
488
3
            .optional()?;
489
3
        Ok(ret)
490
3
    }
491

            
492
    /// Fetch a mailing list by id.
493
2
    pub fn list_by_id<S: AsRef<str>>(&self, id: S) -> Result<Option<DbVal<MailingList>>> {
494
2
        let id = id.as_ref();
495
2
        let mut stmt = self
496
            .connection
497
            .prepare("SELECT * FROM list WHERE id = ?;")?;
498
2
        let ret = stmt
499
4
            .query_row([&id], |row| {
500
2
                let pk = row.get("pk")?;
501
2
                let topics: serde_json::Value = row.get("topics")?;
502
2
                let topics = MailingList::topics_from_json_value(topics)?;
503
2
                Ok(DbVal(
504
2
                    MailingList {
505
                        pk,
506
2
                        name: row.get("name")?,
507
2
                        id: row.get("id")?,
508
2
                        address: row.get("address")?,
509
2
                        description: row.get("description")?,
510
2
                        topics,
511
2
                        archive_url: row.get("archive_url")?,
512
                    },
513
                    pk,
514
                ))
515
2
            })
516
2
            .optional()?;
517

            
518
2
        Ok(ret)
519
2
    }
520

            
521
    /// Create a new list.
522
40
    pub fn create_list(&self, new_val: MailingList) -> Result<DbVal<MailingList>> {
523
40
        let mut stmt = self.connection.prepare(
524
            "INSERT INTO list(name, id, address, description, archive_url, topics) VALUES(?, ?, \
525
             ?, ?, ?, ?) RETURNING *;",
526
1
        )?;
527
39
        let ret = stmt.query_row(
528
39
            rusqlite::params![
529
39
                &new_val.name,
530
39
                &new_val.id,
531
39
                &new_val.address,
532
39
                new_val.description.as_ref(),
533
39
                new_val.archive_url.as_ref(),
534
39
                serde_json::json!(new_val.topics.as_slice()),
535
            ],
536
39
            |row| {
537
39
                let pk = row.get("pk")?;
538
39
                let topics: serde_json::Value = row.get("topics")?;
539
39
                let topics = MailingList::topics_from_json_value(topics)?;
540
39
                Ok(DbVal(
541
39
                    MailingList {
542
                        pk,
543
39
                        name: row.get("name")?,
544
39
                        id: row.get("id")?,
545
39
                        address: row.get("address")?,
546
39
                        description: row.get("description")?,
547
39
                        topics,
548
39
                        archive_url: row.get("archive_url")?,
549
                    },
550
                    pk,
551
                ))
552
39
            },
553
39
        )?;
554

            
555
39
        trace!("create_list {:?}.", &ret);
556
39
        Ok(ret)
557
40
    }
558

            
559
    /// Fetch all posts of a mailing list.
560
12
    pub fn list_posts(
561
        &self,
562
        list_pk: i64,
563
        _date_range: Option<(String, String)>,
564
    ) -> Result<Vec<DbVal<Post>>> {
565
12
        let mut stmt = self.connection.prepare(
566
            "SELECT *, strftime('%Y-%m', CAST(timestamp AS INTEGER), 'unixepoch') AS month_year \
567
             FROM post WHERE list = ? ORDER BY timestamp ASC;",
568
        )?;
569
21
        let iter = stmt.query_map(rusqlite::params![&list_pk], |row| {
570
9
            let pk = row.get("pk")?;
571
9
            Ok(DbVal(
572
9
                Post {
573
                    pk,
574
9
                    list: row.get("list")?,
575
9
                    envelope_from: row.get("envelope_from")?,
576
9
                    address: row.get("address")?,
577
9
                    message_id: row.get("message_id")?,
578
9
                    message: row.get("message")?,
579
9
                    timestamp: row.get("timestamp")?,
580
9
                    datetime: row.get("datetime")?,
581
9
                    month_year: row.get("month_year")?,
582
                },
583
                pk,
584
            ))
585
9
        })?;
586
12
        let mut ret = vec![];
587
21
        for post in iter {
588
9
            let post = post?;
589
9
            ret.push(post);
590
        }
591

            
592
12
        trace!("list_posts {:?}.", &ret);
593
12
        Ok(ret)
594
12
    }
595

            
596
    /// Fetch the contents of a single thread in the form of `(depth, post)`
597
    /// where `depth` is the reply distance between a message and the thread
598
    /// root message.
599
1
    pub fn list_thread(&self, list_pk: i64, root: &str) -> Result<Vec<(i64, DbVal<Post>)>> {
600
1
        let mut stmt = self
601
            .connection
602
            .prepare(
603
                "WITH RECURSIVE cte_replies AS MATERIALIZED
604
            (
605
                SELECT
606
                pk,
607
                message_id,
608
                REPLACE(
609
                    TRIM(
610
                        SUBSTR(
611
                            CAST(message AS TEXT),
612
                            INSTR(
613
                                CAST(message AS TEXT),
614
                                'In-Reply-To: '
615
                            )
616
                            +
617
                            LENGTH('in-reply-to: '),
618
                            INSTR(
619
                                SUBSTR(
620
                                    CAST(message AS TEXT),
621
                                    INSTR(
622
                                        CAST(message AS TEXT),
623
                                        'In-Reply-To: ')
624
                                        +
625
                                        LENGTH('in-reply-to: ')
626
                                ),
627
                                '>'
628
                            )
629
                        )
630
                    ),
631
                    ' ',
632
                    ''
633
                ) AS in_reply_to,
634
                INSTR(
635
                    CAST(message AS TEXT),
636
                    'In-Reply-To: '
637
                ) AS offset
638
                FROM post
639
                WHERE
640
                    offset > 0
641
                UNION
642
                SELECT
643
                pk,
644
                message_id,
645
                NULL AS in_reply_to,
646
                INSTR(
647
                    CAST(message AS TEXT),
648
                    'In-Reply-To: '
649
                ) AS offset
650
                FROM post
651
                WHERE
652
                offset = 0
653
            ),
654
            cte_thread(parent, root, depth) AS (
655
              SELECT DISTINCT
656
                message_id AS parent,
657
                message_id AS root,
658
                0 AS depth
659
              FROM cte_replies
660
              WHERE
661
                in_reply_to IS NULL
662
              UNION ALL
663
              SELECT
664
                t.message_id AS parent,
665
                cte_thread.root AS root,
666
                (cte_thread.depth + 1) AS depth
667
              FROM cte_replies
668
              AS t
669
              JOIN
670
              cte_thread
671
              ON cte_thread.parent = t.in_reply_to
672
              WHERE t.in_reply_to IS NOT NULL
673
)
674
SELECT * FROM cte_thread WHERE root = ? ORDER BY root, depth;",
675
            )
676
            .unwrap();
677
2
        let iter = stmt.query_map(rusqlite::params![root], |row| {
678
1
            let parent: String = row.get("parent")?;
679
1
            let root: String = row.get("root")?;
680
1
            let depth: i64 = row.get("depth")?;
681
1
            Ok((parent, root, depth))
682
1
        })?;
683
1
        let mut ret = vec![];
684
2
        for post in iter {
685
1
            ret.push(post?);
686
        }
687
1
        let posts = self.list_posts(list_pk, None)?;
688
2
        let ret = ret
689
            .into_iter()
690
3
            .filter_map(|(m, _, depth)| {
691
4
                posts
692
                    .iter()
693
2
                    .find(|p| m.as_str().strip_carets() == p.message_id.as_str().strip_carets())
694
2
                    .map(|p| (depth, p.clone()))
695
1
            })
696
            .skip(1)
697
            .collect();
698
1
        Ok(ret)
699
1
    }
700

            
701
    /// Export a list, message, or thread in mbox format
702
1
    pub fn export_mbox(
703
        &self,
704
        pk: i64,
705
        message_id: Option<&str>,
706
        as_thread: bool,
707
    ) -> Result<Vec<u8>> {
708
        let posts: Result<Vec<DbVal<Post>>> = {
709
2
            if let Some(message_id) = message_id {
710
                if as_thread {
711
                    // export a thread
712
                    let thread = self.list_thread(pk, message_id)?;
713
                    Ok(thread.iter().map(|item| item.1.clone()).collect())
714
                } else {
715
                    // export a single message
716
                    let message =
717
                        self.list_post_by_message_id(pk, message_id)?
718
                            .ok_or_else(|| {
719
                                Error::from(format!("no message with id: {}", message_id))
720
                            })?;
721
                    Ok(vec![message])
722
                }
723
            } else {
724
                // export the entire mailing list
725
1
                let posts = self.list_posts(pk, None)?;
726
1
                Ok(posts)
727
            }
728
        };
729
1
        let mut buf: Vec<u8> = Vec::new();
730
1
        let mailbox = melib::mbox::MboxFormat::default();
731
3
        for post in posts? {
732
4
            let envelope_from = if let Some(address) = post.0.envelope_from {
733
                let address = melib::Address::try_from(address.as_str())?;
734
                Some(address)
735
            } else {
736
2
                None
737
            };
738
2
            let envelope = melib::Envelope::from_bytes(&post.0.message, None)?;
739
2
            mailbox.append(
740
                &mut buf,
741
2
                &post.0.message.to_vec(),
742
2
                envelope_from.as_ref(),
743
2
                Some(envelope.timestamp),
744
2
                (melib::Flag::PASSED, vec![]),
745
2
                melib::mbox::MboxMetadata::None,
746
                false,
747
                false,
748
2
            )?;
749
2
        }
750
2
        buf.flush()?;
751
1
        Ok(buf)
752
1
    }
753

            
754
    /// Fetch the owners of a mailing list.
755
31
    pub fn list_owners(&self, pk: i64) -> Result<Vec<DbVal<ListOwner>>> {
756
31
        let mut stmt = self
757
            .connection
758
            .prepare("SELECT * FROM owner WHERE list = ?;")?;
759
42
        let list_iter = stmt.query_map([&pk], |row| {
760
11
            let pk = row.get("pk")?;
761
11
            Ok(DbVal(
762
11
                ListOwner {
763
                    pk,
764
11
                    list: row.get("list")?,
765
11
                    address: row.get("address")?,
766
11
                    name: row.get("name")?,
767
                },
768
                pk,
769
            ))
770
11
        })?;
771

            
772
31
        let mut ret = vec![];
773
42
        for list in list_iter {
774
11
            let list = list?;
775
11
            ret.push(list);
776
        }
777
31
        Ok(ret)
778
31
    }
779

            
780
    /// Remove an owner of a mailing list.
781
3
    pub fn remove_list_owner(&self, list_pk: i64, owner_pk: i64) -> Result<()> {
782
6
        self.connection
783
            .query_row(
784
                "DELETE FROM owner WHERE list = ? AND pk = ? RETURNING *;",
785
3
                rusqlite::params![&list_pk, &owner_pk],
786
2
                |_| Ok(()),
787
            )
788
1
            .map_err(|err| {
789
1
                if matches!(err, rusqlite::Error::QueryReturnedNoRows) {
790
                    Error::from(err).chain_err(|| NotFound("list or list owner not found!"))
791
                } else {
792
1
                    Error::from(err)
793
                }
794
2
            })?;
795
2
        Ok(())
796
3
    }
797

            
798
    /// Add an owner of a mailing list.
799
2
    pub fn add_list_owner(&self, list_owner: ListOwner) -> Result<DbVal<ListOwner>> {
800
2
        let mut stmt = self.connection.prepare(
801
            "INSERT OR REPLACE INTO owner(list, address, name) VALUES (?, ?, ?) RETURNING *;",
802
        )?;
803
2
        let list_pk = list_owner.list;
804
2
        let ret = stmt
805
            .query_row(
806
2
                rusqlite::params![&list_pk, &list_owner.address, &list_owner.name,],
807
2
                |row| {
808
2
                    let pk = row.get("pk")?;
809
2
                    Ok(DbVal(
810
2
                        ListOwner {
811
                            pk,
812
2
                            list: row.get("list")?,
813
2
                            address: row.get("address")?,
814
2
                            name: row.get("name")?,
815
                        },
816
                        pk,
817
                    ))
818
2
                },
819
            )
820
            .map_err(|err| {
821
                if matches!(
822
                    err,
823
                    rusqlite::Error::SqliteFailure(
824
                        rusqlite::ffi::Error {
825
                            code: rusqlite::ffi::ErrorCode::ConstraintViolation,
826
                            extended_code: 787
827
                        },
828
                        _
829
                    )
830
                ) {
831
                    Error::from(err).chain_err(|| NotFound("Could not find a list with this pk."))
832
                } else {
833
                    err.into()
834
                }
835
            })?;
836

            
837
2
        trace!("add_list_owner {:?}.", &ret);
838
2
        Ok(ret)
839
2
    }
840

            
841
    /// Update a mailing list.
842
1
    pub fn update_list(&self, change_set: MailingListChangeset) -> Result<()> {
843
2
        if matches!(
844
1
            change_set,
845
            MailingListChangeset {
846
                pk: _,
847
                name: None,
848
                id: None,
849
                address: None,
850
                description: None,
851
                archive_url: None,
852
                owner_local_part: None,
853
                request_local_part: None,
854
                verify: None,
855
                hidden: None,
856
                enabled: None,
857
            }
858
        ) {
859
            return self.list(change_set.pk).map(|_| ());
860
        }
861

            
862
        let MailingListChangeset {
863
1
            pk,
864
1
            name,
865
1
            id,
866
1
            address,
867
1
            description,
868
1
            archive_url,
869
1
            owner_local_part,
870
1
            request_local_part,
871
1
            verify,
872
1
            hidden,
873
1
            enabled,
874
        } = change_set;
875
2
        let tx = self.savepoint(Some(stringify!(update_list)))?;
876

            
877
        macro_rules! update {
878
            ($field:tt) => {{
879
3
                if let Some($field) = $field {
880
                    tx.connection.execute(
881
                        concat!("UPDATE list SET ", stringify!($field), " = ? WHERE pk = ?;"),
882
                        rusqlite::params![&$field, &pk],
883
                    )?;
884
                }
885
            }};
886
        }
887
1
        update!(name);
888
1
        update!(id);
889
1
        update!(address);
890
1
        update!(description);
891
1
        update!(archive_url);
892
1
        update!(owner_local_part);
893
1
        update!(request_local_part);
894
1
        update!(verify);
895
1
        update!(hidden);
896
1
        update!(enabled);
897

            
898
2
        tx.commit()?;
899
1
        Ok(())
900
1
    }
901

            
902
    /// Execute operations inside an SQL transaction.
903
12
    pub fn transaction(
904
        &'_ mut self,
905
        behavior: transaction::TransactionBehavior,
906
    ) -> Result<transaction::Transaction<'_>> {
907
        use transaction::*;
908

            
909
12
        let query = match behavior {
910
            TransactionBehavior::Deferred => "BEGIN DEFERRED",
911
7
            TransactionBehavior::Immediate => "BEGIN IMMEDIATE",
912
5
            TransactionBehavior::Exclusive => "BEGIN EXCLUSIVE",
913
        };
914
24
        self.connection.execute_batch(query)?;
915
12
        Ok(Transaction {
916
            conn: self,
917
12
            drop_behavior: DropBehavior::Rollback,
918
        })
919
12
    }
920

            
921
    /// Execute operations inside an SQL savepoint.
922
25
    pub fn savepoint(&'_ self, name: Option<&'static str>) -> Result<transaction::Savepoint<'_>> {
923
        use std::sync::atomic::{AtomicUsize, Ordering};
924

            
925
        use transaction::*;
926
        static COUNTER: AtomicUsize = AtomicUsize::new(0);
927

            
928
25
        let name = name
929
            .map(Ok)
930
5
            .unwrap_or_else(|| Err(COUNTER.fetch_add(1, Ordering::Relaxed)));
931

            
932
25
        match name {
933
20
            Ok(ref n) => self.connection.execute_batch(&format!("SAVEPOINT {n}"))?,
934
25
            Err(ref i) => self.connection.execute_batch(&format!("SAVEPOINT _{i}"))?,
935
        };
936

            
937
25
        Ok(Savepoint {
938
            conn: self,
939
25
            drop_behavior: DropBehavior::Rollback,
940
25
            name,
941
            committed: false,
942
        })
943
25
    }
944
}
945

            
946
/// Execute operations inside an SQL transaction.
947
pub mod transaction {
948
    use super::*;
949

            
950
    /// A transaction handle.
951
    #[derive(Debug)]
952
    pub struct Transaction<'conn> {
953
        pub(super) conn: &'conn mut Connection,
954
        pub(super) drop_behavior: DropBehavior,
955
    }
956

            
957
    impl Drop for Transaction<'_> {
958
12
        fn drop(&mut self) {
959
12
            _ = self.finish_();
960
12
        }
961
    }
962

            
963
    impl Transaction<'_> {
964
        /// Commit and consume transaction.
965
8
        pub fn commit(mut self) -> Result<()> {
966
8
            self.commit_()
967
8
        }
968

            
969
9
        fn commit_(&mut self) -> Result<()> {
970
9
            self.conn.connection.execute_batch("COMMIT")?;
971
9
            Ok(())
972
9
        }
973

            
974
        /// Configure the transaction to perform the specified action when it is
975
        /// dropped.
976
        #[inline]
977
3
        pub fn set_drop_behavior(&mut self, drop_behavior: DropBehavior) {
978
3
            self.drop_behavior = drop_behavior;
979
6
        }
980

            
981
        /// A convenience method which consumes and rolls back a transaction.
982
        #[inline]
983
2
        pub fn rollback(mut self) -> Result<()> {
984
2
            self.rollback_()
985
2
        }
986

            
987
3
        fn rollback_(&mut self) -> Result<()> {
988
3
            self.conn.connection.execute_batch("ROLLBACK")?;
989
3
            Ok(())
990
3
        }
991

            
992
        /// Consumes the transaction, committing or rolling back according to
993
        /// the current setting (see `drop_behavior`).
994
        ///
995
        /// Functionally equivalent to the `Drop` implementation, but allows
996
        /// callers to see any errors that occur.
997
        #[inline]
998
        pub fn finish(mut self) -> Result<()> {
999
            self.finish_()
        }
        #[inline]
12
        fn finish_(&mut self) -> Result<()> {
12
            if self.conn.connection.is_autocommit() {
10
                return Ok(());
            }
2
            match self.drop_behavior {
1
                DropBehavior::Commit => self.commit_().or_else(|_| self.rollback_()),
1
                DropBehavior::Rollback => self.rollback_(),
                DropBehavior::Ignore => Ok(()),
                DropBehavior::Panic => panic!("Transaction dropped unexpectedly."),
            }
22
        }
    }
    impl std::ops::Deref for Transaction<'_> {
        type Target = Connection;
        #[inline]
22
        fn deref(&self) -> &Connection {
22
            self.conn
44
        }
    }
    /// Options for transaction behavior. See [BEGIN
    /// TRANSACTION](http://www.sqlite.org/lang_transaction.html) for details.
14
    #[derive(Copy, Clone, Default)]
    #[non_exhaustive]
    pub enum TransactionBehavior {
        /// DEFERRED means that the transaction does not actually start until
        /// the database is first accessed.
        Deferred,
        #[default]
        /// IMMEDIATE cause the database connection to start a new write
        /// immediately, without waiting for a writes statement.
7
        Immediate,
        /// EXCLUSIVE prevents other database connections from reading the
        /// database while the transaction is underway.
        Exclusive,
    }
    /// Options for how a Transaction or Savepoint should behave when it is
    /// dropped.
    #[derive(Default, Copy, Clone, Debug, PartialEq, Eq)]
    #[non_exhaustive]
    pub enum DropBehavior {
        #[default]
        /// Roll back the changes. This is the default.
        Rollback,
        /// Commit the changes.
        Commit,
        /// Do not commit or roll back changes - this will leave the transaction
        /// or savepoint open, so should be used with care.
        Ignore,
        /// Panic. Used to enforce intentional behavior during development.
        Panic,
    }
    /// A savepoint handle.
    #[derive(Debug)]
    pub struct Savepoint<'conn> {
        pub(super) conn: &'conn Connection,
        pub(super) drop_behavior: DropBehavior,
        pub(super) name: std::result::Result<&'static str, usize>,
        pub(super) committed: bool,
    }
    impl Drop for Savepoint<'_> {
25
        fn drop(&mut self) {
25
            _ = self.finish_();
25
        }
    }
    impl Savepoint<'_> {
        /// Commit and consume savepoint.
24
        pub fn commit(mut self) -> Result<()> {
24
            self.commit_()
24
        }
24
        fn commit_(&mut self) -> Result<()> {
48
            if !self.committed {
24
                match self.name {
40
                    Ok(ref n) => self
                        .conn
                        .connection
20
                        .execute_batch(&format!("RELEASE SAVEPOINT {n}"))?,
32
                    Err(ref i) => self
                        .conn
                        .connection
8
                        .execute_batch(&format!("RELEASE SAVEPOINT _{i}"))?,
                };
24
                self.committed = true;
            }
24
            Ok(())
24
        }
        /// Configure the savepoint to perform the specified action when it is
        /// dropped.
        #[inline]
        pub fn set_drop_behavior(&mut self, drop_behavior: DropBehavior) {
            self.drop_behavior = drop_behavior;
        }
        /// A convenience method which consumes and rolls back a savepoint.
        #[inline]
1
        pub fn rollback(mut self) -> Result<()> {
1
            self.rollback_()
1
        }
11
        fn rollback_(&mut self) -> Result<()> {
11
            if !self.committed {
2
                match self.name {
                    Ok(ref n) => self
                        .conn
                        .connection
                        .execute_batch(&format!("ROLLBACK TO SAVEPOINT {n}"))?,
15
                    Err(ref i) => self
                        .conn
                        .connection
4
                        .execute_batch(&format!("ROLLBACK TO SAVEPOINT _{i}"))?,
                };
            }
11
            Ok(())
11
        }
        /// Consumes the savepoint, committing or rolling back according to
        /// the current setting (see `drop_behavior`).
        ///
        /// Functionally equivalent to the `Drop` implementation, but allows
        /// callers to see any errors that occur.
        #[inline]
        pub fn finish(mut self) -> Result<()> {
            self.finish_()
        }
        #[inline]
25
        fn finish_(&mut self) -> Result<()> {
25
            if self.conn.connection.is_autocommit() {
15
                return Ok(());
            }
10
            match self.drop_behavior {
                DropBehavior::Commit => self.commit_().or_else(|_| self.rollback_()),
10
                DropBehavior::Rollback => self.rollback_(),
                DropBehavior::Ignore => Ok(()),
                DropBehavior::Panic => panic!("Savepoint dropped unexpectedly."),
            }
40
        }
    }
    impl std::ops::Deref for Savepoint<'_> {
        type Target = Connection;
        #[inline]
63
        fn deref(&self) -> &Connection {
63
            self.conn
126
        }
    }
}
#[cfg(test)]
mod tests {
    use melib::smtp::{SmtpAuth, SmtpSecurity, SmtpServerConf};
    use tempfile::TempDir;

    use super::*;
    use crate::SendMail;

    /// Build a configuration whose database file and data directory both
    /// live inside `tmp_dir`.
    fn config_in(tmp_dir: &TempDir, send_mail: SendMail) -> Configuration {
        Configuration {
            send_mail,
            db_path: tmp_dir.path().join("mpot.db"),
            data_path: tmp_dir.path().to_path_buf(),
            administrators: vec![],
        }
    }

    /// SMTP settings shared by the tests that never actually send mail.
    fn local_smtp() -> SendMail {
        SendMail::Smtp(SmtpServerConf {
            hostname: "127.0.0.1".into(),
            port: 25,
            envelope_from: "foo-chat@example.com".into(),
            auth: SmtpAuth::None,
            security: SmtpSecurity::None,
            extensions: Default::default(),
        })
    }

    /// Opening a missing database fails; `open_or_create_db` creates it.
    #[test]
    fn test_new_connection() {
        let tmp_dir = TempDir::new().unwrap();
        let config = config_in(&tmp_dir, local_smtp());

        assert_eq!(
            &Connection::open_db(config.clone()).unwrap_err().to_string(),
            "Database doesn't exist"
        );

        _ = Connection::open_or_create_db(config).unwrap();
    }

    /// Exercise commit/rollback of transactions and (nested) savepoints,
    /// both explicitly and through their drop behavior.
    #[test]
    fn test_transactions() {
        use super::transaction::*;

        let tmp_dir = TempDir::new().unwrap();
        let config = config_in(&tmp_dir, local_smtp());

        let list = MailingList {
            pk: 0,
            name: "".into(),
            id: "".into(),
            description: None,
            topics: vec![],
            address: "".into(),
            archive_url: None,
        };
        // A copy of `list` whose id and address are both `n`.
        let numbered = |n: &str| MailingList {
            id: n.into(),
            address: n.into(),
            ..list.clone()
        };

        let mut db = Connection::open_or_create_db(config).unwrap().trusted();

        /* drop rollback */
        let mut tx = db.transaction(Default::default()).unwrap();
        tx.set_drop_behavior(DropBehavior::Rollback);
        let _new = tx.create_list(list.clone()).unwrap();
        drop(tx);
        assert_eq!(&db.lists().unwrap(), &[]);

        /* drop commit */
        let mut tx = db.transaction(Default::default()).unwrap();
        tx.set_drop_behavior(DropBehavior::Commit);
        let new = tx.create_list(list.clone()).unwrap();
        drop(tx);
        assert_eq!(&db.lists().unwrap(), &[new.clone()]);

        /* rollback with drop commit */
        let mut tx = db.transaction(Default::default()).unwrap();
        tx.set_drop_behavior(DropBehavior::Commit);
        let _new2 = tx.create_list(numbered("1")).unwrap();
        tx.rollback().unwrap();
        assert_eq!(&db.lists().unwrap(), &[new.clone()]);

        /* tx and then savepoint */
        let tx = db.transaction(Default::default()).unwrap();
        let sv = tx.savepoint(None).unwrap();
        let new2 = sv.create_list(numbered("2")).unwrap();
        sv.commit().unwrap();
        tx.commit().unwrap();
        assert_eq!(&db.lists().unwrap(), &[new.clone(), new2.clone()]);

        /* tx and then rollback savepoint */
        let tx = db.transaction(Default::default()).unwrap();
        let sv = tx.savepoint(None).unwrap();
        let _new3 = sv.create_list(numbered("3")).unwrap();
        sv.rollback().unwrap();
        tx.commit().unwrap();
        assert_eq!(&db.lists().unwrap(), &[new.clone(), new2.clone()]);

        /* tx, commit savepoint and then rollback commit */
        let tx = db.transaction(Default::default()).unwrap();
        let sv = tx.savepoint(None).unwrap();
        let _new3 = sv.create_list(numbered("3")).unwrap();
        sv.commit().unwrap();
        tx.rollback().unwrap();
        assert_eq!(&db.lists().unwrap(), &[new.clone(), new2.clone()]);

        /* nested savepoints */
        let tx = db.transaction(Default::default()).unwrap();
        let sv = tx.savepoint(None).unwrap();
        let sv1 = sv.savepoint(None).unwrap();
        let new3 = sv1.create_list(numbered("3")).unwrap();
        sv1.commit().unwrap();
        sv.commit().unwrap();
        tx.commit().unwrap();
        assert_eq!(&db.lists().unwrap(), &[new, new2, new3]);
    }

    /// Post two messages to a list and check that the mbox export contains
    /// one `From ` separator line per message.
    #[test]
    fn test_mbox_export() {
        let tmp_dir = TempDir::new().unwrap();
        let config = config_in(
            &tmp_dir,
            SendMail::ShellCommand("/usr/bin/false".to_string()),
        );

        let list = MailingList {
            pk: 0,
            name: "test".into(),
            id: "test".into(),
            description: None,
            topics: vec![],
            address: "test@example.com".into(),
            archive_url: None,
        };

        // NOTE(review): as they appear here the fixtures have no empty line
        // between the header block and the body; if blank lines were lost
        // somewhere along the way they should be restored - TODO confirm.
        let test_emails = vec![
            r#"From: "User Name" <user@example.com>
To: "test" <test@example.com>
Subject: Hello World
Hello, this is a message.
Goodbye!
"#,
            r#"From: "User Name" <user@example.com>
To: "test" <test@example.com>
Subject: Fuu Bar
Baz,
Qux!
"#,
        ];

        let db = Connection::open_or_create_db(config).unwrap().trusted();
        db.create_list(list).unwrap();
        for email in test_emails {
            let envelope = melib::Envelope::from_bytes(email.as_bytes(), None).unwrap();
            db.post(&envelope, email.as_bytes(), false).unwrap();
        }

        let mbox = String::from_utf8(db.export_mbox(1, None, false).unwrap()).unwrap();
        let separators = mbox
            .split('\n')
            .filter(|line| line.starts_with("From MAILER-DAEMON"))
            .count();
        assert!(separators == 2)
    }
}