Skip to content

Commit

Permalink
Merge/local changes (#12)
Browse files Browse the repository at this point in the history
* fix(compose): docker compose fixes

* chore(rmq): replaced direct to fanout exchange kind

* chore(ci): fixed fmt and clippy warnigns

---------

Co-authored-by: Bread White <[email protected]>
  • Loading branch information
breadrock1 and Bread White authored Nov 25, 2024
1 parent 9366e34 commit 372866b
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 54 deletions.
4 changes: 2 additions & 2 deletions config/development.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ expired_secs = 360
address = "amqp://localhost:5672"
username = "rmq"
password = "rmq"
stream_name = "news-rss-stream"
exchange = "news-rss-exchange"
routing_key = "news-rss-routing"
capacity_gb = 1
no_wait = true
durable = false

[publish.pgsql]
address = "localhost:5432"
Expand Down
4 changes: 2 additions & 2 deletions config/production.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ expired_secs = 10368000
address = "amqp://rabbitmq:5672"
username = "rmq"
password = "rmq"
stream_name = "news-rss-stream"
exchange = "news-rss-exchange"
routing_key = "news-rss-routing"
capacity_gb = 1
no_wait = true
durable = false

[publish.pgsql]
address = "pgsql:5432"
Expand Down
6 changes: 4 additions & 2 deletions src/publish/rabbit/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ pub struct RabbitConfig {
address: String,
username: String,
password: String,
stream_name: String,
exchange: String,
routing_key: String,
#[getset(skip)]
#[getset(get_copy = "pub")]
capacity_gb: u64,
no_wait: bool,
#[getset(skip)]
#[getset(get_copy = "pub")]
durable: bool,
}
38 changes: 13 additions & 25 deletions src/publish/rabbit/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use crate::publish::Publisher;
use crate::ServiceConnect;

use lapin::options::{BasicPublishOptions, ExchangeDeclareOptions};
use lapin::options::{QueueBindOptions, QueueDeclareOptions};
use lapin::types::FieldTable;
use lapin::{BasicProperties, ConnectionProperties, ExchangeKind};
use lapin::{Channel, Connection};
Expand Down Expand Up @@ -39,35 +38,19 @@ impl ServiceConnect for RabbitPublisher {

let exchange_opts = ExchangeDeclareOptions {
nowait: true,
durable: false,
..Default::default()
};

channel
.exchange_declare(
config.exchange(),
ExchangeKind::Direct,
ExchangeKind::Fanout,
exchange_opts,
FieldTable::default(),
)
.await?;

let queue_decl_opts = QueueDeclareOptions {
durable: true,
..Default::default()
};
channel
.queue_declare(config.stream_name(), queue_decl_opts, FieldTable::default())
.await?;

channel
.queue_bind(
config.stream_name(),
config.exchange(),
config.routing_key(),
QueueBindOptions::default(),
FieldTable::default(),
)
.await?;

let client = RabbitPublisher {
config: Arc::new(config.to_owned()),
channel: Arc::new(channel),
Expand All @@ -82,26 +65,31 @@ impl Publisher for RabbitPublisher {
type Error = RabbitPublishError;

async fn publish(&self, news: &PublishNews) -> Result<(), Self::Error> {
let exchange = self.config.exchange();
let routing = self.config.routing_key();
let bytes = serde_json::to_vec(&news)?;
let pub_opts = BasicPublishOptions {
mandatory: true,
immediate: false,
};
let pub_props = BasicProperties::default();

let confirm = self
.channel
.basic_publish(
self.config.exchange(),
self.config.routing_key(),
exchange,
routing,
pub_opts,
bytes.as_slice(),
pub_props,
BasicProperties::default(),
)
.await?
.await?;

tracing::info!("rabbit confirm is: {confirm:?}");
tracing::info!(
exchange = exchange,
routing_key = routing,
"rabbit confirm: {confirm:?}"
);

Ok(())
}
Expand Down
9 changes: 6 additions & 3 deletions tests/test_publish_feeds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ use std::sync::Arc;
use std::time::Duration;

const TEST_TIME_EXECUTION: u64 = 5;
const TEST_RMQ_QUEUE_NAME: &str = "news-rss";
const TEST_SOURCE_NAME: &str = "NDTV World News";
const TEST_TARGET_URL: &str = "https://feeds.feedburner.com/ndtvnews-world-news";

#[tokio::test]
async fn test_rss_feeds() -> Result<(), anyhow::Error> {
Expand All @@ -37,8 +40,8 @@ async fn test_rss_feeds() -> Result<(), anyhow::Error> {
let crawler = tests_helper::build_llm_crawler(&config).await?;

let rss_config = vec![RssConfig::builder()
.source_name("NDTV World News".to_owned())
.target_url("https://feeds.feedburner.com/ndtvnews-world-news".to_owned())
.source_name(TEST_SOURCE_NAME.to_owned())
.target_url(TEST_TARGET_URL.to_owned())
.max_retries(3)
.timeout(10)
.interval_secs(5)
Expand All @@ -60,7 +63,7 @@ async fn test_rss_feeds() -> Result<(), anyhow::Error> {
.collect::<HashMap<String, RssWorker>>();

#[cfg(feature = "test-publish-rabbit")]
let _ = tests_helper::rabbit_consumer(config.publish().rmq()).await?;
let _ = tests_helper::rabbit_consumer(TEST_RMQ_QUEUE_NAME, config.publish().rmq()).await?;

tokio::time::sleep(Duration::from_secs(TEST_TIME_EXECUTION)).await;

Expand Down
32 changes: 12 additions & 20 deletions tests/tests_helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,46 +49,38 @@ pub async fn create_llm_completion_route(mock: &MockServer, url: &str, http_meth
#[allow(dead_code)]
#[allow(unused_assignments)]
#[allow(unused_variables)]
pub async fn rabbit_consumer(config: &RabbitConfig) -> Result<(), anyhow::Error> {
pub async fn rabbit_consumer(queue: &str, config: &RabbitConfig) -> Result<(), anyhow::Error> {
let conn_props = ConnectionProperties::default();
let connection = Connection::connect(config.address(), conn_props).await?;
let channel = connection.create_channel().await?;

let exchange_opts = ExchangeDeclareOptions {
nowait: true,
..Default::default()
};
channel
.exchange_declare(
config.exchange(),
ExchangeKind::Direct,
exchange_opts,
FieldTable::default(),
)
.await?;

let queue_decl_opts = QueueDeclareOptions {
durable: true,
durable: config.durable(),
nowait: config.no_wait(),
..Default::default()
};

channel
.queue_declare(config.stream_name(), queue_decl_opts, FieldTable::default())
.queue_declare(queue, queue_decl_opts, FieldTable::default())
.await?;

let queue_bind_opts = QueueBindOptions {
nowait: config.no_wait(),
};

channel
.queue_bind(
config.stream_name(),
queue,
config.exchange(),
config.routing_key(),
QueueBindOptions::default(),
queue_bind_opts,
FieldTable::default(),
)
.await?;

let consumer = channel
.basic_consume(
config.stream_name(),
queue,
TEST_AMQP_CONSUMER_TAG,
BasicConsumeOptions::default(),
FieldTable::default(),
Expand All @@ -114,7 +106,7 @@ pub async fn rabbit_consumer(config: &RabbitConfig) -> Result<(), anyhow::Error>
delivery
.ack(BasicAckOptions::default())
.await
.expect("Failed to ack send_webhook_event message");
.expect("failed to ack send_webhook_event message");
});

counter += 1;
Expand Down

0 comments on commit 372866b

Please sign in to comment.