Skip to content

Commit

Permalink
Feature: pgsql storage routes (#10)
Browse files Browse the repository at this point in the history
* fix(compose): docker compose fixes

* feat(storage): implemented pgsql storage

* chore(server): implemented routes for pgsql storage

* chore(bins): updated bins after all changes

* chore(tests): updated tests after all changes

* chore(db): updated .sqlx cache files

---------

Co-authored-by: Bread White <[email protected]>
  • Loading branch information
breadrock1 and Bread White authored Nov 13, 2024
1 parent bd5f0bc commit 258c500
Show file tree
Hide file tree
Showing 16 changed files with 570 additions and 85 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 1 addition & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@ edition = "2021"

[features]
cache-redis = ["dep:redis"]
publish-offline = ["dep:sqlx"]
storage-pgsql = ["dep:sqlx"]
publish-offline = []
crawler-llm = ["dep:openai_dive", "dep:html2text", "dep:html_editor"]
default = []

Expand Down Expand Up @@ -68,7 +67,6 @@ version = "^1.0"
features = ["derive", "serde_derive"]

[dependencies.sqlx]
optional = true
version = "^0.7"
features = ["postgres", "runtime-tokio", "chrono"]

Expand Down
39 changes: 18 additions & 21 deletions src/bin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,16 @@ use news_rss::crawler::llm::LlmCrawler;
#[cfg(feature = "publish-offline")]
use news_rss::publish::pgsql::PgsqlPublisher;

#[cfg(feature = "storage-pgsql")]
use news_rss::storage::pgsql;

#[allow(unused_imports)]
use news_rss::feeds::rss_feeds::config::RssConfig;

use news_rss::cache::local::LocalCache;
use news_rss::config::ServiceConfig;
use news_rss::crawler::native::NativeCrawler;
use news_rss::feeds::rss_feeds::config::RssConfig;
use news_rss::feeds::rss_feeds::RssFeeds;
use news_rss::feeds::FetchTopic;
use news_rss::publish::rabbit::RabbitPublisher;
use news_rss::server::{RssWorker, ServerApp};
use news_rss::storage::pgsql::PgsqlTopicStorage;
use news_rss::storage::LoadTopic;
use news_rss::{logger, server, ServiceConnect};
use std::collections::HashMap;
use std::sync::Arc;
Expand All @@ -46,10 +43,11 @@ async fn main() -> Result<(), anyhow::Error> {
#[cfg(feature = "crawler-llm")]
let crawler = build_llm_crawler(&config).await?;

#[allow(unused_variables)]
let rss_config = [config.topics().rss()];
#[cfg(feature = "storage-pgsql")]
let rss_config = load_topics_from_pgsql(&config).await?;
let rss_config = config.topics().rss();
let pgsql_config = config.storage().pgsql();
let storage = PgsqlTopicStorage::connect(pgsql_config).await?;
let rss_config = load_topics_from_pgsql(&rss_config, &storage).await?;
let pg_storage = Arc::new(storage);

let rss_workers = rss_config
.into_iter()
Expand All @@ -67,7 +65,7 @@ async fn main() -> Result<(), anyhow::Error> {
.collect::<HashMap<String, RssWorker>>();

let listener = TcpListener::bind(config.server().address()).await?;
let server_app = ServerApp::new(rss_workers, publish, cache, crawler);
let server_app = ServerApp::new(rss_workers, publish, cache, crawler, pg_storage);
let trace_layer = trace::TraceLayer::new_for_http()
.make_span_with(trace::DefaultMakeSpan::new().level(tracing::Level::INFO))
.on_response(trace::DefaultOnResponse::new().level(tracing::Level::INFO));
Expand Down Expand Up @@ -133,25 +131,24 @@ pub async fn build_llm_crawler(config: &ServiceConfig) -> Result<Arc<LlmCrawler>
Ok(crawler)
}

#[cfg(feature = "storage-pgsql")]
pub async fn load_topics_from_pgsql(
config: &ServiceConfig,
rss_config: &RssConfig,
storage: &PgsqlTopicStorage,
) -> Result<Vec<RssConfig>, anyhow::Error> {
use news_rss::storage::LoadTopic;

let rss_config = config.topics().rss();

let pgsql_config = config.storage().pgsql();
let storage = pgsql::PgsqlTopicStorage::connect(pgsql_config).await?;
let mut topics = storage
.load_at_launch()
.await?
.await
.map_err(|err| {
tracing::error!(err=?err, "failed to load topics from pgsql");
err
})
.unwrap_or_default()
.into_iter()
.map(RssConfig::from)
.map(|it: RssConfig| (it.target_url().to_owned(), it))
.collect::<HashMap<String, RssConfig>>();

topics.insert(rss_config.target_url().to_owned(), rss_config);
topics.insert(rss_config.target_url().to_owned(), rss_config.to_owned());
let topics = topics.into_values().collect();
Ok(topics)
}
114 changes: 114 additions & 0 deletions src/server/forms.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::feeds::rss_feeds::config::RssConfig;
use crate::server::swagger::SwaggerExamples;
use crate::storage::pgsql::models::PgsqlTopicModel;

use derive_builder::Builder;
use getset::{CopyGetters, Getters};
Expand Down Expand Up @@ -182,3 +183,116 @@ impl SwaggerExamples for GetInfoResponse {
}
}
}

