From e308bae173a78f7a6a5111268d331e83affc71bc Mon Sep 17 00:00:00 2001
From: Amaury <1293565+amaury1093@users.noreply.github.com>
Date: Sat, 7 Dec 2024 17:49:16 +0100
Subject: [PATCH 01/12] Using dynamic Storage

---
 ...95660c56e852770a4eac47576089e704322a.json} |   6 +-
 ...b712359553bc295db5689f254f623d172326.json} |   6 +-
 Cargo.lock                                    |   1 +
 backend/Cargo.toml                            |   3 +
 backend/backend_config.toml                   |  30 +++--
 .../20240929230957_v1_worker_results.up.sql   |   2 +-
 backend/src/config.rs                         | 113 +++++++++++------
 backend/src/db.rs                             |  39 ------
 backend/src/lib.rs                            |   4 +-
 backend/src/storage/error.rs                  |  27 +++++
 backend/src/storage/mod.rs                    |  32 +++++
 backend/src/storage/postgres.rs               | 100 ++++++++++++++++
 backend/src/worker/do_work.rs                 |  31 +++--
 backend/src/worker/response.rs                |  61 +---------
 14 files changed, 274 insertions(+), 181 deletions(-)
 rename .sqlx/{query-f7ac211d1216f1c0cecc80074a7307b69ee10ca90a712a6e38175c898b8e06b1.json => query-068521cf9e77f563b3791cce500d95660c56e852770a4eac47576089e704322a.json} (54%)
 rename .sqlx/{query-aaa1cf5961de24b1fb68f856205afe4d3bfaf55e3371f46a472ea3d5705d35b2.json => query-de8a3af8119e17c38b2f60cafac3b712359553bc295db5689f254f623d172326.json} (54%)
 delete mode 100644 backend/src/db.rs
 create mode 100644 backend/src/storage/error.rs
 create mode 100644 backend/src/storage/mod.rs
 create mode 100644 backend/src/storage/postgres.rs

diff --git a/.sqlx/query-f7ac211d1216f1c0cecc80074a7307b69ee10ca90a712a6e38175c898b8e06b1.json b/.sqlx/query-068521cf9e77f563b3791cce500d95660c56e852770a4eac47576089e704322a.json
similarity index 54%
rename from .sqlx/query-f7ac211d1216f1c0cecc80074a7307b69ee10ca90a712a6e38175c898b8e06b1.json
rename to .sqlx/query-068521cf9e77f563b3791cce500d95660c56e852770a4eac47576089e704322a.json
index abcac7ded..aae6fd4e4 100644
--- a/.sqlx/query-f7ac211d1216f1c0cecc80074a7307b69ee10ca90a712a6e38175c898b8e06b1.json
+++ b/.sqlx/query-068521cf9e77f563b3791cce500d95660c56e852770a4eac47576089e704322a.json
@@ -1,6 +1,6 @@
 {
   "db_name": "PostgreSQL",
-  "query": "\n\t\t\t\tINSERT INTO v1_task_result (payload, job_id, backend_name, error)\n\t\t\t\tVALUES ($1, $2, $3, $4)\n\t\t\t\tRETURNING id\n\t\t\t\t",
+  "query": "\n\t\t\t\t\tINSERT INTO v1_task_result (payload, job_id, extra, error)\n\t\t\t\t\tVALUES ($1, $2, $3, $4)\n\t\t\t\t\tRETURNING id\n\t\t\t\t\t",
   "describe": {
     "columns": [
       {
@@ -13,7 +13,7 @@
       "Left": [
         "Jsonb",
         "Int4",
-        "Text",
+        "Jsonb",
         "Text"
       ]
     },
@@ -21,5 +21,5 @@
       false
     ]
   },
-  "hash": "f7ac211d1216f1c0cecc80074a7307b69ee10ca90a712a6e38175c898b8e06b1"
+  "hash": "068521cf9e77f563b3791cce500d95660c56e852770a4eac47576089e704322a"
 }
diff --git a/.sqlx/query-aaa1cf5961de24b1fb68f856205afe4d3bfaf55e3371f46a472ea3d5705d35b2.json b/.sqlx/query-de8a3af8119e17c38b2f60cafac3b712359553bc295db5689f254f623d172326.json
similarity index 54%
rename from .sqlx/query-aaa1cf5961de24b1fb68f856205afe4d3bfaf55e3371f46a472ea3d5705d35b2.json
rename to .sqlx/query-de8a3af8119e17c38b2f60cafac3b712359553bc295db5689f254f623d172326.json
index b8242e060..2de7122e0 100644
--- a/.sqlx/query-aaa1cf5961de24b1fb68f856205afe4d3bfaf55e3371f46a472ea3d5705d35b2.json
+++ b/.sqlx/query-de8a3af8119e17c38b2f60cafac3b712359553bc295db5689f254f623d172326.json
@@ -1,6 +1,6 @@
 {
   "db_name": "PostgreSQL",
-  "query": "\n\t\t\t\tINSERT INTO v1_task_result (payload, job_id, backend_name, result)\n\t\t\t\tVALUES ($1, $2, $3, $4)\n\t\t\t\tRETURNING id\n\t\t\t\t",
+  "query": "\n\t\t\t\t\tINSERT INTO v1_task_result (payload, job_id, extra, result)\n\t\t\t\t\tVALUES ($1, $2, $3, $4)\n\t\t\t\t\tRETURNING 
id\n\t\t\t\t\t",
   "describe": {
     "columns": [
       {
@@ -13,7 +13,7 @@
       "Left": [
         "Jsonb",
         "Int4",
-        "Text",
+        "Jsonb",
         "Jsonb"
       ]
     },
@@ -21,5 +21,5 @@
       false
     ]
   },
-  "hash": "aaa1cf5961de24b1fb68f856205afe4d3bfaf55e3371f46a472ea3d5705d35b2"
+  "hash": "de8a3af8119e17c38b2f60cafac3b712359553bc295db5689f254f623d172326"
 }
diff --git a/Cargo.lock b/Cargo.lock
index ad6648306..7057289dd 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2975,6 +2975,7 @@ dependencies = [
  "tokio",
  "tokio-executor-trait",
  "tokio-reactor-trait",
+ "toml",
  "tracing",
  "tracing-subscriber",
  "uuid 1.10.0",
diff --git a/backend/Cargo.toml b/backend/Cargo.toml
index 7ce3794a0..ad2a4272d 100644
--- a/backend/Cargo.toml
+++ b/backend/Cargo.toml
@@ -37,6 +37,9 @@ tracing-subscriber = "0.3.18"
 uuid = "1.10"
 warp = "0.3"
 
+[dev-dependencies]
+toml = "0.8"
+
 [features]
 worker = [
 	"futures",
diff --git a/backend/backend_config.toml b/backend/backend_config.toml
index b4f6fbaf9..6ec1be46d 100644
--- a/backend/backend_config.toml
+++ b/backend/backend_config.toml
@@ -44,6 +44,11 @@ webdriver_addr = "http://localhost:9515"
 # Env variable: RCH__SMTP_TIMEOUT
 # smtp_timeout = 45
 
+# Optional Sentry DSN. If set, all errors will be sent to Sentry.
+#
+# Env variable: RCH__SENTRY_DSN
+# sentry_dsn = ""
+
 # Uncomment the lines below to route all SMTP verification requests
 # through a specified proxy. Note that the proxy must be a SOCKS5 proxy to work
 # with the SMTP protocol. This proxy will not be used for headless
@@ -119,14 +124,19 @@ concurrency = 5
 # max_requests_per_hour = 1000
 # max_requests_per_day = 20000
 
-# Postgres configuration. Currently, a Postgres database is required to store
-# the results of the verifications. This might change in the future, allowing
-# for pluggable storage.
-[worker.postgres]
-# Env variable: RCH__WORKER__POSTGRES__DB_URL
+# Below are the configurations for the storage of the email verification
+# results. We currently support the following storage backends:
+# - Postgres
+#
+# You can configure one or more storages. The format of the configuration is:
+# [storage.<storage_id>.<storage_type>]
+# where:
+# - <storage_id> is a unique identifier for the storage, in case you have
+#   multiple storages of the same type. You can use 0, 1, 2, etc. as the
+#   identifier.
+# - <storage_type> is the type of the storage, e.g. "postgres".
+
+# Postgres configuration.
+[storage.0.postgres]
+# Env variable: RCH__STORAGE__0__POSTGRES__DB_URL
 db_url = "postgresql://localhost/reacherdb"
-
-# Optional Sentry DSN. If set, all errors will be sent to Sentry.
-#
-# Env variable: RCH__SENTRY_DSN
-# sentry_dsn = ""
diff --git a/backend/migrations/20240929230957_v1_worker_results.up.sql b/backend/migrations/20240929230957_v1_worker_results.up.sql
index d055b77cf..6a0d91310 100644
--- a/backend/migrations/20240929230957_v1_worker_results.up.sql
+++ b/backend/migrations/20240929230957_v1_worker_results.up.sql
@@ -8,7 +8,7 @@ CREATE TABLE v1_task_result (
 	id SERIAL PRIMARY KEY,
 	job_id INTEGER NOT NULL REFERENCES v1_bulk_job(id) ON DELETE CASCADE,
 	payload JSONB NOT NULL,
-	backend_name TEXT NOT NULL,
+	extra JSONB, -- any extra data that needs to be stored
 	result JSONB,
 	error TEXT,
 	created_at TIMESTAMPTZ DEFAULT NOW() NOT NULL
diff --git a/backend/src/config.rs b/backend/src/config.rs
index c1edc8228..3eba41aa0 100644
--- a/backend/src/config.rs
+++ b/backend/src/config.rs
@@ -14,12 +14,12 @@
 // You should have received a copy of the GNU Affero General Public License
 // along with this program. If not, see <https://www.gnu.org/licenses/>.
-use crate::create_db;
+use crate::storage::{postgres::PostgresStorage, Storage};
 #[cfg(feature = "worker")]
 use crate::worker::do_work::TaskWebhook;
 #[cfg(feature = "worker")]
 use crate::worker::setup_rabbit_mq;
-use anyhow::bail;
+use anyhow::{bail, Context};
 use check_if_email_exists::{
 	CheckEmailInputProxy, GmailVerifMethod, HotmailB2BVerifMethod, HotmailB2CVerifMethod,
 	YahooVerifMethod, LOG_TARGET,
@@ -29,9 +29,9 @@ use config::Config;
 use lapin::Channel;
 use serde::{Deserialize, Serialize};
 use sqlx::PgPool;
-use std::env;
 #[cfg(feature = "worker")]
 use std::sync::Arc;
+use std::{collections::HashMap, env};
 use tracing::warn;
 
 #[derive(Debug, Default, Serialize, Deserialize)]
@@ -64,12 +64,16 @@ pub struct BackendConfig {
 	/// Worker configuration, only present if the backend is a worker.
 	pub worker: WorkerConfig,
 
+	/// Configuration on where to store the email verification results.
+	pub storage: HashMap<String, StorageConfig>,
+
 	// Internal fields, not part of the configuration.
-	#[serde(skip)]
-	pg_pool: Option<PgPool>,
 	#[cfg(feature = "worker")]
 	#[serde(skip)]
 	channel: Option<Arc<Channel>>,
+
+	#[serde(skip)]
+	storages: Vec<Box<dyn Storage>>,
 }
 
 impl BackendConfig {
@@ -83,31 +87,20 @@ impl BackendConfig {
 			self.worker.enable,
 			&self.worker.throttle,
 			&self.worker.rabbitmq,
-			&self.worker.postgres,
-			&self.pg_pool,
 			#[cfg(feature = "worker")]
 			&self.channel,
 		) {
 			#[cfg(feature = "worker")]
-			(
-				true,
-				Some(throttle),
-				Some(rabbitmq),
-				Some(postgres),
-				Some(pg_pool),
-				Some(channel),
-			) => Ok(MustWorkerConfig {
-				pg_pool: pg_pool.clone(),
+			(true, Some(throttle), Some(rabbitmq), Some(channel)) => Ok(MustWorkerConfig {
 				#[cfg(feature = "worker")]
 				channel: channel.clone(),
 				throttle: throttle.clone(),
 				rabbitmq: rabbitmq.clone(),
 				#[cfg(feature = "worker")]
 				webhook: self.worker.webhook.clone(),
-				postgres: postgres.clone(),
 			}),
 			#[cfg(feature = "worker")]
-			(true, _, _, _, _, _) => bail!("Worker configuration is missing"),
+			(true, _, _, _) => bail!("Worker configuration is missing"),
 			_ => bail!("Calling must_worker_config on a non-worker backend"),
 		}
 	}
@@ -115,23 +108,16 @@ impl BackendConfig {
 	/// Attempt connection to the Postgres database and RabbitMQ. Also populates
 	/// the internal `pg_pool` and `channel` fields with the connections.
 	pub async fn connect(&mut self) -> Result<(), anyhow::Error> {
-		let pg_pool = if self.worker.enable {
-			let db_url = self
-				.worker
-				.postgres
-				.as_ref()
-				.map(|c| &c.db_url)
-				.ok_or_else(|| {
-					anyhow::anyhow!("Worker configuration is missing the postgres configuration")
-				})?;
-			Some(create_db(db_url).await?)
-		} else if let Ok(db_url) = env::var("DATABASE_URL") {
-			// For legacy reasons, we also support the DATABASE_URL environment variable:
-			Some(create_db(&db_url).await?)
-		} else {
-			None
-		};
-		self.pg_pool = pg_pool;
+		for (_, storage) in &self.storage {
+			match storage {
+				StorageConfig::Postgres(config) => {
+					let storage = PostgresStorage::new(&config.db_url)
+						.await
+						.with_context(|| format!("Connecting to postgres DB {}", config.db_url))?;
+					self.storages.push(Box::new(storage));
+				}
+			}
+		}
 
 		#[cfg(feature = "worker")]
 		{
@@ -177,16 +163,12 @@ pub struct WorkerConfig {
 	/// Optional webhook configuration to send email verification results.
 	#[cfg(feature = "worker")]
 	pub webhook: Option<TaskWebhook>,
-	/// Postgres database configuration to store email verification
-	/// results.
-	pub postgres: Option<PostgresConfig>,
 }
 
 /// Worker configuration that must be present if worker.enable is true. Used as
 /// a domain type to ensure that the worker configuration is present.
 #[derive(Debug, Clone)]
 pub struct MustWorkerConfig {
-	pub pg_pool: PgPool,
 	#[cfg(feature = "worker")]
 	pub channel: Arc<Channel>,
 
@@ -194,7 +176,6 @@ pub struct MustWorkerConfig {
 	pub rabbitmq: RabbitMQConfig,
 	#[cfg(feature = "worker")]
 	pub webhook: Option<TaskWebhook>,
-	pub postgres: PostgresConfig,
 }
 
 #[derive(Debug, Deserialize, Clone, Serialize)]
@@ -204,11 +185,6 @@ pub struct RabbitMQConfig {
 	pub concurrency: u16,
 }
 
-#[derive(Debug, Deserialize, Clone, Serialize)]
-pub struct PostgresConfig {
-	pub db_url: String,
-}
-
 #[derive(Debug, Deserialize, Clone, Serialize)]
 pub struct ThrottleConfig {
 	pub max_requests_per_second: Option<u32>,
@@ -229,6 +205,18 @@ impl ThrottleConfig {
 	}
 }
 
+#[derive(Debug, Deserialize, Clone, PartialEq, Serialize)]
+#[serde(rename_all = "snake_case")]
+pub enum StorageConfig {
+	/// Store the email verification results in the Postgres database.
+	Postgres(PostgresConfig),
+}
+
+#[derive(Debug, Deserialize, Clone, PartialEq, Serialize)]
+pub struct PostgresConfig {
+	pub db_url: String,
+}
+
 /// Load the worker configuration from the worker_config.toml file and from the
 /// environment.
 pub async fn load_config() -> Result<BackendConfig, config::ConfigError> {
@@ -247,13 +235,46 @@ pub async fn load_config() -> Result<BackendConfig, config::ConfigError> {
 
 #[cfg(test)]
 mod tests {
-	use super::load_config;
+	use super::*;
 	use std::env;
 
 	#[tokio::test]
 	async fn test_env_vars() {
 		env::set_var("RCH__BACKEND_NAME", "test-backend");
+		env::set_var("RCH__STORAGE__1__POSTGRES__DB_URL", "test2");
 		let cfg = load_config().await.unwrap();
 		assert_eq!(cfg.backend_name, "test-backend");
+		assert_eq!(
+			cfg.storage.get("1").unwrap(),
+			&StorageConfig::Postgres(PostgresConfig {
+				db_url: "test2".to_string()
+			})
+		);
+	}
+
+	#[tokio::test]
+	async fn test_serialize_storage_config() {
+		let mut storage_config = HashMap::new();
+		storage_config.insert(
+			"test1",
+			StorageConfig::Postgres(PostgresConfig {
+				db_url: "postgres://localhost:5432/test1".to_string(),
+			}),
+		);
+		storage_config.insert(
+			"test2",
+			StorageConfig::Postgres(PostgresConfig {
+				db_url: "postgres://localhost:5432/test2".to_string(),
+			}),
+		);
+
+		let expected = r#"[test2.postgres]
+db_url = "postgres://localhost:5432/test2"
+
+[test1.postgres]
+db_url = "postgres://localhost:5432/test1"
+"#;
+
+		assert_eq!(expected, toml::to_string(&storage_config).unwrap(),);
 	}
 }
diff --git a/backend/src/db.rs b/backend/src/db.rs
deleted file mode 100644
index c260675bc..000000000
--- a/backend/src/db.rs
+++ /dev/null
@@ -1,39 +0,0 @@
-// Reacher - Email Verification
-// Copyright (C) 2018-2023 Reacher
-
-// This program is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Affero General Public License as published
-// by the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-
-// This program is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Affero General Public License for more details.
-
-// You should have received a copy of the GNU Affero General Public License
-// along with this program. If not, see <https://www.gnu.org/licenses/>.
-
-use anyhow::Context;
-use check_if_email_exists::LOG_TARGET;
-use sqlx::{postgres::PgPoolOptions, PgPool};
-
-use tracing::{debug, info};
-
-/// Create a DB pool.
-pub async fn create_db(db_url: &str) -> Result<PgPool, anyhow::Error> {
-	debug!(target: LOG_TARGET, "Connecting to DB: {}", db_url);
-	// create connection pool with database
-	// connection pool internally the shared db connection
-	// with arc so it can safely be cloned and shared across threads
-	let pool = PgPoolOptions::new()
-		.connect(db_url)
-		.await
-		.with_context(|| format!("Connecting to postgres DB {db_url}"))?;
-
-	sqlx::migrate!("./migrations").run(&pool).await?;
-
-	info!(target: LOG_TARGET, table="v1_task_result", "Connected to DB, Reacher will write verification results to DB");
-
-	Ok(pool)
-}
diff --git a/backend/src/lib.rs b/backend/src/lib.rs
index b6ecc9cae..5b5afad63 100644
--- a/backend/src/lib.rs
+++ b/backend/src/lib.rs
@@ -15,11 +15,9 @@
 // along with this program. If not, see <https://www.gnu.org/licenses/>.
 
 pub mod config;
-mod db;
 pub mod http;
+mod storage;
 #[cfg(feature = "worker")]
 pub mod worker;
 
-pub use db::create_db;
-
 const CARGO_PKG_VERSION: &str = env!("CARGO_PKG_VERSION");
diff --git a/backend/src/storage/error.rs b/backend/src/storage/error.rs
new file mode 100644
index 000000000..d88f4d4b4
--- /dev/null
+++ b/backend/src/storage/error.rs
@@ -0,0 +1,27 @@
+// Reacher - Email Verification
+// Copyright (C) 2018-2023 Reacher
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published
+// by the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+use thiserror::Error;
+
+#[derive(Error, Debug)]
+pub enum StorageError {
+	#[error("SQLX error: {0}")]
+	SqlxError(#[from] sqlx::error::Error),
+	#[error("SQLX migrate error: {0}")]
+	MigrateError(#[from] sqlx::migrate::MigrateError),
+	#[error("Serde JSON error: {0}")]
+	SerdeJsonError(#[from] serde_json::Error),
+}
diff --git a/backend/src/storage/mod.rs b/backend/src/storage/mod.rs
new file mode 100644
index 000000000..22bd5f6b2
--- /dev/null
+++ b/backend/src/storage/mod.rs
@@ -0,0 +1,32 @@
+// Reacher - Email Verification
+// Copyright (C) 2018-2023 Reacher
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published
+// by the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+pub mod error;
+pub mod postgres;
+
+use crate::worker::do_work::{CheckEmailJobId, CheckEmailTask, TaskError};
+use check_if_email_exists::CheckEmailOutput;
+use error::StorageError;
+use std::fmt::Debug;
+
+pub trait Storage: Debug + Send + Sync {
+	async fn store(
+		&self,
+		task: &CheckEmailTask,
+		worker_output: &Result<CheckEmailOutput, TaskError>,
+		extra: Option<serde_json::Value>,
+	) -> Result<(), StorageError>;
+}
diff --git a/backend/src/storage/postgres.rs b/backend/src/storage/postgres.rs
new file mode 100644
index 000000000..1b539171c
--- /dev/null
+++ b/backend/src/storage/postgres.rs
@@ -0,0 +1,100 @@
+// Reacher - Email Verification
+// Copyright (C) 2018-2023 Reacher
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published
+// by the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+use super::error::StorageError;
+use super::Storage;
+use crate::worker::do_work::{CheckEmailJobId, CheckEmailTask, TaskError};
+use check_if_email_exists::{CheckEmailOutput, LOG_TARGET};
+use sqlx::postgres::PgPoolOptions;
+use sqlx::PgPool;
+use tracing::{debug, info};
+
+#[derive(Debug)]
+pub struct PostgresStorage {
+	pg_pool: PgPool,
+}
+
+impl PostgresStorage {
+	pub async fn new(db_url: &str) -> Result<Self, StorageError> {
+		debug!(target: LOG_TARGET, "Connecting to DB: {}", db_url);
+		// create connection pool with database
+		// connection pool internally the shared db connection
+		// with arc so it can safely be cloned and shared across threads
+		let pg_pool = PgPoolOptions::new().connect(db_url).await?;
+
+		sqlx::migrate!("./migrations").run(&pg_pool).await?;
+
+		info!(target: LOG_TARGET, table="v1_task_result", "Connected to DB, Reacher will write verification results to DB");
+
+		Ok(Self { pg_pool })
+	}
+}
+
+impl Storage for PostgresStorage {
+	async fn store(
+		&self,
+		task: &CheckEmailTask,
+		worker_output: &Result<CheckEmailOutput, TaskError>,
+		extra: Option<serde_json::Value>,
+	) -> Result<(), StorageError> {
+		let payload_json = serde_json::to_value(task)?;
+
+		match worker_output {
+			Ok(output) => {
+				let output_json = serde_json::to_value(output)?;
+
+				sqlx::query!(
+					r#"
+					INSERT INTO v1_task_result (payload, job_id, extra, result)
+					VALUES ($1, $2, $3, $4)
+					RETURNING id
+					"#,
+					payload_json,
+					match task.job_id {
+						CheckEmailJobId::Bulk(job_id) => job_id,
+						CheckEmailJobId::SingleShot => 0,
+					},
+					extra,
+					output_json,
+				)
+				.fetch_one(&self.pg_pool)
+				.await?;
+			}
+			Err(err) => {
+				sqlx::query!(
+					r#"
+					INSERT INTO v1_task_result (payload, job_id, extra, error)
+					VALUES ($1, $2, $3, $4)
+					RETURNING id
+					"#,
+					payload_json,
+					match task.job_id {
+						CheckEmailJobId::Bulk(job_id) => job_id,
+						CheckEmailJobId::SingleShot => 0,
+					},
+					extra,
+					err.to_string(),
+				)
+				.fetch_one(&self.pg_pool)
+				.await?;
+			}
+		}
+
+		debug!(target: LOG_TARGET, email=?task.input.to_email, "Wrote to DB");
+
+		Ok(())
+	}
+}
diff --git a/backend/src/worker/do_work.rs b/backend/src/worker/do_work.rs
index 5cc99b078..04d13ea54 100644
--- a/backend/src/worker/do_work.rs
+++ b/backend/src/worker/do_work.rs
@@ -14,7 +14,6 @@
 // You should have received a copy of the GNU Affero General
Public License
 // along with this program. If not, see <https://www.gnu.org/licenses/>.
 
-use super::response::save_to_db;
 use crate::config::BackendConfig;
 use crate::worker::response::send_single_shot_reply;
 use check_if_email_exists::{
@@ -141,21 +140,21 @@ pub(crate) async fn do_check_email_work(
 			// - If it's a bulk verification, we save the result to the database.
 			delivery.ack(BasicAckOptions::default()).await?;
 
-			match task.job_id {
-				CheckEmailJobId::SingleShot => {
-					send_single_shot_reply(channel, &delivery, &worker_output).await?;
-				}
-				CheckEmailJobId::Bulk(bulk_job_id) => {
-					save_to_db(
-						&config.backend_name,
-						config.get_pg_pool(),
-						task,
-						bulk_job_id,
-						&worker_output,
-					)
-					.await?;
-				}
-			}
+			// match task.job_id {
+			// 	CheckEmailJobId::SingleShot => {
+			// 		send_single_shot_reply(channel, &delivery, &worker_output).await?;
+			// 	}
+			// 	CheckEmailJobId::Bulk(bulk_job_id) => {
+			// 		save_to_db(
+			// 			&config.backend_name,
+			// 			config.get_pg_pool(),
+			// 			task,
+			// 			bulk_job_id,
+			// 			&worker_output,
+			// 		)
+			// 		.await?;
+			// 	}
+			// }
 
 			info!(target: LOG_TARGET,
 				email=task.input.to_email,
diff --git a/backend/src/worker/response.rs b/backend/src/worker/response.rs
index 0d1d73d4a..1b10c5cf2 100644
--- a/backend/src/worker/response.rs
+++ b/backend/src/worker/response.rs
@@ -14,77 +14,18 @@
 // You should have received a copy of the GNU Affero General Public License
 // along with this program. If not, see <https://www.gnu.org/licenses/>.
 
-use super::do_work::{CheckEmailTask, TaskError};
+use super::do_work::TaskError;
 use anyhow::bail;
 use check_if_email_exists::{CheckEmailOutput, LOG_TARGET};
 use lapin::message::Delivery;
 use lapin::options::BasicPublishOptions;
 use lapin::{BasicProperties, Channel};
 use serde::{Deserialize, Serialize};
-use sqlx::PgPool;
 use std::convert::TryFrom;
 use std::sync::Arc;
 use tracing::debug;
 use warp::http::StatusCode;
 
-/// Save the task result to the database. This only happens if the task is a
-/// part of a bulk verification job. If no pool is provided, the function will
-/// simply return without doing anything.
-///
-/// # Panics
-///
-/// Panics if the task is a single-shot task, i.e. if `payload.job_id` is `None`.
-pub async fn save_to_db(
-	backend_name: &str,
-	pg_pool: Option<PgPool>,
-	task: &CheckEmailTask,
-	bulk_job_id: i32,
-	worker_output: &Result<CheckEmailOutput, TaskError>,
-) -> Result<(), anyhow::Error> {
-	let pg_pool = pg_pool.ok_or_else(|| anyhow::anyhow!("No DB pool provided"))?;
-
-	let payload_json = serde_json::to_value(task)?;
-
-	match worker_output {
-		Ok(output) => {
-			let output_json = serde_json::to_value(output)?;
-
-			sqlx::query!(
-				r#"
-				INSERT INTO v1_task_result (payload, job_id, backend_name, result)
-				VALUES ($1, $2, $3, $4)
-				RETURNING id
-				"#,
-				payload_json,
-				bulk_job_id,
-				backend_name,
-				output_json,
-			)
-			.fetch_one(&pg_pool)
-			.await?;
-		}
-		Err(err) => {
-			sqlx::query!(
-				r#"
-				INSERT INTO v1_task_result (payload, job_id, backend_name, error)
-				VALUES ($1, $2, $3, $4)
-				RETURNING id
-				"#,
-				payload_json,
-				bulk_job_id,
-				backend_name,
-				err.to_string(),
-			)
-			.fetch_one(&pg_pool)
-			.await?;
-		}
-	}
-
-	debug!(target: LOG_TARGET, email=?task.input.to_email, "Wrote to DB");
-
-	Ok(())
-}
-
 /// For single-shot email verifications, the worker will send a reply to the
 /// client with the result of the verification.
Since both CheckEmailOutput and
 /// TaskError are not Deserialize, we need to create a new struct that can be

From 372e3bcc80b265bd8277c5d39e5a08fa262e1a1c Mon Sep 17 00:00:00 2001
From: Amaury <1293565+amaury1093@users.noreply.github.com>
Date: Sat, 7 Dec 2024 18:16:42 +0100
Subject: [PATCH 02/12] Make it build

---
 Cargo.lock                      |  1 +
 backend/Cargo.toml              |  1 +
 backend/src/config.rs           | 30 ++++++++++++++++++++++++------
 backend/src/storage/mod.rs      |  9 +++++++--
 backend/src/storage/postgres.rs | 13 ++++++++++---
 backend/src/worker/do_work.rs   | 30 ++++++++++++++----------------
 6 files changed, 57 insertions(+), 27 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 7057289dd..f16fee02d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2958,6 +2958,7 @@ version = "0.10.0"
 dependencies = [
  "anyhow",
  "async-smtp",
+ "async-trait",
 "check-if-email-exists",
 "config",
 "csv",
diff --git a/backend/Cargo.toml b/backend/Cargo.toml
index ad2a4272d..85df6bb11 100644
--- a/backend/Cargo.toml
+++ b/backend/Cargo.toml
@@ -8,6 +8,7 @@ publish = false
 [dependencies]
 anyhow = "1.0"
 async-smtp = { version = "0.9.1", features = ["runtime-tokio"] }
+async-trait = "0.1"
 check-if-email-exists = { path = "../core", features = ["sentry"] }
 config = "0.14"
 csv = "1.3.0"
diff --git a/backend/src/config.rs b/backend/src/config.rs
index 3eba41aa0..99460a7c2 100644
--- a/backend/src/config.rs
+++ b/backend/src/config.rs
@@ -31,7 +31,7 @@ use serde::{Deserialize, Serialize};
 use sqlx::PgPool;
 #[cfg(feature = "worker")]
 use std::sync::Arc;
-use std::{collections::HashMap, env};
+use std::{any::Any, collections::HashMap};
 use tracing::warn;
 
 #[derive(Debug, Default, Serialize, Deserialize)]
@@ -73,7 +73,7 @@ pub struct BackendConfig {
 	channel: Option<Arc<Channel>>,
 
 	#[serde(skip)]
-	storages: Vec<Box<dyn Storage>>,
+	storages: Vec<Arc<dyn Storage>>,
 }
 
 impl BackendConfig {
@@ -111,10 +111,10 @@ impl BackendConfig {
 		for (_, storage) in &self.storage {
 			match storage {
 				StorageConfig::Postgres(config) => {
-					let storage = PostgresStorage::new(&config.db_url)
+					let storage = PostgresStorage::new(&config.db_url, config.extra.clone())
 						.await
 						.with_context(|| format!("Connecting to postgres DB {}", config.db_url))?;
-					self.storages.push(Box::new(storage));
+					self.storages.push(Arc::new(storage));
 				}
 			}
 		}
@@ -136,8 +136,22 @@ impl BackendConfig {
 		Ok(())
 	}
 
+	/// Get all storages as a Vec. We don't really care about the keys in the
+	/// HashMap, except for deserialize purposes.
+	pub fn get_storages(&self) -> Vec<Arc<dyn Storage>> {
+		self.storages.clone()
+	}
+
+	/// Get the Postgres connection pool, if at least one of the storages is a
+	/// Postgres storage.
+	///
+	/// This is quite hacky, and it will most probably be refactored away in
+	/// future versions. We however need to rethink how to do the `/v1/bulk`
+	/// endpoints first.
 	pub fn get_pg_pool(&self) -> Option<PgPool> {
-		self.pg_pool.clone()
+		self.storages
+			.iter()
+			.find_map(|s| <dyn Any>::downcast_ref::<PostgresStorage>(s).map(|s| s.pg_pool.clone()))
 	}
 }
 
@@ -215,6 +229,7 @@ pub enum StorageConfig {
 #[derive(Debug, Deserialize, Clone, PartialEq, Serialize)]
 pub struct PostgresConfig {
 	pub db_url: String,
+	pub extra: Option<serde_json::Value>,
 }
 
 /// Load the worker configuration from the worker_config.toml file and from the
@@ -247,7 +262,8 @@ mod tests {
 		assert_eq!(
 			cfg.storage.get("1").unwrap(),
 			&StorageConfig::Postgres(PostgresConfig {
-				db_url: "test2".to_string()
+				db_url: "test2".to_string(),
+				extra: None,
 			})
 		);
 	}
@@ -259,12 +275,14 @@ mod tests {
 			"test1",
 			StorageConfig::Postgres(PostgresConfig {
 				db_url: "postgres://localhost:5432/test1".to_string(),
+				extra: None,
 			}),
 		);
 		storage_config.insert(
 			"test2",
 			StorageConfig::Postgres(PostgresConfig {
 				db_url: "postgres://localhost:5432/test2".to_string(),
+				extra: None,
 			}),
 		);
 
diff --git a/backend/src/storage/mod.rs b/backend/src/storage/mod.rs
index 22bd5f6b2..7d6249f05 100644
--- a/backend/src/storage/mod.rs
+++ b/backend/src/storage/mod.rs
@@ -17,16 +17,21 @@
 pub mod error;
 pub mod postgres;
 
-use crate::worker::do_work::{CheckEmailJobId, CheckEmailTask, TaskError};
+use crate::worker::do_work::{CheckEmailTask, TaskError};
+use async_trait::async_trait;
 use check_if_email_exists::CheckEmailOutput;
 use error::StorageError;
+use std::any::Any;
 use std::fmt::Debug;
 
-pub trait Storage: Debug + Send + Sync {
+#[async_trait]
+pub trait Storage: Debug + Send + Sync + Any {
 	async fn store(
 		&self,
 		task: &CheckEmailTask,
 		worker_output: &Result<CheckEmailOutput, TaskError>,
 		extra: Option<serde_json::Value>,
 	) -> Result<(), StorageError>;
+
+	fn get_extra(&self) -> Option<serde_json::Value>;
 }
diff --git a/backend/src/storage/postgres.rs b/backend/src/storage/postgres.rs
index 1b539171c..60b2a4f48 100644
--- a/backend/src/storage/postgres.rs
+++ b/backend/src/storage/postgres.rs
@@ -17,6 +17,7 @@
 use super::error::StorageError;
 use super::Storage;
 use crate::worker::do_work::{CheckEmailJobId, CheckEmailTask, TaskError};
+use async_trait::async_trait;
 use check_if_email_exists::{CheckEmailOutput, LOG_TARGET};
 use sqlx::postgres::PgPoolOptions;
 use sqlx::PgPool;
@@ -24,11 +25,12 @@ use tracing::{debug, info};
 
 #[derive(Debug)]
 pub struct PostgresStorage {
-	pg_pool: PgPool,
+	pub pg_pool: PgPool,
+	extra: Option<serde_json::Value>,
 }
 
 impl PostgresStorage {
-	pub async fn new(db_url: &str) -> Result<Self, StorageError> {
+	pub async fn new(db_url: &str, extra: Option<serde_json::Value>) -> Result<Self, StorageError> {
 		debug!(target: LOG_TARGET, "Connecting to DB: {}", db_url);
 		// create connection pool with database
 		// connection pool internally the shared db connection
@@ -39,10 +41,11 @@ impl PostgresStorage {
 
 		info!(target: LOG_TARGET, table="v1_task_result", "Connected to DB, Reacher will write verification results to DB");
 
-		Ok(Self { pg_pool })
+		Ok(Self { pg_pool, extra })
 	}
 }
 
+#[async_trait]
 impl Storage for PostgresStorage {
 	async fn store(
 		&self,
@@ -97,4 +100,8 @@ impl Storage for PostgresStorage {
 
 		Ok(())
 	}
+
+	fn get_extra(&self) -> Option<serde_json::Value> {
+		self.extra.clone()
+	}
 }
diff --git a/backend/src/worker/do_work.rs b/backend/src/worker/do_work.rs
index 04d13ea54..e43d1646f 100644
--- a/backend/src/worker/do_work.rs
+++ b/backend/src/worker/do_work.rs
@@ -137,24 +137,22 @@ pub(crate) async fn do_check_email_work(
 		_ => {
 			// This is the happy path. We acknowledge the message and:
 			// - If it's a single-shot email verification, we send a reply to the client.
-			// - If it's a bulk verification, we save the result to the database.
+			// - In any case, we store the result.
 			delivery.ack(BasicAckOptions::default()).await?;
 
-			// match task.job_id {
-			// 	CheckEmailJobId::SingleShot => {
-			// 		send_single_shot_reply(channel, &delivery, &worker_output).await?;
-			// 	}
-			// 	CheckEmailJobId::Bulk(bulk_job_id) => {
-			// 		save_to_db(
-			// 			&config.backend_name,
-			// 			config.get_pg_pool(),
-			// 			task,
-			// 			bulk_job_id,
-			// 			&worker_output,
-			// 		)
-			// 		.await?;
-			// 	}
-			// }
+			match task.job_id {
+				CheckEmailJobId::SingleShot => {
+					send_single_shot_reply(channel, &delivery, &worker_output).await?;
+				}
+				_ => {}
+			}
+
+			// Store the result.
+			for storage in config.get_storages() {
+				storage
+					.store(task, &worker_output, storage.get_extra())
+					.await?;
+			}
 
 			info!(target: LOG_TARGET,
 				email=task.input.to_email,

From d9cfaf07ea35ce57c4cf9321f8d23ba7d93ce0b5 Mon Sep 17 00:00:00 2001
From: Amaury <1293565+amaury1093@users.noreply.github.com>
Date: Sat, 7 Dec 2024 18:35:17 +0100
Subject: [PATCH 03/12] More comments

---
 backend/backend_config.toml                    | 18 +++++++++++++++---
 .../20240929230957_v1_worker_results.up.sql    |  2 +-
 backend/src/config.rs                          | 16 +++++-----------
 backend/src/storage/postgres.rs                |  8 ++++----
 4 files changed, 25 insertions(+), 19 deletions(-)

diff --git a/backend/backend_config.toml b/backend/backend_config.toml
index 6ec1be46d..d0f976ee6 100644
--- a/backend/backend_config.toml
+++ b/backend/backend_config.toml
@@ -135,8 +135,20 @@ concurrency = 5
 #   multiple storages of the same type. You can use 0, 1, 2, etc. as the
 #   identifier.
 # - <storage_type> is the type of the storage, e.g. "postgres".
+[storage]
 
-# Postgres configuration.
-[storage.0.postgres]
+# Uncomment the following line to configure the 1st storage to use Postgres.
+# [storage.0.postgres]
+
+# # URL to connect to the Postgres database.
+# # Env variable: RCH__STORAGE__0__POSTGRES__DB_URL
-db_url = "postgresql://localhost/reacherdb"
+# db_url = "postgresql://localhost/reacherdb"
+#
+# If you wish to store additional data along with the verification results,
+# you can add a JSON object to the "extra" field. This object will be stored
+# as a JSONB column in the database. This is for example useful to track who
+# initiated the verification request in a multi-tenant system.
+#
+# Env variable: RCH__STORAGE__0__POSTGRES__EXTRA
+# extra = {"my_custom_key": "my_custom_value"}
diff --git a/backend/migrations/20240929230957_v1_worker_results.up.sql b/backend/migrations/20240929230957_v1_worker_results.up.sql
index 6a0d91310..926437d40 100644
--- a/backend/migrations/20240929230957_v1_worker_results.up.sql
+++ b/backend/migrations/20240929230957_v1_worker_results.up.sql
@@ -6,7 +6,7 @@ CREATE TABLE v1_bulk_job (
 
 CREATE TABLE v1_task_result (
 	id SERIAL PRIMARY KEY,
-	job_id INTEGER NOT NULL REFERENCES v1_bulk_job(id) ON DELETE CASCADE,
+	job_id INTEGER REFERENCES v1_bulk_job(id) ON DELETE CASCADE,
 	payload JSONB NOT NULL,
 	extra JSONB, -- any extra data that needs to be stored
 	result JSONB,
diff --git a/backend/src/config.rs b/backend/src/config.rs
index 99460a7c2..4d722c85b 100644
--- a/backend/src/config.rs
+++ b/backend/src/config.rs
@@ -108,6 +108,10 @@ impl BackendConfig {
 	/// Attempt connection to the Postgres database and RabbitMQ. Also populates
 	/// the internal `pg_pool` and `channel` fields with the connections.
 	pub async fn connect(&mut self) -> Result<(), anyhow::Error> {
+		if self.worker.enable && self.storage.is_empty() {
+			bail!("When worker.enable is true, you must configure at least one storage to store the email verification results.");
+		}
+
 		for (_, storage) in &self.storage {
 			match storage {
 				StorageConfig::Postgres(config) => {
@@ -278,18 +282,8 @@ mod tests {
 				extra: None,
 			}),
 		);
-		storage_config.insert(
-			"test2",
-			StorageConfig::Postgres(PostgresConfig {
-				db_url: "postgres://localhost:5432/test2".to_string(),
-				extra: None,
-			}),
-		);
-
-		let expected = r#"[test2.postgres]
-db_url = "postgres://localhost:5432/test2"
-
-[test1.postgres]
+		let expected = r#"[test1.postgres]
 db_url = "postgres://localhost:5432/test1"
 "#;
 
diff --git a/backend/src/storage/postgres.rs b/backend/src/storage/postgres.rs
index 60b2a4f48..618c23776 100644
--- a/backend/src/storage/postgres.rs
+++ b/backend/src/storage/postgres.rs
@@ -67,8 +67,8 @@ impl Storage for PostgresStorage {
 				"#,
 				payload_json,
 				match task.job_id {
-					CheckEmailJobId::Bulk(job_id) => job_id,
-					CheckEmailJobId::SingleShot => 0,
+					CheckEmailJobId::Bulk(job_id) => Some(job_id),
+					CheckEmailJobId::SingleShot => None,
 				},
 				extra,
 				output_json,
@@ -85,8 +85,8 @@ impl Storage for PostgresStorage {
 				"#,
 				payload_json,
 				match task.job_id {
-					CheckEmailJobId::Bulk(job_id) => job_id,
-					CheckEmailJobId::SingleShot => 0,
+					CheckEmailJobId::Bulk(job_id) => Some(job_id),
+					CheckEmailJobId::SingleShot => None,
 				},
 				extra,
 				err.to_string(),

From 7ff201f572e54148e0e1a1ac85d6f8b1b96f6d98 Mon Sep 17 00:00:00 2001
From: Amaury <1293565+amaury1093@users.noreply.github.com>
Date: Sat, 7 Dec 2024 18:36:02 +0100
Subject: [PATCH 04/12] clippy

---
 backend/src/config.rs         | 2 +-
 backend/src/worker/do_work.rs | 9 +++------
 core/src/rules.rs             | 6 ++----
 3 files changed, 6 insertions(+), 11 deletions(-)

diff --git a/backend/src/config.rs b/backend/src/config.rs
index 4d722c85b..1a4f2dbce 100644
--- a/backend/src/config.rs
+++ b/backend/src/config.rs
@@ -112,7 +112,7 @@ impl BackendConfig {
 			bail!("When worker.enable is true, you must configure at least one storage to store the email verification results.");
 		}
 
-		for (_, storage) in &self.storage {
+		for storage in self.storage.values() {
 			match storage {
 				StorageConfig::Postgres(config) => {
 					let storage = PostgresStorage::new(&config.db_url, config.extra.clone())
diff --git a/backend/src/worker/do_work.rs b/backend/src/worker/do_work.rs
index e43d1646f..838b64378 100644
--- a/backend/src/worker/do_work.rs
+++ b/backend/src/worker/do_work.rs
@@ -140,12 +140,9 @@ pub(crate) async fn do_check_email_work(
 			// - In any case, we store the result.
 			delivery.ack(BasicAckOptions::default()).await?;
 
-			match task.job_id {
-				CheckEmailJobId::SingleShot => {
-					send_single_shot_reply(channel, &delivery, &worker_output).await?;
-				}
-				_ => {}
-			}
+			if let CheckEmailJobId::SingleShot = task.job_id {
+				send_single_shot_reply(channel, &delivery, &worker_output).await?;
+			}
 
 			// Store the result.
for storage in config.get_storages() { diff --git a/core/src/rules.rs b/core/src/rules.rs index 30c209126..866b45c8e 100644 --- a/core/src/rules.rs +++ b/core/src/rules.rs @@ -92,13 +92,11 @@ mod tests { #[test] fn should_skip_catch_all() { - assert_eq!( - true, + assert!( has_rule("gmail.com", "alt4.aspmx.l.google.com.", &Rule::SkipCatchAll) ); - assert_eq!( - true, + assert!( has_rule("domain.com", ".antispamcloud.com.", &Rule::SkipCatchAll) ) } From 8f35e96a859feb027d00940c1191845364bf8f00 Mon Sep 17 00:00:00 2001 From: Amaury <1293565+amaury1093@users.noreply.github.com> Date: Sat, 7 Dec 2024 22:15:34 +0100 Subject: [PATCH 05/12] clippy --- backend/src/worker/do_work.rs | 4 ++-- core/src/rules.rs | 18 +++++++++++------- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/backend/src/worker/do_work.rs b/backend/src/worker/do_work.rs index 838b64378..4c530b620 100644 --- a/backend/src/worker/do_work.rs +++ b/backend/src/worker/do_work.rs @@ -141,8 +141,8 @@ pub(crate) async fn do_check_email_work( delivery.ack(BasicAckOptions::default()).await?; if let CheckEmailJobId::SingleShot = task.job_id { - send_single_shot_reply(channel, &delivery, &worker_output).await?; - } + send_single_shot_reply(channel, &delivery, &worker_output).await?; + } // Store the result. for storage in config.get_storages() { diff --git a/core/src/rules.rs b/core/src/rules.rs index 866b45c8e..88d76b4ef 100644 --- a/core/src/rules.rs +++ b/core/src/rules.rs @@ -92,12 +92,16 @@ mod tests { #[test] fn should_skip_catch_all() { - assert!( - has_rule("gmail.com", "alt4.aspmx.l.google.com.", &Rule::SkipCatchAll) - ); - - assert!( - has_rule("domain.com", ".antispamcloud.com.", &Rule::SkipCatchAll) - ) + assert!(has_rule( + "gmail.com", + "alt4.aspmx.l.google.com.", + &Rule::SkipCatchAll + )); + + assert!(has_rule( + "domain.com", + ".antispamcloud.com.", + &Rule::SkipCatchAll + )) } } From f90024b6888f8ad2da72ef51a284fffdfcbbdc19 Mon Sep 17 00:00:00 2001 From: Amaury <1293565+amaury1093@users.noreply.github.com> Date: Sat, 7 Dec 2024 22:24:09 +0100 Subject: [PATCH 06/12] remove worker feature --- .vscode/settings.json | 3 --- backend/Cargo.toml | 19 +++++-------------- backend/Dockerfile | 2 +- backend/README.md | 2 +- backend/src/config.rs | 38 ++++++++++++-------------------------- backend/src/http/error.rs | 1 - backend/src/http/mod.rs | 29 ++++++++++------------------- backend/src/lib.rs | 1 - backend/src/main.rs | 27 +++++++++------------------ 9 files changed, 38 insertions(+), 84 deletions(-) delete mode 100644 .vscode/settings.json diff --git a/.vscode/settings.json b/.vscode/settings.json deleted file mode 100644 index 2decb065a..000000000 --- a/.vscode/settings.json +++ /dev/null @@ -1,3 +0,0 @@ -{ - "rust-analyzer.cargo.features": "all" -} diff --git a/backend/Cargo.toml b/backend/Cargo.toml index 85df6bb11..dd63682b4 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -13,12 +13,12 @@ check-if-email-exists = { path = "../core", features = ["sentry"] } config = "0.14" csv = "1.3.0" dotenv = "0.15.0" -futures = { version = "0.3.30", optional = true } -lapin = { version = "2.3.1", optional = true } -tokio-executor-trait = { version = "2.1.1", optional = true } -tokio-reactor-trait = { version = "1.1.0", optional = true } +futures = { version = "0.3.30" } +lapin = { version = "2.3.1" } +tokio-executor-trait = { version = "2.1.1" } +tokio-reactor-trait = { version = "1.1.0" } openssl = { version = "0.10.64", features = ["vendored"] } -reqwest = { version = "0.12.5", features = ["json", "socks"], 
optional = true }
+reqwest = { version = "0.12.5", features = ["json", "socks"] }
 sentry = "0.23"
 serde = { version = "1.0", features = ["derive"] }
 serde_json = "1.0"
@@ -40,12 +40,3 @@ warp = "0.3"
 
 [dev-dependencies]
 toml = "0.8"
-
-[features]
-worker = [
-	"futures",
-	"lapin",
-	"tokio-executor-trait",
-	"tokio-reactor-trait",
-	"reqwest",
-]
diff --git a/backend/Dockerfile b/backend/Dockerfile
index 14c7fcd56..dc9393a45 100644
--- a/backend/Dockerfile
+++ b/backend/Dockerfile
@@ -14,7 +14,7 @@ COPY . .
 
 ENV SQLX_OFFLINE=true
 
-RUN cargo build --bin reacher_backend --features worker --release --target=x86_64-unknown-linux-musl
+RUN cargo build --bin reacher_backend --release --target=x86_64-unknown-linux-musl
 
 # ------------------------------------------------------------------------------
 # Final Stage
diff --git a/backend/README.md b/backend/README.md
index 166d8a91e..b5c21c6ee 100644
--- a/backend/README.md
+++ b/backend/README.md
@@ -57,7 +57,7 @@ $ git clone https://github.com/reacherhq/check-if-email-exists
 $ cd check-if-email-exists/backend
 
 # Run the backend binary in release mode (slower build, but more performant).
-$ cargo run --release --bin reacher_backend --features worker
+$ cargo run --release --bin reacher_backend
 ```
 
 The server will then be listening on `http://127.0.0.1:8080`.
diff --git a/backend/src/config.rs b/backend/src/config.rs
index 1a4f2dbce..80f67b1a9 100644
--- a/backend/src/config.rs
+++ b/backend/src/config.rs
@@ -15,9 +15,7 @@
 // along with this program. If not, see <https://www.gnu.org/licenses/>.
 
 use crate::storage::{postgres::PostgresStorage, Storage};
-#[cfg(feature = "worker")]
 use crate::worker::do_work::TaskWebhook;
-#[cfg(feature = "worker")]
 use crate::worker::setup_rabbit_mq;
 use anyhow::{bail, Context};
 use check_if_email_exists::{
@@ -25,11 +23,9 @@ use check_if_email_exists::{
 	YahooVerifMethod, LOG_TARGET,
 };
 use config::Config;
-#[cfg(feature = "worker")]
 use lapin::Channel;
 use serde::{Deserialize, Serialize};
 use sqlx::PgPool;
-#[cfg(feature = "worker")]
 use std::sync::Arc;
 use std::{any::Any, collections::HashMap};
 use tracing::warn;
@@ -68,7 +64,6 @@ pub struct BackendConfig {
 	pub storage: HashMap<String, StorageConfig>,
 
 	// Internal fields, not part of the configuration.
-	#[cfg(feature = "worker")]
 	#[serde(skip)]
 	channel: Option<Arc<Channel>>,
 
@@ -87,19 +82,16 @@ impl BackendConfig {
 			self.worker.enable,
 			&self.worker.throttle,
 			&self.worker.rabbitmq,
-			#[cfg(feature = "worker")]
 			&self.channel,
 		) {
-			#[cfg(feature = "worker")]
 			(true, Some(throttle), Some(rabbitmq), Some(channel)) => Ok(MustWorkerConfig {
-				#[cfg(feature = "worker")]
 				channel: channel.clone(),
 				throttle: throttle.clone(),
 				rabbitmq: rabbitmq.clone(),
-				#[cfg(feature = "worker")]
+
 				webhook: self.worker.webhook.clone(),
 			}),
-			#[cfg(feature = "worker")]
+
 			(true, _, _, _) => bail!("Worker configuration is missing"),
 			_ => bail!("Calling must_worker_config on a non-worker backend"),
 		}
 	}
@@ -115,23 +108,16 @@ impl BackendConfig {
 		}
 
-		#[cfg(feature = "worker")]
-		{
-			let channel = if self.worker.enable {
-				let rabbitmq_config = self.worker.rabbitmq.as_ref().ok_or_else(|| {
-					anyhow::anyhow!("Worker configuration is missing the rabbitmq configuration")
-				})?;
-				let channel = setup_rabbit_mq(&self.backend_name, rabbitmq_config).await?;
-				Some(Arc::new(channel))
-			} else {
-				None
-			};
-			self.channel = channel;
-		}
+		let channel = if self.worker.enable {
+			let rabbitmq_config = self.worker.rabbitmq.as_ref().ok_or_else(|| {
+				anyhow::anyhow!("Worker configuration is missing the rabbitmq configuration")
+			})?;
+			let channel = setup_rabbit_mq(&self.backend_name, rabbitmq_config).await?;
+			Some(Arc::new(channel))
+		} else {
+			None
+		};
+		self.channel = channel;
 
 		Ok(())
 	}
@@ -179,7 +168,6 @@ pub struct WorkerConfig {
 	pub throttle: Option<ThrottleConfig>,
 	pub rabbitmq: Option<RabbitMQConfig>,
 	/// Optional webhook configuration to send email verification results.
-	#[cfg(feature = "worker")]
 	pub webhook: Option<TaskWebhook>,
 }
 
@@ -187,12 +175,10 @@ pub struct WorkerConfig {
 /// a domain type to ensure that the worker configuration is present.
 #[derive(Debug, Clone)]
 pub struct MustWorkerConfig {
-	#[cfg(feature = "worker")]
 	pub channel: Arc<Channel>,
 
 	pub throttle: ThrottleConfig,
 	pub rabbitmq: RabbitMQConfig,
-	#[cfg(feature = "worker")]
 	pub webhook: Option<TaskWebhook>,
 }
diff --git a/backend/src/http/error.rs b/backend/src/http/error.rs
index ac4c01278..0428994f5 100644
--- a/backend/src/http/error.rs
+++ b/backend/src/http/error.rs
@@ -77,7 +77,6 @@ impl From for ReacherResponseError {
 	}
 }
 
-#[cfg(feature = "worker")]
 impl From<lapin::Error> for ReacherResponseError {
 	fn from(e: lapin::Error) -> Self {
 		ReacherResponseError::new(StatusCode::INTERNAL_SERVER_ERROR, e)
diff --git a/backend/src/http/mod.rs b/backend/src/http/mod.rs
index fdd6a20fd..e94f62f2a 100644
--- a/backend/src/http/mod.rs
+++ b/backend/src/http/mod.rs
@@ -16,7 +16,6 @@
 
 mod error;
 mod v0;
-#[cfg(feature = "worker")]
 mod v1;
 mod version;
 
@@ -38,7 +37,8 @@ pub fn create_routes(
 	config: Arc<BackendConfig>,
 ) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
 	let pg_pool = config.get_pg_pool();
-	let t = version::get::get_version()
+
+	version::get::get_version()
 		.or(v0::check_email::post::post_check_email(Arc::clone(&config)))
 		// The 3 following routes will 404 if pg_pool is None.
 		.or(v0::bulk::post::create_bulk_job(
 			Arc::clone(&config),
 			pg_pool.clone(),
 		))
 		.or(v0::bulk::get::get_bulk_job_status(pg_pool.clone()))
-		.or(v0::bulk::results::get_bulk_job_result(pg_pool));
-
-	#[cfg(feature = "worker")]
-	{
-		t.or(v1::check_email::post::v1_check_email(Arc::clone(&config)))
-			.or(v1::bulk::post::v1_create_bulk_job(Arc::clone(&config)))
-			.or(v1::bulk::get_progress::v1_get_bulk_job_progress(
-				Arc::clone(&config),
-			))
-			.or(v1::bulk::get_results::v1_get_bulk_job_results(config))
-			.recover(handle_rejection)
-	}
-
-	#[cfg(not(feature = "worker"))]
-	{
-		t.recover(handle_rejection)
-	}
+		.or(v0::bulk::results::get_bulk_job_result(pg_pool))
+		.or(v1::check_email::post::v1_check_email(Arc::clone(&config)))
+		.or(v1::bulk::post::v1_create_bulk_job(Arc::clone(&config)))
+		.or(v1::bulk::get_progress::v1_get_bulk_job_progress(
+			Arc::clone(&config),
+		))
+		.or(v1::bulk::get_results::v1_get_bulk_job_results(config))
+		.recover(handle_rejection)
 }
 
 /// Runs the Warp server.
diff --git a/backend/src/lib.rs b/backend/src/lib.rs
index 5b5afad63..71c140ccf 100644
--- a/backend/src/lib.rs
+++ b/backend/src/lib.rs
@@ -17,7 +17,6 @@
 pub mod config;
 pub mod http;
 mod storage;
-#[cfg(feature = "worker")]
 pub mod worker;
 
 const CARGO_PKG_VERSION: &str = env!("CARGO_PKG_VERSION");
diff --git a/backend/src/main.rs b/backend/src/main.rs
index 2a262b82b..e815cd62b 100644
--- a/backend/src/main.rs
+++ b/backend/src/main.rs
@@ -20,7 +20,6 @@ use check_if_email_exists::{setup_sentry, LOG_TARGET};
 
 use reacher_backend::config::load_config;
 use reacher_backend::http::run_warp_server;
-#[cfg(feature = "worker")]
 use reacher_backend::worker::run_worker;
 use std::sync::Arc;
 use tracing::{debug, info};
@@ -45,25 +44,17 @@ async fn main() -> Result<(), anyhow::Error> {
 
 	let config = Arc::new(config);
 
-	#[cfg(feature = "worker")]
-	{
-		let server_future = run_warp_server(Arc::clone(&config));
-		let worker_future = async {
-			if config.worker.enable {
-				run_worker(config).await?;
-			}
-			Ok(())
-		};
+	let server_future = run_warp_server(Arc::clone(&config));
+	let worker_future = async {
+		if config.worker.enable {
+			run_worker(config).await?;
+		}
+		Ok(())
+	};
 
-		tokio::try_join!(server_future, worker_future)?;
+	tokio::try_join!(server_future, worker_future)?;
 
-		info!("Shutting down...");
-	}
-
-	#[cfg(not(feature = "worker"))]
-	{
-		run_warp_server(config).await?;
-	}
+	info!("Shutting down...");
 
 	Ok(())
 }

From 4d84f320cbf8a5b7c87c4deee33f8990d1d70203 Mon Sep 17 00:00:00 2001
From: Amaury <1293565+amaury1093@users.noreply.github.com>
Date: Sat, 7 Dec 2024 22:53:10 +0100
Subject: [PATCH 07/12] Add commercial license trial

---
 backend/src/config.rs                         |  14 ++
 .../src/storage/commercial_license_trial.rs   | 144 ++++++++++++++++++
 backend/src/storage/mod.rs                    |   1 +
 core/src/util/sentry.rs                       |   2 +-
 4 files changed, 160 insertions(+), 1 deletion(-)
 create mode 100644 backend/src/storage/commercial_license_trial.rs

diff --git a/backend/src/config.rs b/backend/src/config.rs
index 80f67b1a9..bd8066ca4 100644
--- a/backend/src/config.rs
+++ b/backend/src/config.rs
@@ -14,6 +14,7 @@
 // You should have received a copy of the GNU Affero General Public License
 // along with this program. If not, see <https://www.gnu.org/licenses/>.
+use crate::storage::commercial_license_trial::CommercialLicenseTrialStorage;
 use crate::storage::{postgres::PostgresStorage, Storage};
 use crate::worker::do_work::TaskWebhook;
 use crate::worker::setup_rabbit_mq;
 use anyhow::{bail, Context};
@@ -112,6 +113,15 @@ impl BackendConfig {
 						.with_context(|| format!("Connecting to postgres DB {}", config.db_url))?;
 					self.storages.push(Arc::new(storage));
 				}
+				StorageConfig::CommercialLicenseTrial(config) => {
+					let storage =
+						CommercialLicenseTrialStorage::new(&config.db_url, config.extra.clone())
+							.await
+							.with_context(|| {
+								format!("Connecting to postgres DB {}", config.db_url)
+							})?;
+					self.storages.push(Arc::new(storage));
+				}
 			}
 		}
 
@@ -214,6 +224,10 @@ impl ThrottleConfig {
 pub enum StorageConfig {
 	/// Store the email verification results in the Postgres database.
 	Postgres(PostgresConfig),
+	/// Store the email verification results in Reacher's DB. This storage
+	/// method is baked-in into the software for users of the Commercial
+	/// License trial.
+	CommercialLicenseTrial(PostgresConfig),
 }
 
 #[derive(Debug, Deserialize, Clone, PartialEq, Serialize)]
diff --git a/backend/src/storage/commercial_license_trial.rs b/backend/src/storage/commercial_license_trial.rs
new file mode 100644
index 000000000..71c3f611d
--- /dev/null
+++ b/backend/src/storage/commercial_license_trial.rs
@@ -0,0 +1,144 @@
+// Reacher - Email Verification
+// Copyright (C) 2018-2023 Reacher
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published
+// by the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+use super::error::StorageError;
+use super::postgres::PostgresStorage;
+use super::Storage;
+use crate::worker::do_work::{CheckEmailJobId, CheckEmailTask, TaskError};
+use async_trait::async_trait;
+use check_if_email_exists::{redact, CheckEmailOutput, LOG_TARGET};
+use serde_json::Value;
+use tracing::debug;
+
+/// Storage that's baked in the software for users of the Commercial License
+/// trial. It's really just a wrapper around the PostgresStorage, where we
+/// redact all sensitive data such as the email address.
+#[derive(Debug)]
+pub struct CommercialLicenseTrialStorage {
+	postgres_storage: PostgresStorage,
+}
+
+impl CommercialLicenseTrialStorage {
+	pub async fn new(db_url: &str, extra: Option<Value>) -> Result<Self, StorageError> {
+		let postgres_storage = PostgresStorage::new(db_url, extra).await?;
+		Ok(Self { postgres_storage })
+	}
+}
+
+#[async_trait]
+impl Storage for CommercialLicenseTrialStorage {
+	async fn store(
+		&self,
+		task: &CheckEmailTask,
+		worker_output: &Result<CheckEmailOutput, TaskError>,
+		extra: Option<Value>,
+	) -> Result<(), StorageError> {
+		let mut payload_json = serde_json::to_value(task)?;
+		if let Ok(output) = worker_output {
+			redact_across_json(&mut payload_json, &output.syntax.username);
+		}
+
+		match worker_output {
+			Ok(output) => {
+				let mut output_json = serde_json::to_value(output)?;
+				redact_across_json(&mut output_json, &output.syntax.username);
+
+				sqlx::query!(
+					r#"
+					INSERT INTO v1_task_result (payload, job_id, extra, result)
+					VALUES ($1, $2, $3, $4)
+					RETURNING id
+					"#,
+					payload_json,
+					match task.job_id {
+						CheckEmailJobId::Bulk(job_id) => Some(job_id),
+						CheckEmailJobId::SingleShot => None,
+					},
+					extra,
+					output_json,
+				)
+				.fetch_one(&self.postgres_storage.pg_pool)
+				.await?;
+			}
+			Err(err) => {
+				sqlx::query!(
+					r#"
+					INSERT INTO v1_task_result (payload, job_id, extra, error)
+					VALUES ($1, $2, $3, $4)
+					RETURNING id
+					"#,
+					payload_json,
+					match task.job_id {
+						CheckEmailJobId::Bulk(job_id) => Some(job_id),
+						CheckEmailJobId::SingleShot => None,
+					},
+					extra,
+					err.to_string(),
+				)
+				.fetch_one(&self.postgres_storage.pg_pool)
+				.await?;
+			}
+		}
+
+		debug!(target: LOG_TARGET, email=?task.input.to_email, "Wrote to DB");
+
+		Ok(())
+	}
+
+	fn get_extra(&self) -> Option<Value> {
+		self.postgres_storage.get_extra()
+	}
+}
+
+/// Redact all sensitive data by recursively traversing the JSON object.
+fn redact_across_json(value: &mut Value, username: &str) {
+	match value {
+		Value::String(s) => *s = redact(s, username),
+		Value::Array(arr) => {
+			for item in arr {
+				redact_across_json(item, username);
+			}
+		}
+		Value::Object(obj) => {
+			for (_, v) in obj {
+				redact_across_json(v, username);
+			}
+		}
+		_ => {}
+	}
+}
+
+#[cfg(test)]
+mod tests {
+	use super::*;
+	use check_if_email_exists::{check_email, CheckEmailInputBuilder};
+
+	#[tokio::test]
+	async fn should_redact_across_json() {
+		let input = CheckEmailInputBuilder::default()
+			// Checking this email will make a MX record check, but hopefully
+			// it won't resolve (since I typed it randomly), meaning that the
+			// SMTP check will be skipped.
+			.to_email("someone@adlkfjaklsdjfldksjfderlqkjeqwr.com".into())
+			.build()
+			.unwrap();
+		let output = check_email(&input).await;
+		let mut output_json = serde_json::to_value(&output).unwrap();
+		redact_across_json(&mut output_json, &output.syntax.username);
+
+		assert!(!output_json.to_string().contains("someone"));
+	}
+}
diff --git a/backend/src/storage/mod.rs b/backend/src/storage/mod.rs
index 7d6249f05..fcd1afcea 100644
--- a/backend/src/storage/mod.rs
+++ b/backend/src/storage/mod.rs
@@ -14,6 +14,7 @@
 // You should have received a copy of the GNU Affero General Public License
 // along with this program. If not, see <https://www.gnu.org/licenses/>.
+pub mod commercial_license_trial;
 pub mod error;
 pub mod postgres;
 
diff --git a/core/src/util/sentry.rs b/core/src/util/sentry.rs
index 2314e5f6d..427a99b1e 100644
--- a/core/src/util/sentry.rs
+++ b/core/src/util/sentry.rs
@@ -92,7 +92,7 @@ fn error(err: SentryError, result: &CheckEmailOutput, backend_name: &str) {
 
 /// Function to replace all usernames from email, and replace them with
 /// `***@domain.com` for privacy reasons.
-fn redact(input: &str, username: &str) -> String {
+pub fn redact(input: &str, username: &str) -> String {
 	input.replace(username, "***")
 }
 

From 7e0b0a7710f3a5d1f7db9ce3c92c2275ed2e7dd3 Mon Sep 17 00:00:00 2001
From: Amaury <1293565+amaury1093@users.noreply.github.com>
Date: Sat, 7 Dec 2024 23:53:37 +0100
Subject: [PATCH 08/12] Fix storage

---
 backend/backend_config.toml                   |  2 +-
 backend/src/config.rs                         | 15 ++++++----
 backend/src/http/error.rs                     |  8 +++++-
 backend/src/http/mod.rs                       |  2 +-
 backend/src/http/v1/check_email/post.rs       | 28 ++++++++++++++++---
 .../src/storage/commercial_license_trial.rs   |  7 +++++
 backend/src/storage/mod.rs                    |  4 +++
 backend/src/storage/postgres.rs               |  7 +++++
 backend/src/worker/consume.rs                 |  2 +-
 backend/src/worker/do_work.rs                 |  2 +-
 backend/src/worker/mod.rs                     |  2 +-
 .../worker/{response.rs => single_shot.rs}    |  0
 12 files changed, 64 insertions(+), 15 deletions(-)
 rename backend/src/worker/{response.rs => single_shot.rs} (100%)

diff --git a/backend/backend_config.toml b/backend/backend_config.toml
index d0f976ee6..c5b28bb26 100644
--- a/backend/backend_config.toml
+++ b/backend/backend_config.toml
@@ -151,4 +151,4 @@ concurrency = 5
 # initiated the verification request in a multi-tenant system.
 #
 # Env variable: RCH__STORAGE__0__POSTGRES__EXTRA
-# extra = {"my_custom_key": "my_custom_value"}
+# extra = { "my_custom_key" = "my_custom_value" }
diff --git a/backend/src/config.rs b/backend/src/config.rs
index bd8066ca4..1ad69b4db 100644
--- a/backend/src/config.rs
+++ b/backend/src/config.rs
@@ -27,8 +27,8 @@ use config::Config;
 use lapin::Channel;
 use serde::{Deserialize, Serialize};
 use sqlx::PgPool;
+use std::collections::HashMap;
 use std::sync::Arc;
-use std::{any::Any, collections::HashMap};
 use tracing::warn;
 
 #[derive(Debug, Default, Serialize, Deserialize)]
@@ -150,11 +150,16 @@ impl BackendConfig {
 	///
 	/// This is quite hacky, and it will most probably be refactored away in
 	/// future versions. We however need to rethink how to do the `/v1/bulk`
-	/// endpoints first.
+	/// endpoints first. Simply using downcasting should be a warning sign that
+	/// we're doing something wrong.
+	///
+	/// ref: https://github.com/reacherhq/check-if-email-exists/issues/1544
 	pub fn get_pg_pool(&self) -> Option<PgPool> {
-		self.storages
-			.iter()
-			.find_map(|s| <dyn Any>::downcast_ref::<PostgresStorage>(s).map(|s| s.pg_pool.clone()))
+		self.storages.iter().find_map(|s| {
+			s.as_any()
+				.downcast_ref::<PostgresStorage>()
+				.map(|s| s.pg_pool.clone())
+		})
 	}
 }
 
diff --git a/backend/src/http/error.rs b/backend/src/http/error.rs
index 0428994f5..af4c095f2 100644
--- a/backend/src/http/error.rs
+++ b/backend/src/http/error.rs
@@ -14,6 +14,7 @@
 // You should have received a copy of the GNU Affero General Public License
 // along with this program. If not, see <https://www.gnu.org/licenses/>.
 
+use crate::storage::error::StorageError;
 use check_if_email_exists::{CheckEmailInputBuilderError, LOG_TARGET};
 use serde::ser::SerializeStruct;
 use serde::Serialize;
@@ -24,7 +25,6 @@ use warp::{http::StatusCode, reject};
 
 /// Trait combining Display and Debug.
 pub trait DisplayDebug: fmt::Display + Debug + Sync + Send {}
-
 impl<T: fmt::Display + Debug + Sync + Send> DisplayDebug for T {}
 
 /// Struct describing an error response.
@@ -113,6 +113,12 @@ impl From for ReacherResponseError {
 	}
 }
 
+impl From<StorageError> for ReacherResponseError {
+	fn from(e: StorageError) -> Self {
+		ReacherResponseError::new(StatusCode::INTERNAL_SERVER_ERROR, e)
+	}
+}
+
 /// This function receives a `Rejection` and tries to return a custom value,
 /// otherwise simply passes the rejection along.
 pub async fn handle_rejection(err: warp::Rejection) -> Result<impl warp::Reply, warp::Rejection> {
diff --git a/backend/src/http/mod.rs b/backend/src/http/mod.rs
index e94f62f2a..be5fe373b 100644
--- a/backend/src/http/mod.rs
+++ b/backend/src/http/mod.rs
@@ -112,7 +112,7 @@ pub fn with_db(
 	pool.ok_or_else(|| {
 		warp::reject::custom(ReacherResponseError::new(
 			StatusCode::SERVICE_UNAVAILABLE,
-			"Please configure a database on Reacher before calling this endpoint",
+			"Please configure a Postgres database on Reacher before calling this endpoint",
 		))
 	})
 }
diff --git a/backend/src/http/v1/check_email/post.rs b/backend/src/http/v1/check_email/post.rs
index 97c383ea8..ddd595d0b 100644
--- a/backend/src/http/v1/check_email/post.rs
+++ b/backend/src/http/v1/check_email/post.rs
@@ -17,7 +17,7 @@
 //! This file implements the `POST /v1/check_email` endpoint.
 
 use check_if_email_exists::{check_email, LOG_TARGET};
-use futures::StreamExt;
+use futures::{StreamExt, TryFutureExt};
 use lapin::options::{
 	BasicAckOptions, BasicConsumeOptions, BasicRejectOptions, QueueDeclareOptions,
 };
@@ -33,7 +33,7 @@ use crate::http::v1::bulk::post::publish_task;
 use crate::http::{check_header, ReacherResponseError};
 use crate::worker::consume::MAX_QUEUE_PRIORITY;
 use crate::worker::do_work::{CheckEmailJobId, CheckEmailTask};
-use crate::worker::response::SingleShotReply;
+use crate::worker::single_shot::SingleShotReply;
 
 /// The main endpoint handler that implements the logic of this route.
 async fn http_handler(
@@ -51,8 +51,28 @@
 
 	// If worker mode is disabled, we do a direct check, and skip rabbitmq.
 	if !config.worker.enable {
-		let result = check_email(&body.to_check_email_input(Arc::clone(&config))).await;
-		let result_bz = serde_json::to_vec(&result).map_err(ReacherResponseError::from)?;
+		let input = body.to_check_email_input(Arc::clone(&config));
+		let result = check_email(&input).await;
+		let value = Ok(result);
+
+		// Also store the result "manually", since we don't have a worker.
+ for storage in config.get_storages() { + storage + .store( + &CheckEmailTask { + input: input.clone(), + job_id: CheckEmailJobId::SingleShot, + webhook: None, + }, + &value, + storage.get_extra(), + ) + .map_err(ReacherResponseError::from) + .await?; + } + + let result_bz = serde_json::to_vec(&value).map_err(ReacherResponseError::from)?; + return Ok(warp::reply::with_header( result_bz, "Content-Type", diff --git a/backend/src/storage/commercial_license_trial.rs b/backend/src/storage/commercial_license_trial.rs index 71c3f611d..54a38d79e 100644 --- a/backend/src/storage/commercial_license_trial.rs +++ b/backend/src/storage/commercial_license_trial.rs @@ -21,6 +21,7 @@ use crate::worker::do_work::{CheckEmailJobId, CheckEmailTask, TaskError}; use async_trait::async_trait; use check_if_email_exists::{redact, CheckEmailOutput, LOG_TARGET}; use serde_json::Value; +use std::any::Any; use tracing::debug; /// Storage that's baked in the software for users of the Commercial License @@ -101,6 +102,12 @@ impl Storage for CommercialLicenseTrialStorage { fn get_extra(&self) -> Option { self.postgres_storage.get_extra() } + + // This is a workaround to allow downcasting to Any, and should be removed + // ref: https://github.com/reacherhq/check-if-email-exists/issues/1544 + fn as_any(&self) -> &dyn Any { + self + } } /// Redact all sensitive data by recursively traversing the JSON object. diff --git a/backend/src/storage/mod.rs b/backend/src/storage/mod.rs index fcd1afcea..1daa514c5 100644 --- a/backend/src/storage/mod.rs +++ b/backend/src/storage/mod.rs @@ -35,4 +35,8 @@ pub trait Storage: Debug + Send + Sync + Any { ) -> Result<(), StorageError>; fn get_extra(&self) -> Option; + + // This is a workaround to allow downcasting to Any, and should be removed + // ref: https://github.com/reacherhq/check-if-email-exists/issues/1544 + fn as_any(&self) -> &dyn Any; } diff --git a/backend/src/storage/postgres.rs b/backend/src/storage/postgres.rs index 618c23776..ff0523224 100644 --- a/backend/src/storage/postgres.rs +++ b/backend/src/storage/postgres.rs @@ -21,6 +21,7 @@ use async_trait::async_trait; use check_if_email_exists::{CheckEmailOutput, LOG_TARGET}; use sqlx::postgres::PgPoolOptions; use sqlx::PgPool; +use std::any::Any; use tracing::{debug, info}; #[derive(Debug)] @@ -104,4 +105,10 @@ impl Storage for PostgresStorage { fn get_extra(&self) -> Option { self.extra.clone() } + + // This is a workaround to allow downcasting to Any, and should be removed + // ref: https://github.com/reacherhq/check-if-email-exists/issues/1544 + fn as_any(&self) -> &dyn Any { + self + } } diff --git a/backend/src/worker/consume.rs b/backend/src/worker/consume.rs index 5dc0d5b78..b46003287 100644 --- a/backend/src/worker/consume.rs +++ b/backend/src/worker/consume.rs @@ -15,7 +15,7 @@ // along with this program. If not, see . use super::do_work::{do_check_email_work, CheckEmailTask, TaskError}; -use super::response::send_single_shot_reply; +use super::single_shot::send_single_shot_reply; use crate::config::{BackendConfig, RabbitMQConfig, ThrottleConfig}; use crate::worker::do_work::CheckEmailJobId; use anyhow::Context; diff --git a/backend/src/worker/do_work.rs b/backend/src/worker/do_work.rs index 4c530b620..6d7e7dec1 100644 --- a/backend/src/worker/do_work.rs +++ b/backend/src/worker/do_work.rs @@ -15,7 +15,7 @@ // along with this program. If not, see . 
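The `as_any` method that patch 08 threads through `Storage` and both implementors above is the usual workaround for downcasting through a trait object: even with `Any` as a supertrait, a `&dyn Storage` could not be upcast to `&dyn Any` directly on stable Rust at the time, so each implementor hands out `&dyn Any` itself. A self-contained sketch of the pattern, with a simplified stand-in for `PostgresStorage` (not the backend's actual struct):

```rust
use std::any::Any;

trait Storage: Any {
    fn name(&self) -> &'static str;
    // Each implementor returns itself as `&dyn Any`, enabling downcasts.
    fn as_any(&self) -> &dyn Any;
}

struct PostgresStorage {
    db_url: String,
}

impl Storage for PostgresStorage {
    fn name(&self) -> &'static str {
        "postgres"
    }
    fn as_any(&self) -> &dyn Any {
        self
    }
}

fn main() {
    let storages: Vec<Box<dyn Storage>> = vec![Box::new(PostgresStorage {
        db_url: "postgresql://localhost/reacherdb".into(),
    })];
    // Recover the concrete type from the trait object, as `get_pg_pool` does.
    let pg = storages
        .iter()
        .find_map(|s| s.as_any().downcast_ref::<PostgresStorage>());
    assert_eq!(
        pg.map(|p| p.db_url.as_str()),
        Some("postgresql://localhost/reacherdb")
    );
}
```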
use crate::config::BackendConfig; -use crate::worker::response::send_single_shot_reply; +use crate::worker::single_shot::send_single_shot_reply; use check_if_email_exists::{ check_email, CheckEmailInput, CheckEmailOutput, Reachable, LOG_TARGET, }; diff --git a/backend/src/worker/mod.rs b/backend/src/worker/mod.rs index 90b474c33..bffe565a6 100644 --- a/backend/src/worker/mod.rs +++ b/backend/src/worker/mod.rs @@ -22,6 +22,6 @@ pub mod consume; pub mod do_work; -pub mod response; +pub mod single_shot; pub use consume::{run_worker, setup_rabbit_mq}; diff --git a/backend/src/worker/response.rs b/backend/src/worker/single_shot.rs similarity index 100% rename from backend/src/worker/response.rs rename to backend/src/worker/single_shot.rs From f77a83272a5cde113b02e5f83c0134cb23bb069a Mon Sep 17 00:00:00 2001 From: Amaury <1293565+amaury1093@users.noreply.github.com> Date: Mon, 9 Dec 2024 22:53:31 +0100 Subject: [PATCH 09/12] Make it pass --- Cargo.lock | 1 - backend/Cargo.toml | 1 - backend/backend_config.toml | 13 +- backend/src/config.rs | 87 ++++------ backend/src/http/v1/check_email/post.rs | 27 ++-- .../src/storage/commercial_license_trial.rs | 151 ------------------ backend/src/storage/mod.rs | 36 +++-- backend/src/storage/postgres.rs | 15 +- backend/src/worker/do_work.rs | 9 +- 9 files changed, 73 insertions(+), 267 deletions(-) delete mode 100644 backend/src/storage/commercial_license_trial.rs diff --git a/Cargo.lock b/Cargo.lock index f16fee02d..7057289dd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2958,7 +2958,6 @@ version = "0.10.0" dependencies = [ "anyhow", "async-smtp", - "async-trait", "check-if-email-exists", "config", "csv", diff --git a/backend/Cargo.toml b/backend/Cargo.toml index dd63682b4..8e139bd14 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -8,7 +8,6 @@ publish = false [dependencies] anyhow = "1.0" async-smtp = { version = "0.9.1", features = ["runtime-tokio"] } -async-trait = "0.1" check-if-email-exists = { path = "../core", features = ["sentry"] } config = "0.14" csv = "1.3.0" diff --git a/backend/backend_config.toml b/backend/backend_config.toml index c5b28bb26..6a8978323 100644 --- a/backend/backend_config.toml +++ b/backend/backend_config.toml @@ -128,17 +128,8 @@ concurrency = 5 # results. We currently support the following storage backends: # - Postgres # -# You can configure one or more storages. The format of the configuration is: -# [storage..] -# where: -# - is a unique identifier for the storage, in case you have -# multiple storages of the same type. You can use 0, 1, 2, etc. as the -# identifier. -# - is the type of the storage, e.g. "postgres". -[storage] - -# Uncomment the following line to configure the 1st storage to use Postgres. -# [storage.0.postgres] +# Uncomment the following line to configure the storage to use Postgres. +# [storage.postgres] # # URL to connect to the Postgres database. # diff --git a/backend/src/config.rs b/backend/src/config.rs index 1ad69b4db..ac75539ac 100644 --- a/backend/src/config.rs +++ b/backend/src/config.rs @@ -14,8 +14,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . 
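Patch 09 collapses the keyed `[storage.<id>.<type>]` tables into a single `storage` field, so the TOML table name alone picks the backend. With serde's externally tagged enums and `rename_all = "snake_case"`, a `[storage.postgres]` table maps straight onto a `Postgres` variant, as the `StorageConfig` enum in the hunks below does. A sketch of that deserialization, assuming the `toml` and `serde` (with `derive`) crates and simplified, illustrative config structs:

```rust
use serde::Deserialize;

#[derive(Debug, Deserialize, PartialEq)]
struct PostgresConfig {
    db_url: String,
}

// Externally tagged: the TOML table name selects the variant.
#[derive(Debug, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
enum StorageConfig {
    Postgres(PostgresConfig),
}

#[derive(Debug, Deserialize, PartialEq)]
struct Root {
    storage: StorageConfig,
}

fn main() {
    let cfg: Root = toml::from_str(
        r#"
        [storage.postgres]
        db_url = "postgresql://localhost/reacherdb"
        "#,
    )
    .unwrap();
    assert_eq!(
        cfg.storage,
        StorageConfig::Postgres(PostgresConfig {
            db_url: "postgresql://localhost/reacherdb".into()
        })
    );
}
```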
-use crate::storage::commercial_license_trial::CommercialLicenseTrialStorage;
-use crate::storage::{postgres::PostgresStorage, Storage};
+use crate::storage::{postgres::PostgresStorage, StorageAdapter};
 use crate::worker::do_work::TaskWebhook;
 use crate::worker::setup_rabbit_mq;
 use anyhow::{bail, Context};
@@ -27,7 +26,6 @@ use config::Config;
 use lapin::Channel;
 use serde::{Deserialize, Serialize};
 use sqlx::PgPool;
-use std::collections::HashMap;
 use std::sync::Arc;
 use tracing::warn;
 
 #[derive(Debug, Default, Serialize, Deserialize)]
@@ -62,14 +60,14 @@ pub struct BackendConfig {
 	pub worker: WorkerConfig,
 
 	/// Configuration on where to store the email verification results.
-	pub storage: HashMap<String, StorageConfig>,
+	pub storage: StorageConfig,
 
 	// Internal fields, not part of the configuration.
 	#[serde(skip)]
 	channel: Option<Arc<Channel>>,
 
 	#[serde(skip)]
-	storages: Vec<Arc<dyn Storage>>,
+	storage_adapter: Arc<StorageAdapter>,
 }
 
 impl BackendConfig {
@@ -101,28 +99,15 @@ impl BackendConfig {
 	/// Attempt connection to the Postgres database and RabbitMQ. Also populates
 	/// the internal `pg_pool` and `channel` fields with the connections.
 	pub async fn connect(&mut self) -> Result<(), anyhow::Error> {
-		if self.worker.enable && self.storage.is_empty() {
-			bail!("When worker.enable is true, you must configure at least one storage to store the email verification results.");
-		}
+		match &self.storage {
+			StorageConfig::Postgres(config) => {
+				let storage = PostgresStorage::new(&config.db_url, config.extra.clone())
+					.await
+					.with_context(|| format!("Connecting to postgres DB {}", config.db_url))?;
 
-		for storage in self.storage.values() {
-			match storage {
-				StorageConfig::Postgres(config) => {
-					let storage = PostgresStorage::new(&config.db_url, config.extra.clone())
-						.await
-						.with_context(|| format!("Connecting to postgres DB {}", config.db_url))?;
-					self.storages.push(Arc::new(storage));
-				}
-				StorageConfig::CommercialLicenseTrial(config) => {
-					let storage =
-						CommercialLicenseTrialStorage::new(&config.db_url, config.extra.clone())
-							.await
-							.with_context(|| {
-								format!("Connecting to postgres DB {}", config.db_url)
-							})?;
-					self.storages.push(Arc::new(storage));
-				}
+				self.storage_adapter = Arc::new(StorageAdapter::Postgres(storage));
 			}
+			StorageConfig::Noop => {}
 		}
 
 		let channel = if self.worker.enable {
@@ -141,25 +126,16 @@
 	/// Get all storages as a Vec. We don't really care about the keys in the
 	/// HashMap, except for deserialize purposes.
-	pub fn get_storages(&self) -> Vec<Arc<dyn Storage>> {
-		self.storages.clone()
+	pub fn get_storage_adapter(&self) -> Arc<StorageAdapter> {
+		self.storage_adapter.clone()
 	}
 
-	/// Get the Postgres connection pool, if at least one of the storages is a
-	/// Postgres storage.
-	///
-	/// This is quite hacky, and it will most probably be refactored away in
-	/// future versions. We however need to rethink how to do the `/v1/bulk`
-	/// endpoints first. Simply using downcasting should be a warning sign that
-	/// we're doing something wrong.
-	///
-	/// ref: https://github.com/reacherhq/check-if-email-exists/issues/1544
+	/// Get the Postgres connection pool, if the storage is Postgres.
pub fn get_pg_pool(&self) -> Option { - self.storages.iter().find_map(|s| { - s.as_any() - .downcast_ref::() - .map(|s| s.pg_pool.clone()) - }) + match self.storage_adapter.as_ref() { + StorageAdapter::Postgres(storage) => Some(storage.pg_pool.clone()), + StorageAdapter::Noop => None, + } } } @@ -224,15 +200,14 @@ impl ThrottleConfig { } } -#[derive(Debug, Deserialize, Clone, PartialEq, Serialize)] +#[derive(Debug, Default, Deserialize, Clone, PartialEq, Serialize)] #[serde(rename_all = "snake_case")] pub enum StorageConfig { /// Store the email verification results in the Postgres database. Postgres(PostgresConfig), - /// Store the email verification results in Reacher's DB. This storage - /// method is baked-in into the software for users of the Commercial - /// License trial. - CommercialLicenseTrial(PostgresConfig), + /// Don't store the email verification results. + #[default] + Noop, } #[derive(Debug, Deserialize, Clone, PartialEq, Serialize)] @@ -265,12 +240,12 @@ mod tests { #[tokio::test] async fn test_env_vars() { env::set_var("RCH__BACKEND_NAME", "test-backend"); - env::set_var("RCH__STORAGE__1__POSTGRES__DB_URL", "test2"); + env::set_var("RCH__STORAGE__POSTGRES__DB_URL", "test2"); let cfg = load_config().await.unwrap(); assert_eq!(cfg.backend_name, "test-backend"); assert_eq!( - cfg.storage.get("1").unwrap(), - &StorageConfig::Postgres(PostgresConfig { + cfg.storage, + StorageConfig::Postgres(PostgresConfig { db_url: "test2".to_string(), extra: None, }) @@ -279,16 +254,12 @@ mod tests { #[tokio::test] async fn test_serialize_storage_config() { - let mut storage_config = HashMap::new(); - storage_config.insert( - "test1", - StorageConfig::Postgres(PostgresConfig { - db_url: "postgres://localhost:5432/test1".to_string(), - extra: None, - }), - ); + let storage_config = StorageConfig::Postgres(PostgresConfig { + db_url: "postgres://localhost:5432/test1".to_string(), + extra: None, + }); - let expected = r#"[test1.postgres] + let expected = r#"[postgres] db_url = "postgres://localhost:5432/test1" "#; diff --git a/backend/src/http/v1/check_email/post.rs b/backend/src/http/v1/check_email/post.rs index ddd595d0b..dbe186233 100644 --- a/backend/src/http/v1/check_email/post.rs +++ b/backend/src/http/v1/check_email/post.rs @@ -56,20 +56,19 @@ async fn http_handler( let value = Ok(result); // Also store the result "manually", since we don't have a worker. 
- for storage in config.get_storages() { - storage - .store( - &CheckEmailTask { - input: input.clone(), - job_id: CheckEmailJobId::SingleShot, - webhook: None, - }, - &value, - storage.get_extra(), - ) - .map_err(ReacherResponseError::from) - .await?; - } + let storage = config.get_storage_adapter(); + storage + .store( + &CheckEmailTask { + input: input.clone(), + job_id: CheckEmailJobId::SingleShot, + webhook: None, + }, + &value, + storage.get_extra(), + ) + .map_err(ReacherResponseError::from) + .await?; let result_bz = serde_json::to_vec(&value).map_err(ReacherResponseError::from)?; diff --git a/backend/src/storage/commercial_license_trial.rs b/backend/src/storage/commercial_license_trial.rs deleted file mode 100644 index 54a38d79e..000000000 --- a/backend/src/storage/commercial_license_trial.rs +++ /dev/null @@ -1,151 +0,0 @@ -// Reacher - Email Verification -// Copyright (C) 2018-2023 Reacher - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published -// by the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -use super::error::StorageError; -use super::postgres::PostgresStorage; -use super::Storage; -use crate::worker::do_work::{CheckEmailJobId, CheckEmailTask, TaskError}; -use async_trait::async_trait; -use check_if_email_exists::{redact, CheckEmailOutput, LOG_TARGET}; -use serde_json::Value; -use std::any::Any; -use tracing::debug; - -/// Storage that's baked in the software for users of the Commercial License -/// trial. It's really just a wrapper around the PostgresStorage, where we -/// redact all sensitive data such as the email address. 
-#[derive(Debug)] -pub struct CommercialLicenseTrialStorage { - postgres_storage: PostgresStorage, -} - -impl CommercialLicenseTrialStorage { - pub async fn new(db_url: &str, extra: Option) -> Result { - let postgres_storage = PostgresStorage::new(db_url, extra).await?; - Ok(Self { postgres_storage }) - } -} - -#[async_trait] -impl Storage for CommercialLicenseTrialStorage { - async fn store( - &self, - task: &CheckEmailTask, - worker_output: &Result, - extra: Option, - ) -> Result<(), StorageError> { - let mut payload_json = serde_json::to_value(task)?; - if let Ok(output) = worker_output { - redact_across_json(&mut payload_json, &output.syntax.username); - } - - match worker_output { - Ok(output) => { - let mut output_json = serde_json::to_value(output)?; - redact_across_json(&mut output_json, &output.syntax.username); - - sqlx::query!( - r#" - INSERT INTO v1_task_result (payload, job_id, extra, result) - VALUES ($1, $2, $3, $4) - RETURNING id - "#, - payload_json, - match task.job_id { - CheckEmailJobId::Bulk(job_id) => Some(job_id), - CheckEmailJobId::SingleShot => None, - }, - extra, - output_json, - ) - .fetch_one(&self.postgres_storage.pg_pool) - .await?; - } - Err(err) => { - sqlx::query!( - r#" - INSERT INTO v1_task_result (payload, job_id, extra, error) - VALUES ($1, $2, $3, $4) - RETURNING id - "#, - payload_json, - match task.job_id { - CheckEmailJobId::Bulk(job_id) => Some(job_id), - CheckEmailJobId::SingleShot => None, - }, - extra, - err.to_string(), - ) - .fetch_one(&self.postgres_storage.pg_pool) - .await?; - } - } - - debug!(target: LOG_TARGET, email=?task.input.to_email, "Wrote to DB"); - - Ok(()) - } - - fn get_extra(&self) -> Option { - self.postgres_storage.get_extra() - } - - // This is a workaround to allow downcasting to Any, and should be removed - // ref: https://github.com/reacherhq/check-if-email-exists/issues/1544 - fn as_any(&self) -> &dyn Any { - self - } -} - -/// Redact all sensitive data by recursively traversing the JSON object. -fn redact_across_json(value: &mut Value, username: &str) { - match value { - Value::String(s) => *s = redact(s, username), - Value::Array(arr) => { - for item in arr { - redact_across_json(item, username); - } - } - Value::Object(obj) => { - for (_, v) in obj { - redact_across_json(v, username); - } - } - _ => {} - } -} - -#[cfg(test)] -mod tests { - use super::*; - use check_if_email_exists::{check_email, CheckEmailInputBuilder}; - - #[tokio::test] - async fn should_redact_across_json() { - let input = CheckEmailInputBuilder::default() - // Checking this email will make a MX record check, but hopefully - // it won't resolve (since I typed it randomly), meaning that the - // SMTP check will be skipped. - .to_email("someone@adlkfjaklsdjfldksjfderlqkjeqwr.com".into()) - .build() - .unwrap(); - let output = check_email(&input).await; - let mut output_json = serde_json::to_value(&output).unwrap(); - redact_across_json(&mut output_json, &output.syntax.username); - - assert!(!output_json.to_string().contains("someone")); - } -} diff --git a/backend/src/storage/mod.rs b/backend/src/storage/mod.rs index 1daa514c5..a3f0d9fbc 100644 --- a/backend/src/storage/mod.rs +++ b/backend/src/storage/mod.rs @@ -14,29 +14,39 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . 
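The `storage/mod.rs` hunk that follows replaces the `dyn Storage` trait object with a closed enum: dispatch becomes an exhaustive `match`, `async_trait` and the `as_any` workaround disappear, and a `Noop` default covers the no-storage case. The shape of that pattern in isolation (a sketch, with a plain `String` standing in for `PostgresStorage`):

```rust
// Enum-based adapter: every backend is a variant, so no trait objects,
// no async_trait, and no downcasting needed. The trade-off is that the
// set of backends is closed at compile time.
#[derive(Debug, Default)]
enum StorageAdapter {
    Postgres(String), // stand-in for PostgresStorage, holding a db_url
    #[default]
    Noop,
}

impl StorageAdapter {
    fn store(&self, record: &str) {
        match self {
            StorageAdapter::Postgres(db_url) => {
                println!("would INSERT {record:?} into {db_url}");
            }
            StorageAdapter::Noop => {} // silently drop the result
        }
    }
}

fn main() {
    let adapter = StorageAdapter::Postgres("postgresql://localhost/reacherdb".into());
    adapter.store("someone@example.com is risky");
    // The default adapter stores nothing, matching the no-storage config.
    StorageAdapter::default().store("dropped");
}
```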
-pub mod commercial_license_trial;
 pub mod error;
 pub mod postgres;
 
 use crate::worker::do_work::{CheckEmailTask, TaskError};
-use async_trait::async_trait;
 use check_if_email_exists::CheckEmailOutput;
 use error::StorageError;
-use std::any::Any;
+use postgres::PostgresStorage;
 use std::fmt::Debug;
 
-#[async_trait]
-pub trait Storage: Debug + Send + Sync + Any {
-	async fn store(
+#[derive(Debug, Default)]
+pub enum StorageAdapter {
+	Postgres(PostgresStorage),
+	#[default]
+	Noop,
+}
+
+impl StorageAdapter {
+	pub async fn store(
 		&self,
 		task: &CheckEmailTask,
 		worker_output: &Result<CheckEmailOutput, TaskError>,
 		extra: Option<serde_json::Value>,
-	) -> Result<(), StorageError>;
-
-	fn get_extra(&self) -> Option<serde_json::Value>;
-
-	// This is a workaround to allow downcasting to Any, and should be removed
-	// ref: https://github.com/reacherhq/check-if-email-exists/issues/1544
-	fn as_any(&self) -> &dyn Any;
+	) -> Result<(), StorageError> {
+		match self {
+			StorageAdapter::Postgres(storage) => storage.store(task, worker_output, extra).await,
+			StorageAdapter::Noop => Ok(()),
+		}
+	}
+
+	pub fn get_extra(&self) -> Option<serde_json::Value> {
+		match self {
+			StorageAdapter::Postgres(storage) => storage.get_extra().clone(),
+			StorageAdapter::Noop => None,
+		}
+	}
 }
diff --git a/backend/src/storage/postgres.rs b/backend/src/storage/postgres.rs
index ff0523224..00b31a57d 100644
--- a/backend/src/storage/postgres.rs
+++ b/backend/src/storage/postgres.rs
@@ -15,9 +15,7 @@
 // along with this program. If not, see <https://www.gnu.org/licenses/>.
 
 use super::error::StorageError;
-use super::Storage;
 use crate::worker::do_work::{CheckEmailJobId, CheckEmailTask, TaskError};
-use async_trait::async_trait;
 use check_if_email_exists::{CheckEmailOutput, LOG_TARGET};
 use sqlx::postgres::PgPoolOptions;
 use sqlx::PgPool;
 use std::any::Any;
 use tracing::{debug, info};
 
 #[derive(Debug)]
@@ -44,11 +42,8 @@ impl PostgresStorage {
 
 		Ok(Self { pg_pool, extra })
 	}
-}
 
-#[async_trait]
-impl Storage for PostgresStorage {
-	async fn store(
+	pub async fn store(
 		&self,
 		task: &CheckEmailTask,
 		worker_output: &Result<CheckEmailOutput, TaskError>,
@@ -102,13 +97,7 @@ impl Storage for PostgresStorage {
 		Ok(())
 	}
 
-	fn get_extra(&self) -> Option<serde_json::Value> {
+	pub fn get_extra(&self) -> Option<serde_json::Value> {
 		self.extra.clone()
 	}
-
-	// This is a workaround to allow downcasting to Any, and should be removed
-	// ref: https://github.com/reacherhq/check-if-email-exists/issues/1544
-	fn as_any(&self) -> &dyn Any {
-		self
-	}
 }
diff --git a/backend/src/worker/do_work.rs b/backend/src/worker/do_work.rs
index 6d7e7dec1..1a4ae7cbb 100644
--- a/backend/src/worker/do_work.rs
+++ b/backend/src/worker/do_work.rs
@@ -145,11 +145,10 @@ pub(crate) async fn do_check_email_work(
 	}
 
 	// Store the result.
- for storage in config.get_storages() { - storage - .store(task, &worker_output, storage.get_extra()) - .await?; - } + let storage = config.get_storage_adapter(); + storage + .store(task, &worker_output, storage.get_extra()) + .await?; info!(target: LOG_TARGET, email=task.input.to_email, From d6912cde0b02e24dde1e725b5c11e87f3ae34e67 Mon Sep 17 00:00:00 2001 From: Amaury <1293565+amaury1093@users.noreply.github.com> Date: Mon, 9 Dec 2024 22:55:19 +0100 Subject: [PATCH 10/12] clippy --- backend/src/storage/postgres.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/backend/src/storage/postgres.rs b/backend/src/storage/postgres.rs index 00b31a57d..401b4e94a 100644 --- a/backend/src/storage/postgres.rs +++ b/backend/src/storage/postgres.rs @@ -19,7 +19,6 @@ use crate::worker::do_work::{CheckEmailJobId, CheckEmailTask, TaskError}; use check_if_email_exists::{CheckEmailOutput, LOG_TARGET}; use sqlx::postgres::PgPoolOptions; use sqlx::PgPool; -use std::any::Any; use tracing::{debug, info}; #[derive(Debug)] From 4bc8df1b3fabe78fac4e5f75fafa65a0aa53d21e Mon Sep 17 00:00:00 2001 From: Amaury <1293565+amaury1093@users.noreply.github.com> Date: Thu, 12 Dec 2024 00:03:19 +0100 Subject: [PATCH 11/12] Add commercial license --- Cargo.lock | 12 ++++++ backend/Cargo.toml | 1 + backend/src/config.rs | 21 ++++++--- backend/src/http/error.rs | 6 +++ backend/src/http/v1/check_email/post.rs | 7 +++ .../src/storage/commercial_license_trial.rs | 43 +++++++++++++++++++ backend/src/storage/mod.rs | 1 + backend/src/worker/consume.rs | 2 + backend/src/worker/do_work.rs | 5 +++ core/src/lib.rs | 8 +++- 10 files changed, 99 insertions(+), 7 deletions(-) create mode 100644 backend/src/storage/commercial_license_trial.rs diff --git a/Cargo.lock b/Cargo.lock index 7057289dd..dc6cffc0b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2967,6 +2967,7 @@ dependencies = [ "openssl", "reqwest 0.12.5", "sentry", + "sentry-anyhow", "serde", "serde_json", "sqlx", @@ -3409,6 +3410,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "sentry-anyhow" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "338ef04f73ca2fb1130ebab3853dca36041aa219a442ae873627373887660c36" +dependencies = [ + "anyhow", + "sentry-backtrace", + "sentry-core", +] + [[package]] name = "sentry-backtrace" version = "0.23.0" diff --git a/backend/Cargo.toml b/backend/Cargo.toml index 8e139bd14..b150c75fa 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -19,6 +19,7 @@ tokio-reactor-trait = { version = "1.1.0" } openssl = { version = "0.10.64", features = ["vendored"] } reqwest = { version = "0.12.5", features = ["json", "socks"] } sentry = "0.23" +sentry-anyhow = "0.23" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" sqlx = { version = "0.7", features = [ diff --git a/backend/src/config.rs b/backend/src/config.rs index ac75539ac..65abb133d 100644 --- a/backend/src/config.rs +++ b/backend/src/config.rs @@ -60,7 +60,10 @@ pub struct BackendConfig { pub worker: WorkerConfig, /// Configuration on where to store the email verification results. - pub storage: StorageConfig, + pub storage: Option, + + /// Whether to enable the Commercial License Trial. Setting this to true + pub commercial_license_trial: Option, // Internal fields, not part of the configuration. #[serde(skip)] @@ -100,14 +103,16 @@ impl BackendConfig { /// the internal `pg_pool` and `channel` fields with the connections. 
pub async fn connect(&mut self) -> Result<(), anyhow::Error> { match &self.storage { - StorageConfig::Postgres(config) => { + Some(StorageConfig::Postgres(config)) => { let storage = PostgresStorage::new(&config.db_url, config.extra.clone()) .await .with_context(|| format!("Connecting to postgres DB {}", config.db_url))?; self.storage_adapter = Arc::new(StorageAdapter::Postgres(storage)); } - StorageConfig::Noop => {} + _ => { + self.storage_adapter = Arc::new(StorageAdapter::Noop); + } } let channel = if self.worker.enable { @@ -216,6 +221,12 @@ pub struct PostgresConfig { pub extra: Option, } +#[derive(Debug, Deserialize, Clone, Serialize)] +pub struct CommercialLicenseTrialConfig { + pub api_token: String, + pub url: String, +} + /// Load the worker configuration from the worker_config.toml file and from the /// environment. pub async fn load_config() -> Result { @@ -245,10 +256,10 @@ mod tests { assert_eq!(cfg.backend_name, "test-backend"); assert_eq!( cfg.storage, - StorageConfig::Postgres(PostgresConfig { + Some(StorageConfig::Postgres(PostgresConfig { db_url: "test2".to_string(), extra: None, - }) + })) ); } diff --git a/backend/src/http/error.rs b/backend/src/http/error.rs index af4c095f2..108e39268 100644 --- a/backend/src/http/error.rs +++ b/backend/src/http/error.rs @@ -119,6 +119,12 @@ impl From for ReacherResponseError { } } +impl From for ReacherResponseError { + fn from(e: reqwest::Error) -> Self { + ReacherResponseError::new(StatusCode::INTERNAL_SERVER_ERROR, e) + } +} + /// This function receives a `Rejection` and tries to return a custom value, /// otherwise simply passes the rejection along. pub async fn handle_rejection(err: warp::Rejection) -> Result { diff --git a/backend/src/http/v1/check_email/post.rs b/backend/src/http/v1/check_email/post.rs index dbe186233..16e2a595d 100644 --- a/backend/src/http/v1/check_email/post.rs +++ b/backend/src/http/v1/check_email/post.rs @@ -31,6 +31,7 @@ use crate::config::BackendConfig; use crate::http::v0::check_email::post::{with_config, CheckEmailRequest}; use crate::http::v1::bulk::post::publish_task; use crate::http::{check_header, ReacherResponseError}; +use crate::storage::commercial_license_trial::send_to_reacher; use crate::worker::consume::MAX_QUEUE_PRIORITY; use crate::worker::do_work::{CheckEmailJobId, CheckEmailTask}; use crate::worker::single_shot::SingleShotReply; @@ -70,6 +71,12 @@ async fn http_handler( .map_err(ReacherResponseError::from) .await?; + // If we're in the Commercial License Trial, we also store the + // result by sending it to back to Reacher. + send_to_reacher(Arc::clone(&config), &input.to_email, &value) + .await + .map_err(ReacherResponseError::from)?; + let result_bz = serde_json::to_vec(&value).map_err(ReacherResponseError::from)?; return Ok(warp::reply::with_header( diff --git a/backend/src/storage/commercial_license_trial.rs b/backend/src/storage/commercial_license_trial.rs new file mode 100644 index 000000000..4c84f5e65 --- /dev/null +++ b/backend/src/storage/commercial_license_trial.rs @@ -0,0 +1,43 @@ +// Reacher - Email Verification +// Copyright (C) 2018-2023 Reacher + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published +// by the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. 
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+use crate::config::{BackendConfig, CommercialLicenseTrialConfig};
+use crate::worker::do_work::TaskError;
+use check_if_email_exists::{CheckEmailOutput, LOG_TARGET};
+use std::sync::Arc;
+use tracing::debug;
+
+/// If we're in the Commercial License Trial, we also store the
+/// result by sending it back to Reacher.
+pub async fn send_to_reacher(
+	config: Arc<BackendConfig>,
+	email: &str,
+	worker_output: &Result<CheckEmailOutput, TaskError>,
+) -> Result<(), reqwest::Error> {
+	if let Some(CommercialLicenseTrialConfig { api_token, url }) = &config.commercial_license_trial
+	{
+		let res = reqwest::Client::new()
+			.post(url)
+			.header("Authorization", api_token)
+			.json(worker_output)
+			.send()
+			.await?;
+		let res = res.text().await?;
+		debug!(target: LOG_TARGET, email=email, res=res, "Sent result to Reacher Commercial License Trial");
+	}
+
+	Ok(())
+}
diff --git a/backend/src/storage/mod.rs b/backend/src/storage/mod.rs
index a3f0d9fbc..992500f1a 100644
--- a/backend/src/storage/mod.rs
+++ b/backend/src/storage/mod.rs
@@ -14,6 +14,7 @@
 // You should have received a copy of the GNU Affero General Public License
 // along with this program. If not, see <https://www.gnu.org/licenses/>.
 
+pub mod commercial_license_trial;
 pub mod error;
 pub mod postgres;
 
diff --git a/backend/src/worker/consume.rs b/backend/src/worker/consume.rs
index b46003287..9104f33c3 100644
--- a/backend/src/worker/consume.rs
+++ b/backend/src/worker/consume.rs
@@ -22,6 +22,7 @@ use anyhow::Context;
 use check_if_email_exists::LOG_TARGET;
 use futures::stream::StreamExt;
 use lapin::{options::*, types::FieldTable, Channel, Connection, ConnectionProperties};
+use sentry_anyhow::capture_anyhow;
 use std::sync::Arc;
 use std::time::{Duration, Instant};
 use tokio::sync::Mutex;
@@ -164,6 +165,7 @@ async fn consume_check_email(config: Arc<BackendConfig>) -> Result<(), anyhow::E
 			if let Err(e) =
 				do_check_email_work(&payload, delivery, channel_clone2, config_clone2).await
 			{
 				error!(target: LOG_TARGET, email=payload.input.to_email, error=?e, "Error processing message");
+				capture_anyhow(&e);
 			}
 		});
 
diff --git a/backend/src/worker/do_work.rs b/backend/src/worker/do_work.rs
index 1a4ae7cbb..214ca110a 100644
--- a/backend/src/worker/do_work.rs
+++ b/backend/src/worker/do_work.rs
@@ -15,6 +15,7 @@
 // along with this program. If not, see <https://www.gnu.org/licenses/>.
 
 use crate::config::BackendConfig;
+use crate::storage::commercial_license_trial::send_to_reacher;
 use crate::worker::single_shot::send_single_shot_reply;
 use check_if_email_exists::{
 	check_email, CheckEmailInput, CheckEmailOutput, Reachable, LOG_TARGET,
 };
@@ -150,6 +151,10 @@ pub(crate) async fn do_check_email_work(
 		.store(task, &worker_output, storage.get_extra())
 		.await?;
 
+	// If we're in the Commercial License Trial, we also store the
+	// result by sending it back to Reacher.
+ send_to_reacher(config, &task.input.to_email, &worker_output).await?; + info!(target: LOG_TARGET, email=task.input.to_email, worker_output=?worker_output.map(|o| o.is_reachable), diff --git a/core/src/lib.rs b/core/src/lib.rs index 23619ff59..d2d6da0b5 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -253,7 +253,7 @@ pub async fn check_email(input: &CheckEmailInput) -> CheckEmailOutput { let end_time = SystemTime::now(); - CheckEmailOutput { + let output = CheckEmailOutput { input: to_email.to_string(), is_reachable: calculate_reachable(&my_misc, &my_smtp), misc: Ok(my_misc), @@ -269,5 +269,9 @@ pub async fn check_email(input: &CheckEmailInput) -> CheckEmailOutput { smtp: smtp_debug, backend_name: input.backend_name.clone(), }, - } + }; + + log_unknown_errors(&output, &input.backend_name); + + output } From 05a4c551a9271baf39531225cbe91ad31a5ae142 Mon Sep 17 00:00:00 2001 From: Amaury <1293565+amaury1093@users.noreply.github.com> Date: Thu, 12 Dec 2024 00:07:07 +0100 Subject: [PATCH 12/12] revert --- core/src/util/sentry.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/util/sentry.rs b/core/src/util/sentry.rs index 427a99b1e..2314e5f6d 100644 --- a/core/src/util/sentry.rs +++ b/core/src/util/sentry.rs @@ -92,7 +92,7 @@ fn error(err: SentryError, result: &CheckEmailOutput, backend_name: &str) { /// Function to replace all usernames from email, and replace them with /// `***@domain.com` for privacy reasons. -pub fn redact(input: &str, username: &str) -> String { +fn redact(input: &str, username: &str) -> String { input.replace(username, "***") }
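With results now shipped to Reacher wholesale over HTTP via `send_to_reacher`, rather than redacted field-by-field into a local table, `redact` no longer needs to be public, and patch 12 restores its original visibility. For reference, its masking behavior (a small illustrative example based on the function body above, not a test from the repo):

```rust
// Mirrors the private helper in core/src/util/sentry.rs.
fn redact(input: &str, username: &str) -> String {
    input.replace(username, "***")
}

fn main() {
    assert_eq!(redact("someone@gmail.com", "someone"), "***@gmail.com");
    // Occurrences inside SMTP transcripts are masked too.
    assert_eq!(
        redact("RCPT TO:<someone@gmail.com>", "someone"),
        "RCPT TO:<***@gmail.com>"
    );
}
```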