-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.rs
154 lines (130 loc) · 5.09 KB
/
main.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
#[cfg(feature = "cache-redis")]
use news_rss::cache::redis::RedisClient;
#[cfg(feature = "crawler-llm")]
use news_rss::crawler::llm::LlmCrawler;
#[cfg(feature = "publish-offline")]
use news_rss::publish::pgsql::PgsqlPublisher;
use news_rss::cache::local::LocalCache;
use news_rss::config::ServiceConfig;
use news_rss::crawler::native::NativeCrawler;
use news_rss::feeds::rss_feeds::config::RssConfig;
use news_rss::feeds::rss_feeds::RssFeeds;
use news_rss::feeds::FetchTopic;
use news_rss::publish::rabbit::RabbitPublisher;
use news_rss::server::{RssWorker, ServerApp};
use news_rss::storage::pgsql::PgsqlTopicStorage;
use news_rss::storage::LoadTopic;
use news_rss::{logger, server, ServiceConnect};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::net::TcpListener;
use tower_http::{cors, trace};
#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
    let config = ServiceConfig::new()?;
    logger::init_logger(config.logger())?;

    // Each backend pair is feature-gated so that exactly ONE implementation
    // is constructed. Previously the default was always built (and its
    // connection opened) even when a feature shadowed it — e.g. with
    // `publish-offline` enabled the service still required a live RabbitMQ.
    #[cfg(not(feature = "publish-offline"))]
    let publish = build_rmq_publish(&config).await?;
    #[cfg(feature = "publish-offline")]
    let publish = build_pgsql_publish(&config).await?;

    #[cfg(not(feature = "cache-redis"))]
    let cache = build_local_cache(&config).await?;
    #[cfg(feature = "cache-redis")]
    let cache = build_redis_cache(&config).await?;

    #[cfg(not(feature = "crawler-llm"))]
    let crawler = build_native_crawler(&config).await?;
    #[cfg(feature = "crawler-llm")]
    let crawler = build_llm_crawler(&config).await?;

    // Merge the statically-configured topic with those stored in PostgreSQL.
    let rss_config = config.topics().rss();
    let pgsql_config = config.storage().pgsql();
    let storage = PgsqlTopicStorage::connect(pgsql_config).await?;
    let rss_config = load_topics_from_pgsql(&rss_config, &storage).await?;
    let pg_storage = Arc::new(storage);

    // Spawn one background fetching task per topic, keyed by its target URL.
    // NOTE(review): feeds whose construction fails are silently dropped by
    // `.ok()` — consider logging the error before discarding.
    let rss_workers = rss_config
        .into_iter()
        .filter_map(|it| RssFeeds::new(it, publish.clone(), cache.clone(), crawler.clone()).ok())
        .map(|it| {
            let config = it.config();
            let url = config.target_url();
            let it_cln = it.clone();
            let worker = tokio::spawn(async move { it_cln.launch_fetching().await });
            let rss_worker = RssWorker::new(Arc::new(config.clone()), worker);
            (url.to_owned(), rss_worker)
        })
        .collect::<HashMap<String, RssWorker>>();

    let listener = TcpListener::bind(config.server().address()).await?;
    let server_app = ServerApp::new(rss_workers, publish, cache, crawler, pg_storage);

    // HTTP request tracing at INFO level plus a permissive CORS policy.
    let trace_layer = trace::TraceLayer::new_for_http()
        .make_span_with(trace::DefaultMakeSpan::new().level(tracing::Level::INFO))
        .on_response(trace::DefaultOnResponse::new().level(tracing::Level::INFO));
    let cors_layer = cors::CorsLayer::permissive();
    let app = server::init_server(server_app)
        .layer(trace_layer)
        .layer(cors_layer);

    axum::serve(listener, app).await?;
    Ok(())
}
/// Connects the local cache backend described by the service configuration
/// and wraps it in an `Arc` for shared ownership across workers.
pub async fn build_local_cache(config: &ServiceConfig) -> Result<Arc<LocalCache>, anyhow::Error> {
    let connected = LocalCache::connect(config.cache().local()).await?;
    Ok(Arc::new(connected))
}
#[cfg(feature = "cache-redis")]
/// Connects to Redis using the cache section of the service configuration
/// and returns the client behind an `Arc`.
pub async fn build_redis_cache(config: &ServiceConfig) -> Result<Arc<RedisClient>, anyhow::Error> {
    let client = RedisClient::connect(config.cache().redis()).await?;
    Ok(Arc::new(client))
}
/// Establishes the RabbitMQ publisher connection from the publish section of
/// the configuration, returning it wrapped in an `Arc`.
pub async fn build_rmq_publish(
    config: &ServiceConfig,
) -> Result<Arc<RabbitPublisher>, anyhow::Error> {
    let publisher = RabbitPublisher::connect(config.publish().rmq()).await?;
    Ok(Arc::new(publisher))
}
#[cfg(feature = "publish-offline")]
/// Connects the PostgreSQL-backed publisher (offline publishing mode) and
/// returns it behind an `Arc`.
pub async fn build_pgsql_publish(
    config: &ServiceConfig,
) -> Result<Arc<PgsqlPublisher>, anyhow::Error> {
    let publisher = PgsqlPublisher::connect(config.publish().pgsql()).await?;
    Ok(Arc::new(publisher))
}
/// Builds the default (native) crawler. The configuration parameter is kept
/// for signature parity with the other builders but is currently unused.
pub async fn build_native_crawler(
    _config: &ServiceConfig,
) -> Result<Arc<NativeCrawler>, anyhow::Error> {
    Ok(Arc::new(NativeCrawler::new()))
}
#[cfg(feature = "crawler-llm")]
/// Connects the LLM-assisted crawler from the crawler section of the service
/// configuration and returns it wrapped in an `Arc`.
pub async fn build_llm_crawler(config: &ServiceConfig) -> Result<Arc<LlmCrawler>, anyhow::Error> {
    let connected = LlmCrawler::connect(config.crawler().llm()).await?;
    Ok(Arc::new(connected))
}
/// Combines the statically-configured RSS topic with topics loaded from
/// PostgreSQL, deduplicated by target URL. The config-supplied topic always
/// overrides a stored entry with the same URL.
pub async fn load_topics_from_pgsql(
    rss_config: &RssConfig,
    storage: &PgsqlTopicStorage,
) -> Result<Vec<RssConfig>, anyhow::Error> {
    // Best-effort load: a storage failure is logged and treated as "no
    // stored topics" rather than aborting startup.
    let stored = match storage.load_at_launch().await {
        Ok(models) => models,
        Err(err) => {
            tracing::error!(err=?err, "failed to load topics from pgsql");
            Default::default()
        }
    };

    // Key by target URL so duplicates collapse to a single entry.
    let mut by_url: HashMap<String, RssConfig> = stored
        .into_iter()
        .map(RssConfig::from)
        .map(|cfg: RssConfig| (cfg.target_url().to_owned(), cfg))
        .collect();

    // Inserting last guarantees the service-config topic wins on collision.
    by_url.insert(rss_config.target_url().to_owned(), rss_config.to_owned());
    Ok(by_url.into_values().collect())
}