/// API response describing a single RSS source record.
///
/// Field-for-field mirror of `PgsqlTopicModel` (see the `From` impl below);
/// the `#[schema(example = …)]` attributes feed the generated Swagger docs.
#[derive(Builder, Deserialize, Serialize, IntoParams, ToSchema)]
pub struct GetSourcesResponse {
    // Database identifier of the source row.
    id: i32,
    /// Human-readable name of the feed.
    #[schema(example = "BBC")]
    name: String,
    /// URL of the RSS feed.
    #[schema(example = "https://bbc-news.com/rss.xml")]
    link: String,
    /// Whether the feed worker starts automatically at service launch.
    #[schema(example = false)]
    run_at_launch: bool,
    /// Maximum number of fetch retries before giving up.
    #[schema(example = 3)]
    max_retries: i32,
    /// Request timeout — units not stated here; presumably seconds (TODO confirm).
    #[schema(example = 100)]
    timeout: i32,
    /// Polling interval between fetches, in seconds.
    #[schema(example = 3600)]
    interval_secs: i32,
}

impl From<PgsqlTopicModel> for GetSourcesResponse {
    /// Converts a database row model into the API response shape.
    fn from(value: PgsqlTopicModel) -> Self {
        // `value` is consumed by value, so its String fields can be moved
        // directly — the original `.to_owned()` calls were redundant clones.
        GetSourcesResponseBuilder::default()
            .id(value.id)
            .name(value.name)
            .link(value.link)
            .run_at_launch(value.run_at_launch)
            .max_retries(value.max_retries)
            .timeout(value.timeout)
            .interval_secs(value.interval_secs)
            .build()
            .expect("all GetSourcesResponse fields are set above")
    }
}

impl SwaggerExamples for GetSourcesResponse {
type Example = Self;

fn example(_value: Option<String>) -> Self::Example {
GetSourcesResponseBuilder::default()
.id(1)
.name("BBC".to_owned())
.link("https://bbc-news.com/rss.xml".to_owned())
.run_at_launch(true)
.max_retries(3)
.timeout(100)
.interval_secs(3600)
.build()
.unwrap()
}
}

/// Request body for registering a new RSS source.
///
/// Same shape as `GetSourcesResponse` minus the database-assigned `id`;
/// converted into a `PgsqlTopicModel` by the `From` impl below.
#[derive(Builder, Deserialize, Serialize, IntoParams, ToSchema)]
pub struct CreateSourceForm {
    /// Human-readable name of the feed.
    #[schema(example = "BBC")]
    name: String,
    /// URL of the RSS feed.
    #[schema(example = "https://bbc-news.com/rss.xml")]
    link: String,
    /// Whether the feed worker should start automatically at service launch.
    #[schema(example = false)]
    run_at_launch: bool,
    /// Maximum number of fetch retries before giving up.
    #[schema(example = 3)]
    max_retries: i32,
    /// Request timeout — units not stated here; presumably seconds (TODO confirm).
    #[schema(example = 100)]
    timeout: i32,
    /// Polling interval between fetches, in seconds.
    #[schema(example = 3600)]
    interval_secs: i32,
}

