Skip to content

Commit

Permalink
*: introduce basic infrastructure for pub-sub ibus communication
Browse files Browse the repository at this point in the history
This commit introduces the `IbusSubscriber` struct, which allows
identification of subscribers in ibus messages and enables receivers
to send notifications back to them.

For convenience, a `subscriber` field has been added to
`IbusChannelsTx`. This field can now be used by any base component
or protocol instance to send subscription messages.

Additionally, a new `Disconnect` ibus message has been
introduced. This message is automatically sent to all base components
by protocol instances when they are shutting down, relieving them
from having to manually undo subscriptions.

Signed-off-by: Renato Westphal <[email protected]>
  • Loading branch information
rwestphal committed Feb 8, 2025
1 parent d3e42bb commit 89dd679
Show file tree
Hide file tree
Showing 10 changed files with 77 additions and 14 deletions.
5 changes: 3 additions & 2 deletions holo-interface/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use holo_northbound::{
process_northbound_msg,
};
use holo_protocol::InstanceShared;
use holo_utils::ibus::{IbusChannelsTx, IbusReceiver};
use holo_utils::ibus::{IbusChannelsTx, IbusReceiver, IbusSubscriber};
use tokio::sync::mpsc;
use tracing::Instrument;

Expand Down Expand Up @@ -74,11 +74,12 @@ impl Master {

pub fn start(
nb_tx: NbProviderSender,
ibus_tx: IbusChannelsTx,
mut ibus_tx: IbusChannelsTx,
ibus_rx: IbusReceiver,
shared: InstanceShared,
) -> NbDaemonSender {
let (nb_daemon_tx, nb_daemon_rx) = mpsc::channel(4);
ibus_tx.subscriber = Some(IbusSubscriber::new(ibus_tx.interface.clone()));

tokio::spawn(async move {
// Initialize netlink socket.
Expand Down
1 change: 1 addition & 0 deletions holo-interface/src/northbound/configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,7 @@ impl Provider for Master {
ifname,
&self.nb_tx,
&self.ibus_tx,
ibus_instance_tx.clone(),
ibus_instance_rx,
Default::default(),
self.shared.clone(),
Expand Down
5 changes: 3 additions & 2 deletions holo-keychain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use holo_northbound::{
NbDaemonReceiver, NbDaemonSender, NbProviderSender, ProviderBase,
process_northbound_msg,
};
use holo_utils::ibus::{IbusChannelsTx, IbusReceiver};
use holo_utils::ibus::{IbusChannelsTx, IbusReceiver, IbusSubscriber};
use holo_utils::keychain::Keychain;
use tokio::sync::mpsc;
use tracing::Instrument;
Expand Down Expand Up @@ -61,10 +61,11 @@ impl Master {

pub fn start(
nb_provider_tx: NbProviderSender,
ibus_tx: IbusChannelsTx,
mut ibus_tx: IbusChannelsTx,
ibus_rx: IbusReceiver,
) -> NbDaemonSender {
let (nb_daemon_tx, nb_daemon_rx) = mpsc::channel(4);
ibus_tx.subscriber = Some(IbusSubscriber::new(ibus_tx.keychain.clone()));

tokio::spawn(async move {
let span = Master::debug_span("");
Expand Down
5 changes: 3 additions & 2 deletions holo-policy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use holo_northbound::{
NbDaemonReceiver, NbDaemonSender, NbProviderSender, ProviderBase,
process_northbound_msg,
};
use holo_utils::ibus::{IbusChannelsTx, IbusReceiver};
use holo_utils::ibus::{IbusChannelsTx, IbusReceiver, IbusSubscriber};
use holo_utils::policy::{MatchSets, Policy};
use tokio::sync::mpsc;
use tracing::Instrument;
Expand Down Expand Up @@ -64,10 +64,11 @@ impl Master {

pub fn start(
nb_provider_tx: NbProviderSender,
ibus_tx: IbusChannelsTx,
mut ibus_tx: IbusChannelsTx,
ibus_rx: IbusReceiver,
) -> NbDaemonSender {
let (nb_daemon_tx, nb_daemon_rx) = mpsc::channel(4);
ibus_tx.subscriber = Some(IbusSubscriber::new(ibus_tx.policy.clone()));

tokio::spawn(async move {
let span = Master::debug_span("");
Expand Down
23 changes: 20 additions & 3 deletions holo-protocol/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ use holo_northbound::{
NbDaemonReceiver, NbDaemonSender, NbProviderSender, process_northbound_msg,
};
use holo_utils::bier::BierCfg;
use holo_utils::ibus::{IbusChannelsTx, IbusMsg, IbusReceiver};
use holo_utils::ibus::{
IbusChannelsTx, IbusMsg, IbusReceiver, IbusSender, IbusSubscriber,
};
use holo_utils::keychain::Keychains;
use holo_utils::mpls::LabelManager;
use holo_utils::policy::{MatchSets, Policies};
Expand Down Expand Up @@ -318,7 +320,7 @@ async fn run<P>(
// Create instance Tx/Rx channels.
let instance_channels_tx = InstanceChannelsTx::new(
nb_tx,
ibus_tx,
ibus_tx.clone(),
proto_input_tx,
#[cfg(feature = "testing")]
proto_output_tx,
Expand Down Expand Up @@ -353,6 +355,19 @@ async fn run<P>(
)
.await;

// Cancel ibus subscriptions.
for tx in &[
&ibus_tx.routing,
&ibus_tx.interface,
&ibus_tx.system,
&ibus_tx.keychain,
&ibus_tx.policy,
] {
let _ = tx.send(IbusMsg::Disconnect {
subscriber: ibus_tx.subscriber.clone(),
});
}

// Ensure instance is shut down before exiting.
instance.shutdown().await;
}
Expand All @@ -363,6 +378,7 @@ pub fn spawn_protocol_task<P>(
name: String,
nb_provider_tx: &NbProviderSender,
ibus_tx: &IbusChannelsTx,
ibus_instance_tx: IbusSender,
ibus_instance_rx: IbusReceiver,
agg_channels: InstanceAggChannels<P>,
#[cfg(feature = "testing")] test_rx: Receiver<
Expand All @@ -375,7 +391,8 @@ where
{
let (nb_daemon_tx, nb_daemon_rx) = mpsc::channel(4);
let nb_provider_tx = nb_provider_tx.clone();
let ibus_tx = ibus_tx.clone();
let mut ibus_tx = ibus_tx.clone();
ibus_tx.subscriber = Some(IbusSubscriber::new(ibus_instance_tx));

tokio::spawn(async move {
let span = P::debug_span(&name);
Expand Down
3 changes: 2 additions & 1 deletion holo-protocol/src/test/stub/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,14 +262,15 @@ where
// Spawn protocol task.
let (nb_provider_tx, nb_provider_rx) = mpsc::unbounded_channel();
let (ibus_ctx, ibus_crx) = ibus::ibus_channels();
let (_ibus_instance_tx, ibus_instance_rx) = mpsc::unbounded_channel();
let (ibus_instance_tx, ibus_instance_rx) = mpsc::unbounded_channel();
let channels = InstanceAggChannels::default();
let instance_tx = channels.tx.clone();
let (test_tx, test_rx) = mpsc::channel(4);
let nb_daemon_tx = spawn_protocol_task::<P>(
name.to_owned(),
&nb_provider_tx,
&ibus_ctx,
ibus_instance_tx,
ibus_instance_rx,
channels,
test_rx,
Expand Down
8 changes: 6 additions & 2 deletions holo-routing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ use holo_northbound::{
};
use holo_protocol::InstanceShared;
use holo_utils::bier::BierCfg;
use holo_utils::ibus::{IbusChannelsTx, IbusReceiver, IbusSender};
use holo_utils::ibus::{
IbusChannelsTx, IbusReceiver, IbusSender, IbusSubscriber,
};
use holo_utils::protocol::Protocol;
use holo_utils::southbound::InterfaceFlags;
use holo_utils::sr::SrCfg;
Expand Down Expand Up @@ -117,11 +119,12 @@ impl Master {

pub fn start(
nb_tx: NbProviderSender,
ibus_tx: IbusChannelsTx,
mut ibus_tx: IbusChannelsTx,
ibus_rx: IbusReceiver,
shared: InstanceShared,
) -> NbDaemonSender {
let (nb_daemon_tx, nb_daemon_rx) = mpsc::channel(4);
ibus_tx.subscriber = Some(IbusSubscriber::new(ibus_tx.routing.clone()));

tokio::spawn(async move {
let mut master = Master {
Expand Down Expand Up @@ -154,6 +157,7 @@ pub fn start(
name,
&master.nb_tx,
&master.ibus_tx,
ibus_instance_tx.clone(),
ibus_instance_rx,
Default::default(),
shared,
Expand Down
7 changes: 7 additions & 0 deletions holo-routing/src/northbound/configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1129,6 +1129,7 @@ fn instance_start(master: &mut Master, protocol: Protocol, name: String) {
name,
&master.nb_tx,
&master.ibus_tx,
ibus_instance_tx.clone(),
ibus_instance_rx,
Default::default(),
master.shared.clone(),
Expand All @@ -1146,6 +1147,7 @@ fn instance_start(master: &mut Master, protocol: Protocol, name: String) {
name,
&master.nb_tx,
&master.ibus_tx,
ibus_instance_tx.clone(),
ibus_instance_rx,
Default::default(),
master.shared.clone(),
Expand All @@ -1159,6 +1161,7 @@ fn instance_start(master: &mut Master, protocol: Protocol, name: String) {
name,
&master.nb_tx,
&master.ibus_tx,
ibus_instance_tx.clone(),
ibus_instance_rx,
Default::default(),
master.shared.clone(),
Expand All @@ -1173,6 +1176,7 @@ fn instance_start(master: &mut Master, protocol: Protocol, name: String) {
name,
&master.nb_tx,
&master.ibus_tx,
ibus_instance_tx.clone(),
ibus_instance_rx,
Default::default(),
master.shared.clone(),
Expand All @@ -1187,6 +1191,7 @@ fn instance_start(master: &mut Master, protocol: Protocol, name: String) {
name,
&master.nb_tx,
&master.ibus_tx,
ibus_instance_tx.clone(),
ibus_instance_rx,
Default::default(),
master.shared.clone(),
Expand All @@ -1201,6 +1206,7 @@ fn instance_start(master: &mut Master, protocol: Protocol, name: String) {
name,
&master.nb_tx,
&master.ibus_tx,
ibus_instance_tx.clone(),
ibus_instance_rx,
Default::default(),
master.shared.clone(),
Expand All @@ -1215,6 +1221,7 @@ fn instance_start(master: &mut Master, protocol: Protocol, name: String) {
name,
&master.nb_tx,
&master.ibus_tx,
ibus_instance_tx.clone(),
ibus_instance_rx,
Default::default(),
master.shared.clone(),
Expand Down
5 changes: 3 additions & 2 deletions holo-system/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use holo_northbound::{
NbDaemonReceiver, NbDaemonSender, NbProviderSender, ProviderBase,
process_northbound_msg,
};
use holo_utils::ibus::{IbusChannelsTx, IbusReceiver};
use holo_utils::ibus::{IbusChannelsTx, IbusReceiver, IbusSubscriber};
use northbound::configuration::SystemCfg;
use tokio::sync::mpsc;
use tracing::Instrument;
Expand Down Expand Up @@ -60,10 +60,11 @@ impl Master {

pub fn start(
nb_tx: NbProviderSender,
ibus_tx: IbusChannelsTx,
mut ibus_tx: IbusChannelsTx,
ibus_rx: IbusReceiver,
) -> NbDaemonSender {
let (nb_daemon_tx, nb_daemon_rx) = mpsc::channel(4);
ibus_tx.subscriber = Some(IbusSubscriber::new(ibus_tx.system.clone()));

tokio::spawn(async move {
let mut master = Master {
Expand Down
29 changes: 29 additions & 0 deletions holo-utils/src/ibus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

use std::net::{IpAddr, Ipv4Addr};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;
Expand All @@ -30,6 +31,7 @@ pub type IbusSender = UnboundedSender<IbusMsg>;
// Ibus output channels.
#[derive(Clone, Debug)]
pub struct IbusChannelsTx {
pub subscriber: Option<IbusSubscriber>,
pub routing: UnboundedSender<IbusMsg>,
pub interface: UnboundedSender<IbusMsg>,
pub system: UnboundedSender<IbusMsg>,
Expand All @@ -47,6 +49,15 @@ pub struct IbusChannelsRx {
pub policy: UnboundedReceiver<IbusMsg>,
}

// Subscriber to Ibus messages.
#[derive(Clone, Debug)]
pub struct IbusSubscriber {
// Unique identifier for the subscriber.
pub id: usize,
// Channel for sending messages to the subscriber.
pub tx: IbusSender,
}

// Ibus message for communication among the different Holo components.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub enum IbusMsg {
Expand Down Expand Up @@ -150,6 +161,23 @@ pub enum IbusMsg {
// purged. E.g., One could ask to purge the BIRT populated by a specific
// instance of OSPFv3 but not those populated by IS-IS.
BierPurge,
// Cancel all previously requested subscriptions.
Disconnect {
#[serde(skip)]
subscriber: Option<IbusSubscriber>,
},
}

// ===== impl IbusSubscriber =====

impl IbusSubscriber {
pub fn new(tx: IbusSender) -> Self {
static NEXT_ID: AtomicUsize = AtomicUsize::new(0);
IbusSubscriber {
id: NEXT_ID.fetch_add(1, Ordering::Relaxed),
tx,
}
}
}

// ===== global functions =====
Expand All @@ -162,6 +190,7 @@ pub fn ibus_channels() -> (IbusChannelsTx, IbusChannelsRx) {
let (policy_tx, policy_rx) = mpsc::unbounded_channel();

let tx = IbusChannelsTx {
subscriber: None,
routing: routing_tx,
interface: interface_tx,
system: system_tx,
Expand Down

0 comments on commit 89dd679

Please sign in to comment.