From 843477650ef44b68f2d8cfe4b5459fecb24308c1 Mon Sep 17 00:00:00 2001
From: Bread White
Date: Tue, 12 Nov 2024 11:46:18 +0300
Subject: [PATCH 1/6] fix(compose): docker compose fixes

---
 Dockerfile             | 8 ++++----
 config/production.toml | 3 +++
 docker-compose.yaml    | 2 +-
 3 files changed, 8 insertions(+), 5 deletions(-)

diff --git a/Dockerfile b/Dockerfile
index 1dbeedc..8f9e078 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -20,25 +20,25 @@ FROM chef AS builder
 
 WORKDIR /app
 
-RUN apt update && apt install -y curl wget
-
 COPY --from=planner /app/recipe.json recipe.json
 RUN cargo chef cook --release --recipe-path recipe.json
 
 COPY . .
 
-RUN echo -e 'building binary with feature "${FEATURES}"'
+RUN echo "building binary with feature '${FEATURES}'"
 RUN cargo install ${FEATURES} --bins --path .
 
 # Target layer based on tiny official ubuntu image with necessary binaries and data to run.
 FROM ubuntu:rolling
 
+RUN apt update && apt install -y curl
+
 WORKDIR /app
 
 COPY --from=builder /app/target/release/news-rss .
 
 ENTRYPOINT ["/app/news-rss"]
 
-EXPOSE 2892
+EXPOSE 2865

diff --git a/config/production.toml b/config/production.toml
index 0dea285..82375a8 100644
--- a/config/production.toml
+++ b/config/production.toml
@@ -1,6 +1,9 @@
 [logger]
 level = "info"
 
+[server]
+address = "0.0.0.0:2865"
+
 [cache.local]
 expired_secs = 10368000
 
diff --git a/docker-compose.yaml b/docker-compose.yaml
index 969282f..967dda2 100644
--- a/docker-compose.yaml
+++ b/docker-compose.yaml
@@ -43,7 +43,7 @@ services:
       - redis
       - pgsql
     ports:
-      - '2895:2895'
+      - '2865:2865'
     volumes:
       - './config:/app/config:ro'
     environment:

From 20a18dae8260fe7f9908e58a02019f033ba325bc Mon Sep 17 00:00:00 2001
From: Bread White
Date: Wed, 13 Nov 2024 22:33:21 +0300
Subject: [PATCH 2/6] feat(storage): implement pgsql storage

---
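Note (this sits between the --- marker and the diff, so git am drops it): the new
CRUD queries assume an `rss_sources` table that no migration in this series
creates. Below is a sketch of the assumed shape; every column name and type is
inferred from PgsqlTopicModel and the queries themselves, so verify it against
the real schema before relying on it:

    // Hypothetical bootstrap query; names and types here are assumptions,
    // not taken from an actual migration.
    sqlx::query(
        r#"
        CREATE TABLE IF NOT EXISTS rss_sources (
            id            SERIAL PRIMARY KEY,
            name          TEXT NOT NULL,
            link          TEXT NOT NULL,
            run_at_launch BOOLEAN NOT NULL,
            max_retries   INT NOT NULL,
            timeout       INT NOT NULL,
            interval_secs INT NOT NULL
        )
        "#,
    )
    .execute(pool.as_ref())
    .await?;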
 Cargo.toml                  |  4 +-
 src/storage/config.rs       |  2 -
 src/storage/mod.rs          |  7 +++-
 src/storage/pgsql/mod.rs    | 74 +++++++++++++++++++++++++++++++++++++
 src/storage/pgsql/models.rs | 11 +++++-
 5 files changed, 89 insertions(+), 9 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 50d1633..3346cdc 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -5,8 +5,7 @@ edition = "2021"
 
 [features]
 cache-redis = ["dep:redis"]
-publish-offline = ["dep:sqlx"]
-storage-pgsql = ["dep:sqlx"]
+publish-offline = []
 crawler-llm = ["dep:openai_dive", "dep:html2text", "dep:html_editor"]
 default = []
 
@@ -68,7 +67,6 @@ version = "^1.0"
 features = ["derive", "serde_derive"]
 
 [dependencies.sqlx]
-optional = true
 version = "^0.7"
 features = ["postgres", "runtime-tokio", "chrono"]

diff --git a/src/storage/config.rs b/src/storage/config.rs
index a108cf5..450f0f9 100644
--- a/src/storage/config.rs
+++ b/src/storage/config.rs
@@ -1,4 +1,3 @@
-#[cfg(feature = "storage-pgsql")]
 use crate::storage::pgsql::config::PgsqlTopicStorageConfig;
 
 use getset::Getters;
@@ -7,6 +6,5 @@ use serde::Deserialize;
 #[derive(Clone, Deserialize, Getters)]
 #[getset(get = "pub")]
 pub struct StorageConfig {
-    #[cfg(feature = "storage-pgsql")]
     pgsql: PgsqlTopicStorageConfig,
 }

diff --git a/src/storage/mod.rs b/src/storage/mod.rs
index 0933fdb..e8af53f 100644
--- a/src/storage/mod.rs
+++ b/src/storage/mod.rs
@@ -1,13 +1,16 @@
 pub mod config;
-
-#[cfg(feature = "storage-pgsql")]
 pub mod pgsql;
 
 #[async_trait::async_trait]
 pub trait LoadTopic {
     type Error;
     type Topic;
+    type TopicId;
 
     async fn load_all(&self) -> Result<Vec<Self::Topic>, Self::Error>;
     async fn load_at_launch(&self) -> Result<Vec<Self::Topic>, Self::Error>;
+    async fn search_source(&self, query: &str) -> Result<Vec<Self::Topic>, Self::Error>;
+    async fn add_source(&self, topic: &Self::Topic) -> Result<(), Self::Error>;
+    async fn remove_source(&self, id: Self::TopicId) -> Result<(), Self::Error>;
+    async fn update_source(&self, topic: &Self::Topic) -> Result<(), Self::Error>;
 }

diff --git a/src/storage/pgsql/mod.rs b/src/storage/pgsql/mod.rs
index df61cd2..791777f 100644
--- a/src/storage/pgsql/mod.rs
+++ b/src/storage/pgsql/mod.rs
@@ -45,6 +45,7 @@ impl ServiceConnect for PgsqlTopicStorage {
 impl LoadTopic for PgsqlTopicStorage {
     type Error = sqlx::Error;
     type Topic = PgsqlTopicModel;
+    type TopicId = i32;
 
     async fn load_all(&self) -> Result<Vec<Self::Topic>, Self::Error> {
         let connection = self.pool.as_ref();
@@ -75,4 +76,79 @@
 
         Ok(models)
     }
+
+    async fn search_source(&self, query: &str) -> Result<Vec<Self::Topic>, Self::Error> {
+        let connection = self.pool.as_ref();
+        // Bind the search term instead of interpolating it with format!()
+        // to keep the query safe from SQL injection.
+        let models = sqlx::query_as(
+            r#"
+            SELECT * FROM rss_sources
+            WHERE name LIKE '%' || $1 || '%' OR link LIKE '%' || $1 || '%'
+            "#,
+        )
+        .bind(query)
+        .fetch_all(connection)
+        .await?;
+
+        Ok(models)
+    }
+
+    async fn add_source(&self, topic: &Self::Topic) -> Result<(), Self::Error> {
+        let connection = self.pool.as_ref();
+        let _ = sqlx::query!(
+            r#"
+            INSERT INTO rss_sources (name, link, run_at_launch)
+            VALUES ($1, $2, $3)
+            "#,
+            topic.name,
+            topic.link,
+            topic.run_at_launch,
+        )
+        .execute(connection)
+        .await?;
+
+        Ok(())
+    }
+
+    async fn remove_source(&self, id: Self::TopicId) -> Result<(), Self::Error> {
+        let connection = self.pool.as_ref();
+        let _ = sqlx::query!(
+            r#"
+            DELETE FROM rss_sources
+            WHERE id = $1
+            "#,
+            id,
+        )
+        .execute(connection)
+        .await?;
+
+        Ok(())
+    }
+
+    async fn update_source(&self, topic: &Self::Topic) -> Result<(), Self::Error> {
+        let connection = self.pool.as_ref();
+        let _ = sqlx::query!(
+            r#"
+            UPDATE rss_sources
+            SET name = $2,
+                link = $3,
+                run_at_launch = $4,
+                max_retries = $5,
+                timeout = $6,
+                interval_secs = $7
+            WHERE id = $1
+            "#,
+            topic.id,
+            topic.name,
+            topic.link,
+            topic.run_at_launch,
+            topic.max_retries,
+            topic.timeout,
+            topic.interval_secs,
+        )
+        .execute(connection)
+        .await?;
+
+        Ok(())
+    }
 }

diff --git a/src/storage/pgsql/models.rs b/src/storage/pgsql/models.rs
index b32673d..39d946e 100644
--- a/src/storage/pgsql/models.rs
+++ b/src/storage/pgsql/models.rs
@@ -1,9 +1,10 @@
 use crate::feeds::rss_feeds::config::RssConfig;
 
-use serde::Deserialize;
+use derive_builder::Builder;
+use serde::{Deserialize, Serialize};
 use sqlx::{Decode, FromRow};
 
-#[derive(FromRow, Deserialize, Decode)]
+#[derive(Builder, FromRow, Deserialize, Serialize, Decode)]
 pub struct PgsqlTopicModel {
     pub id: i32,
     pub name: String,
@@ -26,3 +27,9 @@ impl From<PgsqlTopicModel> for RssConfig {
             .unwrap()
     }
 }
+
+impl PgsqlTopicModel {
+    pub fn builder() -> PgsqlTopicModelBuilder {
+        PgsqlTopicModelBuilder::default()
+    }
+}

From 8f26338e0137cbc495255efb7ea29baa28662386 Mon Sep 17 00:00:00 2001
From: Bread White
Date: Wed, 13 Nov 2024 22:33:45 +0300
Subject: [PATCH 3/6] chore(server): implement routes for pgsql storage

---
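Note (dropped by git am): once the server is listening on 0.0.0.0:2865, the new
routes can be exercised as sketched below. reqwest and serde_json are assumed
helper dependencies for the sketch only, not something this series adds.

    // Register a source through PUT /sources/add, then find it again
    // through POST /sources/search.
    let client = reqwest::Client::new();
    client
        .put("http://localhost:2865/sources/add")
        .json(&serde_json::json!({
            "name": "BBC",
            "link": "https://bbc-news.com/rss.xml",
            "run_at_launch": false,
            "max_retries": 3,
            "timeout": 100,
            "interval_secs": 3600
        }))
        .send()
        .await?
        .error_for_status()?;

    let found: Vec<serde_json::Value> = client
        .post("http://localhost:2865/sources/search")
        .json(&serde_json::json!({ "query": "BBC" }))
        .send()
        .await?
        .json()
        .await?;

One caveat worth recording here: /sources/update reuses CreateSourceForm, and
From<CreateSourceForm> for PgsqlTopicModel pins id to 0, so the
UPDATE ... WHERE id = $1 statement matches no rows until the form carries a
real id.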
 src/server/forms.rs   | 114 +++++++++++++++++
 src/server/mod.rs     |  35 +++++-
 src/server/routers.rs | 278 +++++++++++++++++++++++++++++++++++++++---
 src/server/swagger.rs |   8 ++
 4 files changed, 411 insertions(+), 24 deletions(-)

diff --git a/src/server/forms.rs b/src/server/forms.rs
index 329928c..00d6270 100644
--- a/src/server/forms.rs
+++ b/src/server/forms.rs
@@ -1,5 +1,6 @@
 use crate::feeds::rss_feeds::config::RssConfig;
 use crate::server::swagger::SwaggerExamples;
+use crate::storage::pgsql::models::PgsqlTopicModel;
 
 use derive_builder::Builder;
 use getset::{CopyGetters, Getters};
@@ -182,3 +183,116 @@ impl SwaggerExamples for GetInfoResponse {
         }
     }
 }
+
+#[derive(Builder, Deserialize, Serialize, IntoParams, ToSchema)]
+pub struct GetSourcesResponse {
+    id: i32,
+    #[schema(example = "BBC")]
+    name: String,
+    #[schema(example = "https://bbc-news.com/rss.xml")]
+    link: String,
+    #[schema(example = false)]
+    run_at_launch: bool,
+    #[schema(example = 3)]
+    max_retries: i32,
+    #[schema(example = 100)]
+    timeout: i32,
+    #[schema(example = 3600)]
+    interval_secs: i32,
+}
+
+impl From<PgsqlTopicModel> for GetSourcesResponse {
+    fn from(value: PgsqlTopicModel) -> Self {
+        GetSourcesResponseBuilder::default()
+            .id(value.id)
+            .name(value.name.to_owned())
+            .link(value.link.to_owned())
+            .run_at_launch(value.run_at_launch)
+            .max_retries(value.max_retries)
+            .timeout(value.timeout)
+            .interval_secs(value.interval_secs)
+            .build()
+            .unwrap()
+    }
+}
+
+impl SwaggerExamples for GetSourcesResponse {
+    type Example = Self;
+
+    fn example(_value: Option<String>) -> Self::Example {
+        GetSourcesResponseBuilder::default()
+            .id(1)
+            .name("BBC".to_owned())
+            .link("https://bbc-news.com/rss.xml".to_owned())
+            .run_at_launch(true)
+            .max_retries(3)
+            .timeout(100)
+            .interval_secs(3600)
+            .build()
+            .unwrap()
+    }
+}
+
+#[derive(Builder, Deserialize, Serialize, IntoParams, ToSchema)]
+pub struct CreateSourceForm {
+    #[schema(example = "BBC")]
+    name: String,
+    #[schema(example = "https://bbc-news.com/rss.xml")]
+    link: String,
+    #[schema(example = false)]
+    run_at_launch: bool,
+    #[schema(example = 3)]
+    max_retries: i32,
+    #[schema(example = 100)]
+    timeout: i32,
+    #[schema(example = 3600)]
+    interval_secs: i32,
+}
+
+impl From<CreateSourceForm> for PgsqlTopicModel {
+    fn from(value: CreateSourceForm) -> Self {
+        PgsqlTopicModel::builder()
+            .id(0)
+            .name(value.name.to_owned())
+            .link(value.link.to_owned())
+            .run_at_launch(value.run_at_launch)
+            .max_retries(value.max_retries)
+            .timeout(value.timeout)
+            .interval_secs(value.interval_secs)
+            .build()
+            .unwrap()
+    }
+}
+
+impl SwaggerExamples for CreateSourceForm {
+    type Example = Self;
+
+    fn example(_value: Option<String>) -> Self::Example {
+        CreateSourceFormBuilder::default()
+            .name("BBC".to_owned())
+            .link("https://bbc-news.com/rss.xml".to_owned())
+            .run_at_launch(true)
+            .max_retries(3)
+            .timeout(100)
+            .interval_secs(3600)
+            .build()
+            .unwrap()
+    }
+}
+
+#[derive(Getters, Deserialize, Serialize, IntoParams, ToSchema)]
+#[getset(get = "pub")]
+pub struct SearchSourcesForm {
+    #[schema(example = "World")]
+    query: String,
+}
+
+impl SwaggerExamples for SearchSourcesForm {
+    type Example = Self;
+
+    fn example(_value: Option<String>) -> Self::Example {
+        SearchSourcesForm {
+            query: "World".to_string(),
+        }
+    }
+}

diff --git a/src/server/mod.rs b/src/server/mod.rs
index ceef3dd..55fb49c 100644
--- a/src/server/mod.rs
+++ b/src/server/mod.rs
@@ -8,8 +8,10 @@ use crate::cache::CacheService;
 use crate::crawler::CrawlerService;
 use crate::feeds::rss_feeds::config::RssConfig;
 use crate::publish::Publisher;
+use crate::storage::pgsql::models::PgsqlTopicModel;
+use crate::storage::LoadTopic;
 
-use axum::routing::{delete, get, post, put};
+use axum::routing::{delete, get, patch, post, put};
 use axum::Router;
 use getset::Getters;
 use std::collections::HashMap;
@@ -32,44 +34,62 @@ impl RssWorker {
     }
 }
 
-pub struct ServerApp<P, C, S>
+pub struct ServerApp<P, C, S, R>
 where
     P: Publisher,
     C: CacheService,
     S: CrawlerService,
+    R: LoadTopic,
 {
     workers: Arc<RwLock<JoinableWorkers>>,
     publish: Arc<P>,
     cache: Arc<C>,
     crawler: Arc<S>,
+    storage: Arc<R>,
 }
 
-impl<P, C, S> ServerApp<P, C, S>
+impl<P, C, S, R> ServerApp<P, C, S, R>
 where
     P: Publisher,
     C: CacheService,
     S: CrawlerService,
+    R: LoadTopic,
 {
-    pub fn new(workers: JoinableWorkers, publish: Arc<P>, cache: Arc<C>, crawler: Arc<S>) -> Self {
+    pub fn new(
+        workers: JoinableWorkers,
+        publish: Arc<P>,
+        cache: Arc<C>,
+        crawler: Arc<S>,
+        storage: Arc<R>,
+    ) -> Self {
         let workers_guard = Arc::new(RwLock::new(workers));
         ServerApp {
             workers: workers_guard,
            publish,
            cache,
            crawler,
+            storage,
         }
     }
 
     pub fn workers(&self) -> Arc<RwLock<JoinableWorkers>> {
         self.workers.clone()
     }
+
+    pub fn storage(&self) -> Arc<R> {
+        self.storage.clone()
+    }
 }
 
-pub fn init_server<P, C, S>(app: ServerApp<P, C, S>) -> Router
+pub fn init_server<P, C, S, R>(app: ServerApp<P, C, S, R>) -> Router
 where
     P: Publisher + Sync + Send + 'static,
     C: CacheService + Sync + Send + 'static,
     S: CrawlerService + Sync + Send + 'static,
+    R: LoadTopic<Topic = PgsqlTopicModel, TopicId = i32, Error = sqlx::Error>
+        + Sync
+        + Send
+        + 'static,
 {
     let app_arc = Arc::new(app);
     Router::new()
@@ -80,5 +100,10 @@ where
         .route("/workers/restart", post(routers::restart_worker))
         .route("/workers/delete", delete(routers::delete_worker))
         .route("/workers/terminate", post(routers::terminate_worker))
+        .route("/sources/all", get(routers::all_sources))
+        .route("/sources/add", put(routers::add_source))
+        .route("/sources/search", post(routers::search_sources))
+        .route("/sources/update", patch(routers::update_source))
+        .route("/sources/:source_id", delete(routers::remove_source))
         .with_state(app_arc)
 }

diff --git a/src/server/routers.rs b/src/server/routers.rs
index 208cadc..ef182e2 100644
--- a/src/server/routers.rs
+++ b/src/server/routers.rs
@@ -6,20 +6,17 @@
 use crate::cache::CacheService;
 use crate::crawler::CrawlerService;
 use crate::publish::Publisher;
 use crate::server::errors::ServerError;
 use crate::server::errors::ServerResult;
 use crate::server::errors::Success;
-use crate::server::forms::GetInfoForm;
-use crate::server::forms::GetInfoResponse;
-use crate::server::forms::TerminateWorkerForm;
-use crate::server::forms::{CreateWorkerForm, RssConfigForm};
+use crate::server::forms::*;
 use crate::server::swagger::SwaggerExamples;
 use crate::server::{RssWorker, ServerApp};
+use crate::storage::pgsql::models::PgsqlTopicModel;
+use crate::storage::LoadTopic;
 
-use axum::extract::State;
+use axum::extract::{Path, State};
 use axum::response::IntoResponse;
 use axum::Json;
 use std::sync::Arc;
 
-use super::forms::DeleteWorkerForm;
-
 #[utoipa::path(
     get,
     path = "/workers/all",
@@ -45,13 +42,14 @@
         ),
     )
 )]
-pub async fn get_workers<P, C, S>(
-    State(state): State<Arc<ServerApp<P, C, S>>>,
+pub async fn get_workers<P, C, S, R>(
+    State(state): State<Arc<ServerApp<P, C, S, R>>>,
 ) -> ServerResult<impl IntoResponse>
 where
     P: Publisher + Sync + Send,
     C: CacheService + Sync + Send,
     S: CrawlerService + Sync + Send,
+    R: LoadTopic + Sync + Send,
 {
     let workers = state.workers();
     let workers_guard = workers.read().await;
@@ -106,14 +104,15 @@
         ),
     )
 )]
-pub async fn get_worker_info<P, C, S>(
-    State(state): State<Arc<ServerApp<P, C, S>>>,
+pub async fn get_worker_info<P, C, S, R>(
+    State(state): State<Arc<ServerApp<P, C, S, R>>>,
     Json(form): Json<GetInfoForm>,
 ) -> ServerResult<impl IntoResponse>
 where
     P: Publisher + Sync + Send,
     C: CacheService + Sync + Send,
     S: CrawlerService + Sync + Send,
+    R: LoadTopic + Sync + Send,
 {
     let workers = state.workers();
     let workers_guard = workers.read().await;
@@ -168,14 +167,15 @@
         ),
     )
 )]
-pub async fn create_worker<P, C, S>(
-    State(state): State<Arc<ServerApp<P, C, S>>>,
+pub async fn create_worker<P, C, S, R>(
+    State(state): State<Arc<ServerApp<P, C, S, R>>>,
     Json(form): Json<CreateWorkerForm>,
 ) -> ServerResult<impl IntoResponse>
 where
     P: Publisher + Sync + Send + 'static,
     C: CacheService + Sync + Send + 'static,
     S: CrawlerService + Sync + Send + 'static,
+    R: LoadTopic + Sync + Send + 'static,
 {
     let workers = state.workers();
     let mut workers_guard = workers.write().await;
@@ -232,14 +232,15 @@
         ),
     )
 )]
-pub async fn restart_worker<P, C, S>(
-    State(state): State<Arc<ServerApp<P, C, S>>>,
+pub async fn restart_worker<P, C, S, R>(
+    State(state): State<Arc<ServerApp<P, C, S, R>>>,
     Json(form): Json<CreateWorkerForm>,
 ) -> ServerResult<impl IntoResponse>
 where
     P: Publisher + Sync + Send + 'static,
     C: CacheService + Sync + Send + 'static,
     S: CrawlerService + Sync + Send + 'static,
+    R: LoadTopic + Sync + Send + 'static,
 {
     let workers = state.workers();
     let mut workers_guard = workers.write().await;
@@ -296,14 +297,15 @@
         ),
     )
 )]
-pub async fn terminate_worker<P, C, S>(
-    State(state): State<Arc<ServerApp<P, C, S>>>,
+pub async fn terminate_worker<P, C, S, R>(
+    State(state): State<Arc<ServerApp<P, C, S, R>>>,
     Json(form): Json<TerminateWorkerForm>,
 ) -> ServerResult<impl IntoResponse>
 where
     P: Publisher + Sync + Send,
     C: CacheService + Sync + Send,
     S: CrawlerService + Sync + Send,
+    R: LoadTopic + Sync + Send,
 {
     let workers = state.workers();
     let workers_guard = workers.read().await;
@@ -348,14 +350,15 @@
         ),
     )
 )]
-pub async fn delete_worker<P, C, S>(
-    State(state): State<Arc<ServerApp<P, C, S>>>,
+pub async fn delete_worker<P, C, S, R>(
+    State(state): State<Arc<ServerApp<P, C, S, R>>>,
     Json(form): Json<DeleteWorkerForm>,
 ) -> ServerResult<impl IntoResponse>
 where
     P: Publisher + Sync + Send,
     C: CacheService + Sync + Send,
     S: CrawlerService + Sync + Send,
+    R: LoadTopic + Sync + Send,
 {
     let workers = state.workers();
     let mut workers_guard = workers.write().await;
@@ -382,3 +385,240 @@
     tracing::warn!("{}", &msg);
     Err(ServerError::Launched(msg))
 }
+
+#[utoipa::path(
+    get,
+    path = "/sources/all",
+    tag = "sources",
+    responses(
+        (
+            status = 200,
+            description = "Successful",
+            body = Vec<GetSourcesResponse>,
+            example = json!(vec![GetSourcesResponse::example(None)]),
+        ),
+        (
+            status = 400,
+            description = "Failed to load all sources",
+            body = ServerError,
+            example = json!(ServerError::example(Some("failed to load all sources".to_string()))),
+        ),
+        (
+            status = 503,
+            description = "Server is not available",
+            body = ServerError,
+            example = json!(ServerError::example(None)),
+        ),
+    )
+)]
+pub async fn all_sources<P, C, S, R>(
+    State(state): State<Arc<ServerApp<P, C, S, R>>>,
+) -> ServerResult<impl IntoResponse>
+where
+    P: Publisher + Sync + Send,
+    C: CacheService + Sync + Send,
+    S: CrawlerService + Sync + Send,
+    R: LoadTopic<Topic = PgsqlTopicModel, TopicId = i32, Error = sqlx::Error> + Sync + Send,
+{
+    let storage = state.storage();
+    let sources = storage
+        .load_all()
+        .await
+        .map_err(|err| ServerError::InternalError(err.to_string()))?
+        .into_iter()
+        .map(GetSourcesResponse::from)
+        .collect::<Vec<_>>();
+
+    Ok(Json(sources))
+}
+
+#[utoipa::path(
+    put,
+    path = "/sources/add",
+    tag = "sources",
+    request_body(
+        content = CreateSourceForm,
+        example = json!(CreateSourceForm::example(None)),
+    ),
+    responses(
+        (
+            status = 200,
+            description = "Successful",
+            body = Success,
+            example = json!(Success::example(None)),
+        ),
+        (
+            status = 400,
+            description = "Failed to create source",
+            body = ServerError,
+            example = json!(ServerError::example(Some("failed to create source".to_string()))),
+        ),
+        (
+            status = 503,
+            description = "Server is not available",
+            body = ServerError,
+            example = json!(ServerError::example(None)),
+        ),
+    )
+)]
+pub async fn add_source<P, C, S, R>(
+    State(state): State<Arc<ServerApp<P, C, S, R>>>,
+    Json(form): Json<CreateSourceForm>,
+) -> ServerResult<impl IntoResponse>
+where
+    P: Publisher + Sync + Send,
+    C: CacheService + Sync + Send,
+    S: CrawlerService + Sync + Send,
+    R: LoadTopic<Topic = PgsqlTopicModel, TopicId = i32, Error = sqlx::Error> + Sync + Send,
+{
+    let storage = state.storage();
+    storage
+        .add_source(&form.into())
+        .await
+        .map_err(|err| ServerError::InternalError(err.to_string()))?;
+
+    Ok(Json(Success::default()))
+}
+
+#[utoipa::path(
+    delete,
+    path = "/sources/{source_id}",
+    tag = "sources",
+    responses(
+        (
+            status = 200,
+            description = "Successful",
+            body = Success,
+            example = json!(Success::example(None)),
+        ),
+        (
+            status = 400,
+            description = "Failed to remove source",
+            body = ServerError,
+            example = json!(ServerError::example(Some("failed to remove source".to_string()))),
+        ),
+        (
+            status = 503,
+            description = "Server is not available",
+            body = ServerError,
+            example = json!(ServerError::example(None)),
+        ),
+    )
+)]
+pub async fn remove_source<P, C, S, R>(
+    State(state): State<Arc<ServerApp<P, C, S, R>>>,
+    Path(source_id): Path<i32>,
+) -> ServerResult<impl IntoResponse>
+where
+    P: Publisher + Sync + Send,
+    C: CacheService + Sync + Send,
+    S: CrawlerService + Sync + Send,
+    R: LoadTopic<Topic = PgsqlTopicModel, TopicId = i32, Error = sqlx::Error> + Sync + Send,
+{
+    let storage = state.storage();
+    storage
+        .remove_source(source_id)
+        .await
+        .map_err(|err| ServerError::InternalError(err.to_string()))?;
+
+    Ok(Json(Success::default()))
+}
+
+#[utoipa::path(
+    post,
+    path = "/sources/search",
+    tag = "sources",
+    request_body(
+        content = SearchSourcesForm,
+        example = json!(SearchSourcesForm::example(None)),
+    ),
+    responses(
+        (
+            status = 200,
+            description = "Successful",
+            body = Vec<GetSourcesResponse>,
+            example = json!(vec![GetSourcesResponse::example(None)]),
+        ),
+        (
+            status = 400,
+            description = "Failed to search sources",
+            body = ServerError,
+            example = json!(ServerError::example(Some("failed to search sources".to_string()))),
+        ),
+        (
+            status = 503,
+            description = "Server is not available",
+            body = ServerError,
+            example = json!(ServerError::example(None)),
+        ),
+    )
+)]
+pub async fn search_sources<P, C, S, R>(
+    State(state): State<Arc<ServerApp<P, C, S, R>>>,
+    Json(form): Json<SearchSourcesForm>,
+) -> ServerResult<impl IntoResponse>
+where
+    P: Publisher + Sync + Send,
+    C: CacheService + Sync + Send,
+    S: CrawlerService + Sync + Send,
+    R: LoadTopic<Topic = PgsqlTopicModel, TopicId = i32, Error = sqlx::Error> + Sync + Send,
+{
+    let storage = state.storage();
+    let found_topics = storage
+        .search_source(form.query())
+        .await
+        .map_err(|err| ServerError::InternalError(err.to_string()))?
+        .into_iter()
+        .map(GetSourcesResponse::from)
+        .collect::<Vec<_>>();
+
+    Ok(Json(found_topics))
+}
+
+#[utoipa::path(
+    patch,
+    path = "/sources/update",
+    tag = "sources",
+    request_body(
+        content = CreateSourceForm,
+        example = json!(CreateSourceForm::example(None)),
+    ),
+    responses(
+        (
+            status = 200,
+            description = "Successful",
+            body = Success,
+            example = json!(Success::example(None)),
+        ),
+        (
+            status = 400,
+            description = "Failed to update source",
+            body = ServerError,
+            example = json!(ServerError::example(Some("failed to update source".to_string()))),
+        ),
+        (
+            status = 503,
+            description = "Server is not available",
+            body = ServerError,
+            example = json!(ServerError::example(None)),
+        ),
+    )
+)]
+pub async fn update_source<P, C, S, R>(
+    State(state): State<Arc<ServerApp<P, C, S, R>>>,
+    Json(form): Json<CreateSourceForm>,
+) -> ServerResult<impl IntoResponse>
+where
+    P: Publisher + Sync + Send,
+    C: CacheService + Sync + Send,
+    S: CrawlerService + Sync + Send,
+    R: LoadTopic<Topic = PgsqlTopicModel, TopicId = i32, Error = sqlx::Error> + Sync + Send,
+{
+    let storage = state.storage();
+    storage
+        .update_source(&form.into())
+        .await
+        .map_err(|err| ServerError::InternalError(err.to_string()))?;
+
+    Ok(Json(Success::default()))
+}

diff --git a/src/server/swagger.rs b/src/server/swagger.rs
index 4950fc4..8402d35 100644
--- a/src/server/swagger.rs
+++ b/src/server/swagger.rs
@@ -30,6 +30,11 @@ pub fn init_swagger() -> SwaggerUi {
         restart_worker,
         terminate_worker,
         delete_worker,
+        all_sources,
+        add_source,
+        remove_source,
+        update_source,
+        search_sources,
     ),
     components(
         schemas(
@@ -38,6 +43,9 @@
             CreateWorkerForm,
             DeleteWorkerForm,
             TerminateWorkerForm,
+            CreateSourceForm,
+            GetSourcesResponse,
+            SearchSourcesForm,
         ),
     ),
 )]

From 25b6008efc7089c58f9f35a5eb130274280a06c4 Mon Sep 17 00:00:00 2001
From: Bread White
Date: Wed, 13 Nov 2024 22:33:57 +0300
Subject: [PATCH 4/6] chore(bins): updated bins after all changes

---
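Note (dropped by git am): startup now degrades gracefully when pgsql is
unreachable — the load error is logged and replaced with an empty topic set
instead of aborting, and the feed from config/production.toml is merged in
last, so it wins over a pgsql row with the same target URL. A minimal sketch
of that merge rule, where db_topic stands in for a hypothetical row loaded
from pgsql:

    let mut topics: HashMap<String, RssConfig> = HashMap::new();
    topics.insert(db_topic.target_url().to_owned(), db_topic);
    // Same key: the config entry silently replaces the pgsql entry.
    topics.insert(rss_config.target_url().to_owned(), rss_config.to_owned());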
 src/bin/main.rs | 39 ++++++++++++++++++---------------------
 1 file changed, 18 insertions(+), 21 deletions(-)

diff --git a/src/bin/main.rs b/src/bin/main.rs
index f839aa0..75caa97 100644
--- a/src/bin/main.rs
+++ b/src/bin/main.rs
@@ -7,19 +7,16 @@ use news_rss::crawler::llm::LlmCrawler;
 
 #[cfg(feature = "publish-offline")]
 use news_rss::publish::pgsql::PgsqlPublisher;
 
-#[cfg(feature = "storage-pgsql")]
-use news_rss::storage::pgsql;
-
-#[allow(unused_imports)]
-use news_rss::feeds::rss_feeds::config::RssConfig;
-
 use news_rss::cache::local::LocalCache;
 use news_rss::config::ServiceConfig;
 use news_rss::crawler::native::NativeCrawler;
+use news_rss::feeds::rss_feeds::config::RssConfig;
 use news_rss::feeds::rss_feeds::RssFeeds;
 use news_rss::feeds::FetchTopic;
 use news_rss::publish::rabbit::RabbitPublisher;
 use news_rss::server::{RssWorker, ServerApp};
+use news_rss::storage::pgsql::PgsqlTopicStorage;
+use news_rss::storage::LoadTopic;
 use news_rss::{logger, server, ServiceConnect};
 use std::collections::HashMap;
 use std::sync::Arc;
@@ -46,10 +43,11 @@ async fn main() -> Result<(), anyhow::Error> {
     #[cfg(feature = "crawler-llm")]
     let crawler = build_llm_crawler(&config).await?;
 
-    #[allow(unused_variables)]
-    let rss_config = [config.topics().rss()];
-    #[cfg(feature = "storage-pgsql")]
-    let rss_config = load_topics_from_pgsql(&config).await?;
+    let rss_config = config.topics().rss();
+    let pgsql_config = config.storage().pgsql();
+    let storage = PgsqlTopicStorage::connect(pgsql_config).await?;
+    let rss_config = load_topics_from_pgsql(&rss_config, &storage).await?;
+    let pg_storage = Arc::new(storage);
 
     let rss_workers = rss_config
         .into_iter()
@@ -67,7 +65,7 @@ async fn main() -> Result<(), anyhow::Error> {
         .collect::<HashMap<String, RssWorker>>();
 
     let listener = TcpListener::bind(config.server().address()).await?;
-    let server_app = ServerApp::new(rss_workers, publish, cache, crawler);
+    let server_app = ServerApp::new(rss_workers, publish, cache, crawler, pg_storage);
     let trace_layer = trace::TraceLayer::new_for_http()
         .make_span_with(trace::DefaultMakeSpan::new().level(tracing::Level::INFO))
         .on_response(trace::DefaultOnResponse::new().level(tracing::Level::INFO));
@@ -133,25 +131,24 @@ pub async fn build_llm_crawler(config: &ServiceConfig)
     Ok(crawler)
 }
 
-#[cfg(feature = "storage-pgsql")]
 pub async fn load_topics_from_pgsql(
-    config: &ServiceConfig,
+    rss_config: &RssConfig,
+    storage: &PgsqlTopicStorage,
 ) -> Result<Vec<RssConfig>, anyhow::Error> {
-    use news_rss::storage::LoadTopic;
-
-    let rss_config = config.topics().rss();
-
-    let pgsql_config = config.storage().pgsql();
-    let storage = pgsql::PgsqlTopicStorage::connect(pgsql_config).await?;
     let mut topics = storage
         .load_at_launch()
-        .await?
+        .await
+        .map_err(|err| {
+            tracing::error!(err=?err, "failed to load topics from pgsql");
+            err
+        })
+        .unwrap_or_default()
         .into_iter()
         .map(RssConfig::from)
         .map(|it: RssConfig| (it.target_url().to_owned(), it))
         .collect::<HashMap<String, RssConfig>>();
 
-    topics.insert(rss_config.target_url().to_owned(), rss_config);
+    topics.insert(rss_config.target_url().to_owned(), rss_config.to_owned());
 
     let topics = topics.into_values().collect();
     Ok(topics)
 }

From 68d98dcfead57ddf6c10269fbaaf74938dc6f618 Mon Sep 17 00:00:00 2001
From: Bread White
Date: Wed, 13 Nov 2024 22:34:04 +0300
Subject: [PATCH 5/6] chore(tests): updated tests after all changes

---
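Note (dropped by git am): deleting tests/test_storage_pgsql.rs leaves the new
CRUD methods without coverage. A hypothetical replacement — not part of this
series — could round-trip a source through the trait:

    #[tokio::test]
    async fn test_source_crud() -> Result<(), anyhow::Error> {
        let config = ServiceConfig::new()?;
        let storage = tests_helper::build_pgsql_storage(&config).await?;

        let topic = PgsqlTopicModel::builder()
            .id(0)
            .name("BBC".to_owned())
            .link("https://bbc-news.com/rss.xml".to_owned())
            .run_at_launch(false)
            .max_retries(3)
            .timeout(100)
            .interval_secs(3600)
            .build()?;

        storage.add_source(&topic).await?;
        let found = storage.search_source("BBC").await?;
        assert!(!found.is_empty());

        Ok(())
    }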
 tests/mocks/mock_rmq_publish.rs |  2 +-
 tests/test_storage_pgsql.rs     | 26 --------------------------
 tests/tests_helper.rs           |  5 +----
 3 files changed, 2 insertions(+), 31 deletions(-)
 delete mode 100644 tests/test_storage_pgsql.rs

diff --git a/tests/mocks/mock_rmq_publish.rs b/tests/mocks/mock_rmq_publish.rs
index bef96e6..fc0fe8e 100644
--- a/tests/mocks/mock_rmq_publish.rs
+++ b/tests/mocks/mock_rmq_publish.rs
@@ -4,7 +4,7 @@ use news_rss::publish::Publisher;
 use news_rss::ServiceConnect;
 
 #[derive(Clone)]
-pub struct MockRabbitPublisher {}
+pub struct MockRabbitPublisher;
 
 #[async_trait::async_trait]
 impl ServiceConnect for MockRabbitPublisher {

diff --git a/tests/test_storage_pgsql.rs b/tests/test_storage_pgsql.rs
deleted file mode 100644
index 6ee37c4..0000000
--- a/tests/test_storage_pgsql.rs
+++ /dev/null
@@ -1,26 +0,0 @@
-mod tests_helper;
-
-#[cfg(feature = "storage-pgsql")]
-mod test_storage_pgsql {
-    use crate::tests_helper;
-
-    use news_rss::config::ServiceConfig;
-    use news_rss::logger;
-    use news_rss::storage::LoadTopic;
-
-    #[tokio::test]
-    async fn test_load_all() -> Result<(), anyhow::Error> {
-        let config = ServiceConfig::new()?;
-        logger::init_logger(config.logger())?;
-
-        let storage = tests_helper::build_pgsql_storage(&config).await?;
-
-        let result = storage.load_all().await?;
-        assert!(result.len() > 0);
-
-        let result = storage.load_at_launch().await?;
-        assert!(result.len() >= 1);
-
-        Ok(())
-    }
-}

diff --git a/tests/tests_helper.rs b/tests/tests_helper.rs
index b5c1844..90cbaec 100644
--- a/tests/tests_helper.rs
+++ b/tests/tests_helper.rs
@@ -13,14 +13,12 @@ use news_rss::crawler::llm::LlmCrawler;
 
 #[cfg(feature = "publish-offline")]
 use news_rss::publish::pgsql::PgsqlPublisher;
 
-#[cfg(feature = "storage-pgsql")]
-use news_rss::storage::pgsql::PgsqlTopicStorage;
-
 use news_rss::cache::local::LocalCache;
 use news_rss::config::ServiceConfig;
 use news_rss::crawler::native::NativeCrawler;
 use news_rss::publish::rabbit::config::RabbitConfig;
 use news_rss::publish::rabbit::RabbitPublisher;
+use news_rss::storage::pgsql::PgsqlTopicStorage;
 use news_rss::ServiceConnect;
 use std::sync::Arc;
 use wiremock::matchers::{method, path};
@@ -160,7 +158,6 @@ pub async fn build_pgsql_publish(
     Ok(pgsql)
 }
 
-#[cfg(feature = "storage-pgsql")]
 pub async fn build_pgsql_storage(
     config: &ServiceConfig,
 ) -> Result<Arc<PgsqlTopicStorage>, anyhow::Error> {

From 2b4ddf0ceeaee524c6691eb4713d9a2eb38ef417 Mon Sep 17 00:00:00 2001
From: Bread White
Date: Wed, 13 Nov 2024 22:34:17 +0300
Subject: [PATCH 6/6] chore(db): updated .sqlx cache files

---
 ...929abd9afa6f48ecd584018e3dbd9b812fd08.json | 16 +++++++++++++++
 ...ab93f82d233f6d2aff8b06b8d2cd88d83ab36.json | 20 +++++++++++++++++++
 ...328bf82882308d099105b2d071f700b1c5bdd.json | 14 +++++++++++++
 3 files changed, 50 insertions(+)
 create mode 100644 .sqlx/query-44fa8db07d271101a8ae45e8faa929abd9afa6f48ecd584018e3dbd9b812fd08.json
 create mode 100644 .sqlx/query-78435cd0a686b13f8ace8122c2eab93f82d233f6d2aff8b06b8d2cd88d83ab36.json
 create mode 100644 .sqlx/query-deed1b6c34f5daac0606b73033d328bf82882308d099105b2d071f700b1c5bdd.json

diff --git a/.sqlx/query-44fa8db07d271101a8ae45e8faa929abd9afa6f48ecd584018e3dbd9b812fd08.json b/.sqlx/query-44fa8db07d271101a8ae45e8faa929abd9afa6f48ecd584018e3dbd9b812fd08.json
new file mode 100644
index 0000000..a366bee
--- /dev/null
+++ b/.sqlx/query-44fa8db07d271101a8ae45e8faa929abd9afa6f48ecd584018e3dbd9b812fd08.json
@@ -0,0 +1,16 @@
+{
+  "db_name": "PostgreSQL",
+  "query": "\n            INSERT INTO rss_sources (name, link, run_at_launch)\n            VALUES ($1, $2, $3)\n            ",
+  "describe": {
+    "columns": [],
+    "parameters": {
+      "Left": [
+        "Text",
+        "Text",
+        "Bool"
+      ]
+    },
+    "nullable": []
+  },
+  "hash": "44fa8db07d271101a8ae45e8faa929abd9afa6f48ecd584018e3dbd9b812fd08"
+}

diff --git a/.sqlx/query-78435cd0a686b13f8ace8122c2eab93f82d233f6d2aff8b06b8d2cd88d83ab36.json b/.sqlx/query-78435cd0a686b13f8ace8122c2eab93f82d233f6d2aff8b06b8d2cd88d83ab36.json
new file mode 100644
index 0000000..40e0543
--- /dev/null
+++ b/.sqlx/query-78435cd0a686b13f8ace8122c2eab93f82d233f6d2aff8b06b8d2cd88d83ab36.json
@@ -0,0 +1,20 @@
+{
+  "db_name": "PostgreSQL",
+  "query": "\n            UPDATE rss_sources\n            SET name = $2,\n                link = $3,\n                run_at_launch = $4,\n                max_retries = $5,\n                timeout = $6,\n                interval_secs = $7\n            WHERE id = $1\n            ",
+  "describe": {
+    "columns": [],
+    "parameters": {
+      "Left": [
+        "Int4",
+        "Text",
+        "Text",
+        "Bool",
+        "Int4",
+        "Int4",
+        "Int4"
+      ]
+    },
+    "nullable": []
+  },
+  "hash": "78435cd0a686b13f8ace8122c2eab93f82d233f6d2aff8b06b8d2cd88d83ab36"
+}

diff --git a/.sqlx/query-deed1b6c34f5daac0606b73033d328bf82882308d099105b2d071f700b1c5bdd.json b/.sqlx/query-deed1b6c34f5daac0606b73033d328bf82882308d099105b2d071f700b1c5bdd.json
new file mode 100644
index 0000000..a03f493
--- /dev/null
+++ b/.sqlx/query-deed1b6c34f5daac0606b73033d328bf82882308d099105b2d071f700b1c5bdd.json
@@ -0,0 +1,14 @@
+{
+  "db_name": "PostgreSQL",
+  "query": "\n            DELETE FROM rss_sources\n            WHERE id = $1\n            ",
+  "describe": {
+    "columns": [],
+    "parameters": {
+      "Left": [
+        "Int4"
+      ]
+    },
+    "nullable": []
+  },
+  "hash": "deed1b6c34f5daac0606b73033d328bf82882308d099105b2d071f700b1c5bdd"
+}
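The three .sqlx/query-*.json files above are sqlx's offline cache for the new
sqlx::query! invocations; they are regenerated with `cargo sqlx prepare`
against a running database and only need attention when the SQL text changes.
The search query has no cache entry by design: it goes through the runtime
sqlx::query_as API rather than the compile-time-checked macro.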