impl From<CreateSourceForm> for PgsqlTopicModel {
    /// Converts the creation form into the storage model.
    ///
    /// The `id` is a placeholder (0) — presumably the real identifier is
    /// assigned by the database on insert; confirm against the storage layer.
    fn from(value: CreateSourceForm) -> Self {
        // `value` is consumed by value, so its String fields can be moved
        // directly — the original `.to_owned()` calls were redundant clones.
        PgsqlTopicModel::builder()
            .id(0)
            .name(value.name)
            .link(value.link)
            .run_at_launch(value.run_at_launch)
            .max_retries(value.max_retries)
            .timeout(value.timeout)
            .interval_secs(value.interval_secs)
            .build()
            .expect("all PgsqlTopicModel fields are set above")
    }
}

impl SwaggerExamples for CreateSourceForm {
type Example = Self;

fn example(_value: Option<String>) -> Self::Example {
CreateSourceFormBuilder::default()
.name("BBC".to_owned())
.link("https://bbc-news.com/rss.xml".to_owned())
.run_at_launch(true)
.max_retries(3)
.timeout(100)
.interval_secs(3600)
.build()
.unwrap()
}
}

/// Request body for full-text search over registered sources.
#[derive(Getters, Deserialize, Serialize, IntoParams, ToSchema)]
#[getset(get = "pub")]
pub struct SearchSourcesForm {
    /// Search query string matched against stored sources.
    #[schema(example = "World")]
    query: String,
}

impl SwaggerExamples for SearchSourcesForm {
    type Example = Self;

    /// Builds the static example shown in the generated Swagger docs.
    fn example(_value: Option<String>) -> Self::Example {
        Self {
            query: String::from("World"),
        }
    }
}
35 changes: 30 additions & 5 deletions src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ use crate::cache::CacheService;
use crate::crawler::CrawlerService;
use crate::feeds::rss_feeds::config::RssConfig;
use crate::publish::Publisher;
use crate::storage::pgsql::models::PgsqlTopicModel;
use crate::storage::LoadTopic;

use axum::routing::{delete, get, post, put};
use axum::routing::{delete, get, patch, post, put};
use axum::Router;
use getset::Getters;
use std::collections::HashMap;
Expand All @@ -32,44 +34,62 @@ impl RssWorker {
}
}

pub struct ServerApp<P, C, S>
pub struct ServerApp<P, C, S, R>
where
P: Publisher,
C: CacheService,
S: CrawlerService,
R: LoadTopic,
{
workers: Arc<RwLock<JoinableWorkers>>,
publish: Arc<P>,
cache: Arc<C>,
crawler: Arc<S>,
storage: Arc<R>,
}

impl<P, C, S> ServerApp<P, C, S>
impl<P, C, S, R> ServerApp<P, C, S, R>
where
P: Publisher,
C: CacheService,
S: CrawlerService,
R: LoadTopic,
{
pub fn new(workers: JoinableWorkers, publish: Arc<P>, cache: Arc<C>, crawler: Arc<S>) -> Self {
pub fn new(
workers: JoinableWorkers,
publish: Arc<P>,
cache: Arc<C>,
crawler: Arc<S>,
storage: Arc<R>,
) -> Self {
let workers_guard = Arc::new(RwLock::new(workers));
ServerApp {
workers: workers_guard,
publish,
cache,
crawler,
storage,
}
}

pub fn workers(&self) -> Arc<RwLock<JoinableWorkers>> {
self.workers.clone()
}

pub fn storage(&self) -> Arc<R> {
self.storage.clone()
}
}

pub fn init_server<P, C, S>(app: ServerApp<P, C, S>) -> Router
pub fn init_server<P, C, S, R>(app: ServerApp<P, C, S, R>) -> Router
where
P: Publisher + Sync + Send + 'static,
C: CacheService + Sync + Send + 'static,
S: CrawlerService + Sync + Send + 'static,
R: LoadTopic<TopicId = i32, Topic = PgsqlTopicModel, Error = sqlx::Error>
+ Sync
+ Send
+ 'static,
{
let app_arc = Arc::new(app);
Router::new()
Expand All @@ -80,5 +100,10 @@ where
.route("/workers/restart", post(routers::restart_worker))
.route("/workers/delete", delete(routers::delete_worker))
.route("/workers/terminate", post(routers::terminate_worker))
.route("/sources/all", get(routers::all_sources))
.route("/sources/add", put(routers::add_source))
.route("/sources/search", post(routers::search_sources))
.route("/sources/update", patch(routers::update_source))
.route("/sources/:source_id", delete(routers::remove_source))
.with_state(app_arc)
}
Loading

0 comments on commit 258c500

Please sign in to comment.