diff --git a/Cargo.lock b/Cargo.lock
index f99b579bfe919..4fa9e83a1736a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4177,6 +4177,7 @@ dependencies = [
  "cumulus-relay-chain-interface",
  "cumulus-relay-chain-rpc-interface",
  "futures",
+ "parking_lot 0.12.1",
  "polkadot-availability-recovery",
  "polkadot-collator-protocol",
  "polkadot-core-primitives",
@@ -13268,6 +13269,7 @@ dependencies = [
  "pallet-transaction-payment-rpc-runtime-api",
  "parity-db",
  "parity-scale-codec",
+ "parking_lot 0.12.1",
  "polkadot-approval-distribution",
  "polkadot-availability-bitfield-distribution",
  "polkadot-availability-distribution",
@@ -15728,6 +15730,7 @@ dependencies = [
  "array-bytes 4.2.0",
  "arrayvec 0.7.4",
  "blake2 0.10.6",
+ "bytes",
  "futures",
  "futures-timer",
  "libp2p-identity",
@@ -15793,6 +15796,7 @@ dependencies = [
  "tempfile",
  "thiserror",
  "tokio",
+ "tokio-stream",
  "tokio-test",
  "tokio-util",
  "unsigned-varint",
@@ -15848,10 +15852,12 @@ name = "sc-network-gossip"
 version = "0.10.0-dev"
 dependencies = [
  "ahash 0.8.3",
+ "async-trait",
  "futures",
  "futures-timer",
  "libp2p",
  "log",
+ "parity-scale-codec",
  "quickcheck",
  "sc-network",
  "sc-network-common",
diff --git a/cumulus/client/relay-chain-minimal-node/Cargo.toml b/cumulus/client/relay-chain-minimal-node/Cargo.toml
index ce76fc5cd6d25..ee93df09ce1a6 100644
--- a/cumulus/client/relay-chain-minimal-node/Cargo.toml
+++ b/cumulus/client/relay-chain-minimal-node/Cargo.toml
@@ -47,4 +47,5 @@ array-bytes = "6.1"
 tracing = "0.1.37"
 async-trait = "0.1.73"
 futures = "0.3.28"
+parking_lot = "0.12.1"
 
diff --git a/cumulus/client/relay-chain-minimal-node/src/collator_overseer.rs b/cumulus/client/relay-chain-minimal-node/src/collator_overseer.rs
index a785a9f6f79c5..5f5bf338ef990 100644
--- a/cumulus/client/relay-chain-minimal-node/src/collator_overseer.rs
+++ b/cumulus/client/relay-chain-minimal-node/src/collator_overseer.rs
@@ -15,7 +15,8 @@
 // along with Polkadot.  If not, see .
 
 use futures::{select, StreamExt};
-use std::sync::Arc;
+use parking_lot::Mutex;
+use std::{collections::HashMap, sync::Arc};
 
 use polkadot_availability_recovery::AvailabilityRecoverySubsystem;
 use polkadot_collator_protocol::{CollatorProtocolSubsystem, ProtocolSide};
@@ -28,7 +29,7 @@ use polkadot_node_core_chain_api::ChainApiSubsystem;
 use polkadot_node_core_prospective_parachains::ProspectiveParachainsSubsystem;
 use polkadot_node_core_runtime_api::RuntimeApiSubsystem;
 use polkadot_node_network_protocol::{
-	peer_set::PeerSetProtocolNames,
+	peer_set::{PeerSet, PeerSetProtocolNames},
 	request_response::{
 		v1::{self, AvailableDataFetchingRequest},
 		v2, IncomingRequestReceiver, ReqProtocolNames,
@@ -42,7 +43,7 @@ use polkadot_overseer::{
 use polkadot_primitives::CollatorPair;
 
 use sc_authority_discovery::Service as AuthorityDiscoveryService;
-use sc_network::NetworkStateInfo;
+use sc_network::{NetworkStateInfo, NotificationService};
 use sc_service::TaskManager;
 use sc_utils::mpsc::tracing_unbounded;
 
@@ -77,6 +78,8 @@ pub(crate) struct CollatorOverseerGenArgs<'a> {
 	pub req_protocol_names: ReqProtocolNames,
 	/// Peerset protocols name mapping
 	pub peer_set_protocol_names: PeerSetProtocolNames,
+	/// Notification services for validation/collation protocols.
+	pub notification_services: HashMap>,
 }
 
 fn build_overseer(
@@ -94,6 +97,7 @@ fn build_overseer(
 		collator_pair,
 		req_protocol_names,
 		peer_set_protocol_names,
+		notification_services,
 	}: CollatorOverseerGenArgs<'_>,
 ) -> Result<
 	(Overseer, Arc>, OverseerHandle),
@@ -101,6 +105,8 @@ fn build_overseer(
 > {
 	let spawner = SpawnGlue(spawner);
 	let network_bridge_metrics: NetworkBridgeMetrics = Metrics::register(registry)?;
+	let notification_sinks = Arc::new(Mutex::new(HashMap::new()));
+
 	let builder = Overseer::builder()
 		.availability_distribution(DummySubsystem)
 		.availability_recovery(AvailabilityRecoverySubsystem::for_collator(
@@ -131,6 +137,8 @@ fn build_overseer(
 			sync_oracle,
 			network_bridge_metrics.clone(),
 			peer_set_protocol_names.clone(),
+			notification_services,
+			notification_sinks.clone(),
 		))
 		.network_bridge_tx(NetworkBridgeTxSubsystem::new(
 			network_service,
@@ -138,6 +146,7 @@ fn build_overseer(
 			network_bridge_metrics,
 			req_protocol_names,
 			peer_set_protocol_names,
+			notification_sinks,
 		))
 		.provisioner(DummySubsystem)
 		.runtime_api(RuntimeApiSubsystem::new(
diff --git a/cumulus/client/relay-chain-minimal-node/src/lib.rs b/cumulus/client/relay-chain-minimal-node/src/lib.rs
index 8801f93640c18..d121d2d335676 100644
--- a/cumulus/client/relay-chain-minimal-node/src/lib.rs
+++ b/cumulus/client/relay-chain-minimal-node/src/lib.rs
@@ -21,7 +21,7 @@ use cumulus_relay_chain_rpc_interface::{RelayChainRpcClient, RelayChainRpcInterf
 use network::build_collator_network;
 use polkadot_network_bridge::{peer_sets_info, IsAuthority};
 use polkadot_node_network_protocol::{
-	peer_set::PeerSetProtocolNames,
+	peer_set::{PeerSet, PeerSetProtocolNames},
 	request_response::{
 		v1, v2, IncomingRequest, IncomingRequestReceiver, Protocol, ReqProtocolNames,
 	},
@@ -175,10 +175,13 @@ async fn new_minimal_relay_chain(
 	let peer_set_protocol_names =
 		PeerSetProtocolNames::new(genesis_hash, config.chain_spec.fork_id());
 	let is_authority = if role.is_authority() { IsAuthority::Yes } else { IsAuthority::No };
-
-	for config in peer_sets_info(is_authority, &peer_set_protocol_names) {
-		net_config.add_notification_protocol(config);
-	}
+	let notification_services = peer_sets_info(is_authority, &peer_set_protocol_names)
+		.into_iter()
+		.map(|(config, (peerset, service))| {
+			net_config.add_notification_protocol(config);
+			(peerset, service)
+		})
+		.collect::>>();
 
 	let request_protocol_names = ReqProtocolNames::new(genesis_hash, config.chain_spec.fork_id());
 	let (collation_req_receiver_v1, collation_req_receiver_v2, available_data_req_receiver) =
@@ -218,6 +221,7 @@ async fn new_minimal_relay_chain(
 		collator_pair,
 		req_protocol_names: request_protocol_names,
 		peer_set_protocol_names,
+		notification_services,
 	};
 
 	let overseer_handle =
diff --git a/cumulus/client/relay-chain-minimal-node/src/network.rs b/cumulus/client/relay-chain-minimal-node/src/network.rs
index 813dca47a0398..95785063c1aeb 100644
--- a/cumulus/client/relay-chain-minimal-node/src/network.rs
+++ b/cumulus/client/relay-chain-minimal-node/src/network.rs
@@ -26,10 +26,9 @@ use sc_network::{
 	NetworkService,
 };
 
-use sc_network::config::FullNetworkConfiguration;
+use sc_network::{config::FullNetworkConfiguration, NotificationService};
 use sc_network_common::{role::Roles, sync::message::BlockAnnouncesHandshake};
 use sc_service::{error::Error, Configuration, NetworkStarter, SpawnTaskHandle};
-use sc_utils::mpsc::tracing_unbounded;
 
 use std::{iter, sync::Arc};
 
@@ -45,7 +44,7 @@ pub(crate) fn build_collator_network(
 	Error,
 > {
 	let protocol_id = config.protocol_id();
-	let block_announce_config = get_block_announce_proto_config::(
+	let (block_announce_config, _notification_service) = get_block_announce_proto_config::(
 		protocol_id.clone(),
 		&None,
 		Roles::from(&config.role),
@@ -69,8 +68,6 @@ pub(crate) fn build_collator_network(
 	let peer_store_handle = peer_store.handle();
 	spawn_handle.spawn("peer-store", Some("networking"), peer_store.run());
 
-	// RX is not used for anything because syncing is not started for the minimal node
-	let (tx, _rx) = tracing_unbounded("mpsc_syncing_engine_protocol", 100_000);
 	let network_params = sc_network::config::Params:: {
 		role: config.role.clone(),
 		executor: {
@@ -86,7 +83,6 @@ pub(crate) fn build_collator_network(
 		protocol_id,
 		metrics_registry: config.prometheus_config.as_ref().map(|config| config.registry.clone()),
 		block_announce_config,
-		tx,
 	};
 
 	let network_worker = sc_network::NetworkWorker::new(network_params)?;
@@ -150,7 +146,7 @@ fn get_block_announce_proto_config(
 	best_number: NumberFor,
 	best_hash: B::Hash,
 	genesis_hash: B::Hash,
-) -> NonDefaultSetConfig {
+) -> (NonDefaultSetConfig, Box) {
 	let block_announces_protocol = {
 		let genesis_hash = genesis_hash.as_ref();
 		if let Some(ref fork_id) = fork_id {
@@ -160,12 +156,11 @@ fn get_block_announce_proto_config(
 		}
 	};
 
-	NonDefaultSetConfig {
-		notifications_protocol: block_announces_protocol.into(),
-		fallback_names: iter::once(format!("/{}/block-announces/1", protocol_id.as_ref()).into())
-			.collect(),
-		max_notification_size: 1024 * 1024,
-		handshake: Some(NotificationHandshake::new(BlockAnnouncesHandshake::::build(
+	NonDefaultSetConfig::new(
+		block_announces_protocol.into(),
+		iter::once(format!("/{}/block-announces/1", protocol_id.as_ref()).into()).collect(),
+		1024 * 1024,
+		Some(NotificationHandshake::new(BlockAnnouncesHandshake::::build(
 			roles,
 			best_number,
 			best_hash,
@@ -173,11 +168,11 @@ fn get_block_announce_proto_config(
 		))),
 		// NOTE: `set_config` will be ignored by `protocol.rs` as the block announcement
 		// protocol is still hardcoded into the peerset.
-		set_config: SetConfig {
+		SetConfig {
 			in_peers: 0,
 			out_peers: 0,
 			reserved_nodes: Vec::new(),
 			non_reserved_mode: NonReservedPeerMode::Deny,
 		},
-	}
+	)
 }
diff --git a/polkadot/node/network/bridge/src/lib.rs b/polkadot/node/network/bridge/src/lib.rs
index 46d4a00faace6..ddce99d5c2a8a 100644
--- a/polkadot/node/network/bridge/src/lib.rs
+++ b/polkadot/node/network/bridge/src/lib.rs
@@ -83,6 +83,7 @@ pub(crate) enum WireMessage {
 	ViewUpdate(View),
 }
 
+#[derive(Debug)]
 pub(crate) struct PeerData {
 	/// The Latest view sent by the peer.
 	view: View,
diff --git a/polkadot/node/network/bridge/src/network.rs b/polkadot/node/network/bridge/src/network.rs
index c264c94cc19bf..a9339a5c443c1 100644
--- a/polkadot/node/network/bridge/src/network.rs
+++ b/polkadot/node/network/bridge/src/network.rs
@@ -14,23 +14,24 @@
 // You should have received a copy of the GNU General Public License
 // along with Polkadot.  If not, see .
 
-use std::{collections::HashSet, sync::Arc};
+use std::{
+	collections::{HashMap, HashSet},
+	sync::Arc,
+};
 
 use async_trait::async_trait;
-use futures::{prelude::*, stream::BoxStream};
+use parking_lot::Mutex;
 
 use parity_scale_codec::Encode;
 
 use sc_network::{
-	config::parse_addr, multiaddr::Multiaddr, types::ProtocolName, Event as NetworkEvent,
-	IfDisconnected, NetworkEventStream, NetworkNotification, NetworkPeers, NetworkRequest,
-	NetworkService, OutboundFailure, ReputationChange, RequestFailure,
+	config::parse_addr, multiaddr::Multiaddr, types::ProtocolName, IfDisconnected, MessageSink,
+	NetworkPeers, NetworkRequest, NetworkService, OutboundFailure, ReputationChange,
+	RequestFailure,
 };
 
 use polkadot_node_network_protocol::{
-	peer_set::{
-		CollationVersion, PeerSet, PeerSetProtocolNames, ProtocolVersion, ValidationVersion,
-	},
+	peer_set::{CollationVersion, PeerSet, ProtocolVersion, ValidationVersion},
 	request_response::{OutgoingRequest, Recipient, ReqProtocolNames, Requests},
 	v1 as protocol_v1, v2 as protocol_v2, vstaging as protocol_vstaging, PeerId,
 };
@@ -44,104 +45,94 @@ const LOG_TARGET: &'static str = "parachain::network-bridge-net";
 // Helper function to send a validation v1 message to a list of peers.
 // Messages are always sent via the main protocol, even legacy protocol messages.
 pub(crate) fn send_validation_message_v1(
-	net: &mut impl Network,
 	peers: Vec,
-	peerset_protocol_names: &PeerSetProtocolNames,
 	message: WireMessage,
 	metrics: &Metrics,
+	notification_sinks: &Arc>>>,
 ) {
 	gum::trace!(target: LOG_TARGET, ?peers, ?message, "Sending validation v1 message to peers",);
 
 	send_message(
-		net,
 		peers,
 		PeerSet::Validation,
 		ValidationVersion::V1.into(),
-		peerset_protocol_names,
 		message,
 		metrics,
+		notification_sinks,
 	);
 }
 
 // Helper function to send a validation vstaging message to a list of peers.
 // Messages are always sent via the main protocol, even legacy protocol messages.
 pub(crate) fn send_validation_message_vstaging(
-	net: &mut impl Network,
 	peers: Vec,
-	peerset_protocol_names: &PeerSetProtocolNames,
 	message: WireMessage,
 	metrics: &Metrics,
+	notification_sinks: &Arc>>>,
 ) {
 	gum::trace!(target: LOG_TARGET, ?peers, ?message, "Sending validation vstaging message to peers",);
 
 	send_message(
-		net,
 		peers,
 		PeerSet::Validation,
 		ValidationVersion::VStaging.into(),
-		peerset_protocol_names,
 		message,
 		metrics,
+		notification_sinks,
 	);
 }
 
 // Helper function to send a validation v2 message to a list of peers.
 // Messages are always sent via the main protocol, even legacy protocol messages.
 pub(crate) fn send_validation_message_v2(
-	net: &mut impl Network,
 	peers: Vec,
-	protocol_names: &PeerSetProtocolNames,
 	message: WireMessage,
 	metrics: &Metrics,
+	notification_sinks: &Arc>>>,
 ) {
 	send_message(
-		net,
 		peers,
 		PeerSet::Validation,
 		ValidationVersion::V2.into(),
-		protocol_names,
 		message,
 		metrics,
+		notification_sinks,
 	);
 }
 
 // Helper function to send a collation v1 message to a list of peers.
 // Messages are always sent via the main protocol, even legacy protocol messages.
 pub(crate) fn send_collation_message_v1(
-	net: &mut impl Network,
 	peers: Vec,
-	peerset_protocol_names: &PeerSetProtocolNames,
 	message: WireMessage,
 	metrics: &Metrics,
+	notification_sinks: &Arc>>>,
 ) {
 	send_message(
-		net,
 		peers,
 		PeerSet::Collation,
 		CollationVersion::V1.into(),
-		peerset_protocol_names,
 		message,
 		metrics,
+		notification_sinks,
 	);
 }
 
 // Helper function to send a collation v2 message to a list of peers.
 // Messages are always sent via the main protocol, even legacy protocol messages.
 pub(crate) fn send_collation_message_v2(
-	net: &mut impl Network,
 	peers: Vec,
-	peerset_protocol_names: &PeerSetProtocolNames,
 	message: WireMessage,
 	metrics: &Metrics,
+	notification_sinks: &Arc>>>,
 ) {
 	send_message(
-		net,
 		peers,
 		PeerSet::Collation,
 		CollationVersion::V2.into(),
-		peerset_protocol_names,
 		message,
 		metrics,
+		notification_sinks,
 	);
 }
 
@@ -151,19 +142,19 @@ pub(crate) fn send_collation_message_v2(
 /// messages that are compatible with the passed peer set, as that is currently not enforced by
 /// this function. These are messages of type `WireMessage` parameterized on the matching type.
 fn send_message(
-	net: &mut impl Network,
 	mut peers: Vec,
 	peer_set: PeerSet,
 	version: ProtocolVersion,
-	protocol_names: &PeerSetProtocolNames,
 	message: M,
 	metrics: &super::Metrics,
+	network_notification_sinks: &Arc>>>,
 ) where
 	M: Encode + Clone,
 {
 	if peers.is_empty() {
 		return
 	}
+
 	let message = {
 		let encoded = message.encode();
 		metrics.on_notification_sent(peer_set, version, encoded.len(), peers.len());
@@ -171,13 +162,13 @@ fn send_message(
 		encoded
 	};
 
-	// optimization: generate the protocol name once.
-	let protocol_name = protocol_names.get_name(peer_set, version);
+	let notification_sinks = network_notification_sinks.lock();
+
 	gum::trace!(
 		target: LOG_TARGET,
 		?peers,
+		?peer_set,
 		?version,
-		?protocol_name,
 		?message,
 		"Sending message to peers",
 	);
@@ -185,29 +176,26 @@ fn send_message(
 	// optimization: avoid cloning the message for the last peer in the
 	// list. The message payload can be quite large. If the underlying
 	// network used `Bytes` this would not be necessary.
+	//
+	// peer may have gotten disconnect by the time `send_message()` is called
+	// at which point the the sink is not available.
 	let last_peer = peers.pop();
-
-	// We always send messages on the "main" name even when a negotiated
-	// fallback is used. The libp2p implementation handles the fallback
-	// under the hood.
-	let protocol_name = protocol_names.get_main_name(peer_set);
 	peers.into_iter().for_each(|peer| {
-		net.write_notification(peer, protocol_name.clone(), message.clone());
+		if let Some(sink) = notification_sinks.get(&(peer_set, peer)) {
+			sink.send_sync_notification(message.clone());
+		}
 	});
+
 	if let Some(peer) = last_peer {
-		net.write_notification(peer, protocol_name, message);
+		if let Some(sink) = notification_sinks.get(&(peer_set, peer)) {
+			sink.send_sync_notification(message.clone());
+		}
 	}
 }
 
 /// An abstraction over networking for the purposes of this subsystem.
 #[async_trait]
 pub trait Network: Clone + Send + 'static {
-	/// Get a stream of all events occurring on the network. This may include events unrelated
-	/// to the Polkadot protocol - the user of this function should filter only for events related
-	/// to the [`VALIDATION_PROTOCOL_NAME`](VALIDATION_PROTOCOL_NAME)
-	/// or [`COLLATION_PROTOCOL_NAME`](COLLATION_PROTOCOL_NAME)
-	fn event_stream(&mut self) -> BoxStream<'static, NetworkEvent>;
-
 	/// Ask the network to keep a substream open with these nodes and not disconnect from them
 	/// until removed from the protocol's peer set.
 	/// Note that `out_peers` setting has no effect on this.
@@ -239,16 +227,12 @@ pub trait Network: Clone + Send + 'static {
 	/// Disconnect a given peer from the protocol specified without harming reputation.
 	fn disconnect_peer(&self, who: PeerId, protocol: ProtocolName);
 
-	/// Write a notification to a peer on the given protocol.
-	fn write_notification(&self, who: PeerId, protocol: ProtocolName, message: Vec);
+	/// Get peer role.
+	fn peer_role(&self, who: PeerId, handshake: Vec) -> Option;
 }
 
 #[async_trait]
 impl Network for Arc> {
-	fn event_stream(&mut self) -> BoxStream<'static, NetworkEvent> {
-		NetworkService::event_stream(self, "polkadot-network-bridge").boxed()
-	}
-
 	async fn set_reserved_peers(
 		&mut self,
 		protocol: ProtocolName,
@@ -273,10 +257,6 @@ impl Network for Arc> {
 		NetworkService::disconnect_peer(&**self, who, protocol);
 	}
 
-	fn write_notification(&self, who: PeerId, protocol: ProtocolName, message: Vec) {
-		NetworkService::write_notification(&**self, who, protocol, message);
-	}
-
 	async fn start_request(
 		&self,
 		authority_discovery: &mut AD,
@@ -348,6 +328,10 @@ impl Network for Arc> {
 			if_disconnected,
 		);
 	}
+
+	fn peer_role(&self, who: PeerId, handshake: Vec) -> Option {
+		NetworkService::peer_role(self, who, handshake)
+	}
 }
 
 /// We assume one `peer_id` per `authority_id`.
diff --git a/polkadot/node/network/bridge/src/rx/mod.rs b/polkadot/node/network/bridge/src/rx/mod.rs
index 06be57ead0060..40cd167a968ba 100644
--- a/polkadot/node/network/bridge/src/rx/mod.rs
+++ b/polkadot/node/network/bridge/src/rx/mod.rs
@@ -20,11 +20,14 @@ use super::*;
 
 use always_assert::never;
 use bytes::Bytes;
-use futures::stream::{BoxStream, StreamExt};
 use net_protocol::filter_by_peer_version;
 use parity_scale_codec::{Decode, DecodeAll};
+use parking_lot::Mutex;
 
-use sc_network::Event as NetworkEvent;
+use sc_network::{
+	service::traits::{NotificationEvent, ValidationResult},
+	MessageSink, NotificationService,
+};
 use sp_consensus::SyncOracle;
 
 use polkadot_node_network_protocol::{
@@ -88,6 +91,9 @@ pub struct NetworkBridgeRx {
 	shared: Shared,
 	metrics: Metrics,
 	peerset_protocol_names: PeerSetProtocolNames,
+	validation_service: Box,
+	collation_service: Box,
+	notification_sinks: Arc>>>,
 }
 
 impl NetworkBridgeRx {
@@ -102,8 +108,18 @@ impl NetworkBridgeRx {
 		sync_oracle: Box,
 		metrics: Metrics,
 		peerset_protocol_names: PeerSetProtocolNames,
+		mut notification_services: HashMap>,
+		notification_sinks: Arc>>>,
 	) -> Self {
 		let shared = Shared::default();
+
+		let validation_service = notification_services
+			.remove(&PeerSet::Validation)
+			.expect("validation protocol was enabled so `NotificationService` must exist; qed");
+		let collation_service = notification_services
+			.remove(&PeerSet::Collation)
+			.expect("collation protocol was enabled so `NotificationService` must exist; qed");
+
 		Self {
 			network_service,
 			authority_discovery_service,
@@ -111,6 +127,9 @@ impl NetworkBridgeRx {
 			shared,
 			metrics,
 			peerset_protocol_names,
+			validation_service,
+			collation_service,
+			notification_sinks,
 		}
 	}
 }
@@ -121,444 +140,563 @@ where
 	Net: Network + Sync,
 	AD: validator_discovery::AuthorityDiscovery + Clone + Sync,
 {
-	fn start(mut self, ctx: Context) -> SpawnedSubsystem {
-		// The stream of networking events has to be created at initialization, otherwise the
-		// networking might open connections before the stream of events has been grabbed.
-		let network_stream = self.network_service.event_stream();
-
+	fn start(self, ctx: Context) -> SpawnedSubsystem {
 		// Swallow error because failure is fatal to the node and we log with more precision
 		// within `run_network`.
-		let future = run_network_in(self, ctx, network_stream)
+		let future = run_network_in(self, ctx)
 			.map_err(|e| SubsystemError::with_origin("network-bridge", e))
 			.boxed();
 		SpawnedSubsystem { name: "network-bridge-rx-subsystem", future }
 	}
 }
 
-async fn handle_network_messages(
-	mut sender: impl overseer::NetworkBridgeRxSenderTrait,
-	mut network_service: impl Network,
-	network_stream: BoxStream<'static, NetworkEvent>,
-	mut authority_discovery_service: AD,
-	metrics: Metrics,
-	shared: Shared,
-	peerset_protocol_names: PeerSetProtocolNames,
-) -> Result<(), Error>
-where
+/// Handle notification event received over the validation protocol.
+async fn handle_validation_message(
+	event: NotificationEvent,
+	network_service: &mut impl Network,
+	sender: &mut impl overseer::NetworkBridgeRxSenderTrait,
+	authority_discovery_service: &mut AD,
+	metrics: &Metrics,
+	shared: &Shared,
+	peerset_protocol_names: &PeerSetProtocolNames,
+	notification_service: &mut Box,
+	notification_sinks: &mut Arc>>>,
+) where
 	AD: validator_discovery::AuthorityDiscovery + Send,
 {
-	let mut network_stream = network_stream.fuse();
-	loop {
-		match network_stream.next().await {
-			None => return Err(Error::EventStreamConcluded),
-			Some(NetworkEvent::Dht(_)) => {},
-			Some(NetworkEvent::NotificationStreamOpened {
-				remote: peer,
-				protocol,
-				role,
-				negotiated_fallback,
-				received_handshake: _,
-			}) => {
-				let role = ObservedRole::from(role);
-				let (peer_set, version) = {
-					let (peer_set, version) =
-						match peerset_protocol_names.try_get_protocol(&protocol) {
-							None => continue,
-							Some(p) => p,
-						};
-
-					if let Some(fallback) = negotiated_fallback {
-						match peerset_protocol_names.try_get_protocol(&fallback) {
-							None => {
+	match event {
+		NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx, .. } => {
+			// only accept peers whose role can be determined
+			let result = network_service
+				.peer_role(peer, handshake)
+				.map_or(ValidationResult::Reject, |_| ValidationResult::Accept);
+			let _ = result_tx.send(result);
+		},
+		NotificationEvent::NotificationStreamOpened {
+			peer,
+			handshake,
+			negotiated_fallback,
+			..
+		} => {
+			let role = match network_service.peer_role(peer, handshake) {
+				Some(role) => ObservedRole::from(role),
+				None => {
+					gum::debug!(
+						target: LOG_TARGET,
+						?peer,
+						"Failed to determine peer role for validation protocol",
+					);
+					return
+				},
+			};
+
+			let (peer_set, version) = {
+				let (peer_set, version) =
+					(PeerSet::Validation, PeerSet::Validation.get_main_version());
+
+				if let Some(fallback) = negotiated_fallback {
+					match peerset_protocol_names.try_get_protocol(&fallback) {
+						None => {
+							gum::debug!(
+								target: LOG_TARGET,
+								fallback = &*fallback,
+								?peer,
+								peerset = ?peer_set,
+								"Unknown fallback",
+							);
+
+							return
+						},
+						Some((p2, v2)) => {
+							if p2 != peer_set {
 								gum::debug!(
 									target: LOG_TARGET,
 									fallback = &*fallback,
-									?peer,
-									?peer_set,
-									"Unknown fallback",
+									fallback_peerset = ?p2,
+									peerset = ?peer_set,
+									"Fallback mismatched peer-set",
 								);
 
-								continue
-							},
-							Some((p2, v2)) => {
-								if p2 != peer_set {
-									gum::debug!(
-										target: LOG_TARGET,
-										fallback = &*fallback,
-										fallback_peerset = ?p2,
-										protocol = &*protocol,
-										peerset = ?peer_set,
-										"Fallback mismatched peer-set",
-									);
-
-									continue
-								}
-
-								(p2, v2)
-							},
-						}
-					} else {
-						(peer_set, version)
-					}
-				};
-
-				gum::debug!(
-					target: LOG_TARGET,
-					action = "PeerConnected",
-					peer_set = ?peer_set,
-					version = %version,
-					peer = ?peer,
-					role = ?role
-				);
-
-				let local_view = {
-					let mut shared = shared.0.lock();
-					let peer_map = match peer_set {
-						PeerSet::Validation => &mut shared.validation_peers,
-						PeerSet::Collation => &mut shared.collation_peers,
-					};
+								return
+							}
 
-					match peer_map.entry(peer) {
-						hash_map::Entry::Occupied(_) => continue,
-						hash_map::Entry::Vacant(vacant) => {
-							vacant.insert(PeerData { view: View::default(), version });
+							(p2, v2)
 						},
 					}
+				} else {
+					(peer_set, version)
+				}
+			};
+			// store the notification sink to `notification_sinks` so both `NetworkBridgeRx`
+			// and `NetworkBridgeTx` can send messages to the peer.
+			match notification_service.message_sink(&peer) {
+				Some(sink) => {
+					notification_sinks.lock().insert((peer_set, peer), sink);
+				},
+				None => {
+					gum::warn!(
+						target: LOG_TARGET,
+						peerset = ?peer_set,
+						version = %version,
+						?peer,
+						?role,
+						"Message sink not available for peer",
+					);
+					return
+				},
+			}
 
-					metrics.on_peer_connected(peer_set, version);
-					metrics.note_peer_count(peer_set, version, peer_map.len());
-
-					shared.local_view.clone().unwrap_or(View::default())
-				};
-
-				let maybe_authority =
-					authority_discovery_service.get_authority_ids_by_peer_id(peer).await;
-
-				match peer_set {
-					PeerSet::Validation => {
-						dispatch_validation_events_to_all(
-							vec![
-								NetworkBridgeEvent::PeerConnected(
-									peer,
-									role,
-									version,
-									maybe_authority,
-								),
-								NetworkBridgeEvent::PeerViewChange(peer, View::default()),
-							],
-							&mut sender,
-							&metrics,
-						)
-						.await;
-
-						match ValidationVersion::try_from(version)
-							.expect("try_get_protocol has already checked version is known; qed")
-						{
-							ValidationVersion::V1 => send_validation_message_v1(
-								&mut network_service,
-								vec![peer],
-								&peerset_protocol_names,
-								WireMessage::::ViewUpdate(
-									local_view,
-								),
-								&metrics,
-							),
-							ValidationVersion::VStaging => send_validation_message_vstaging(
-								&mut network_service,
-								vec![peer],
-								&peerset_protocol_names,
-								WireMessage::::ViewUpdate(
-									local_view,
-								),
-								&metrics,
-							),
-							ValidationVersion::V2 => send_validation_message_v2(
-								&mut network_service,
-								vec![peer],
-								&peerset_protocol_names,
-								WireMessage::::ViewUpdate(
-									local_view,
-								),
-								&metrics,
-							),
-						}
-					},
-					PeerSet::Collation => {
-						dispatch_collation_events_to_all(
-							vec![
-								NetworkBridgeEvent::PeerConnected(
-									peer,
-									role,
-									version,
-									maybe_authority,
-								),
-								NetworkBridgeEvent::PeerViewChange(peer, View::default()),
-							],
-							&mut sender,
-						)
-						.await;
-
-						match CollationVersion::try_from(version)
-							.expect("try_get_protocol has already checked version is known; qed")
-						{
-							CollationVersion::V1 => send_collation_message_v1(
-								&mut network_service,
-								vec![peer],
-								&peerset_protocol_names,
-								WireMessage::::ViewUpdate(
-									local_view,
-								),
-								&metrics,
-							),
-							CollationVersion::V2 => send_collation_message_v2(
-								&mut network_service,
-								vec![peer],
-								&peerset_protocol_names,
-								WireMessage::::ViewUpdate(
-									local_view,
-								),
-								&metrics,
-							),
-						}
+			gum::debug!(
+				target: LOG_TARGET,
+				action = "PeerConnected",
+				peer_set = ?peer_set,
+				version = %version,
+				peer = ?peer,
+				role = ?role
+			);
+
+			let local_view = {
+				let mut shared = shared.0.lock();
+				let peer_map = &mut shared.validation_peers;
+
+				match peer_map.entry(peer) {
+					hash_map::Entry::Occupied(_) => return,
+					hash_map::Entry::Vacant(vacant) => {
+						vacant.insert(PeerData { view: View::default(), version });
 					},
 				}
-			},
-			Some(NetworkEvent::NotificationStreamClosed { remote: peer, protocol }) => {
-				let (peer_set, version) = match peerset_protocol_names.try_get_protocol(&protocol) {
-					None => continue,
-					Some(peer_set) => peer_set,
-				};
 
-				gum::debug!(
-					target: LOG_TARGET,
-					action = "PeerDisconnected",
-					peer_set = ?peer_set,
-					peer = ?peer
-				);
+				metrics.on_peer_connected(peer_set, version);
+				metrics.note_peer_count(peer_set, version, peer_map.len());
 
-				let was_connected = {
-					let mut shared = shared.0.lock();
-					let peer_map = match peer_set {
-						PeerSet::Validation => &mut shared.validation_peers,
-						PeerSet::Collation => &mut shared.collation_peers,
-					};
+				shared.local_view.clone().unwrap_or(View::default())
+			};
 
-					let w = peer_map.remove(&peer).is_some();
+			let maybe_authority =
+				authority_discovery_service.get_authority_ids_by_peer_id(peer).await;
 
-					metrics.on_peer_disconnected(peer_set, version);
-					metrics.note_peer_count(peer_set, version, peer_map.len());
+			dispatch_validation_events_to_all(
+				vec![
+					NetworkBridgeEvent::PeerConnected(peer, role, version, maybe_authority),
+					NetworkBridgeEvent::PeerViewChange(peer, View::default()),
+				],
+				sender,
+				&metrics,
+			)
+			.await;
 
-					w
-				};
+			match ValidationVersion::try_from(version)
+				.expect("try_get_protocol has already checked version is known; qed")
+			{
+				ValidationVersion::V1 => send_validation_message_v1(
+					vec![peer],
+					WireMessage::::ViewUpdate(local_view),
+					metrics,
+					notification_sinks,
+				),
+				ValidationVersion::VStaging => send_validation_message_vstaging(
+					vec![peer],
+					WireMessage::::ViewUpdate(local_view),
+					metrics,
+					notification_sinks,
+				),
+				ValidationVersion::V2 => send_validation_message_v2(
+					vec![peer],
+					WireMessage::::ViewUpdate(local_view),
+					metrics,
+					notification_sinks,
+				),
+			}
+		},
+		NotificationEvent::NotificationStreamClosed { peer } => {
+			let (peer_set, version) = (PeerSet::Validation, PeerSet::Validation.get_main_version());
 
-				if was_connected && version == peer_set.get_main_version() {
-					match peer_set {
-						PeerSet::Validation =>
-							dispatch_validation_event_to_all(
-								NetworkBridgeEvent::PeerDisconnected(peer),
-								&mut sender,
-								&metrics,
-							)
-							.await,
-						PeerSet::Collation =>
-							dispatch_collation_event_to_all(
-								NetworkBridgeEvent::PeerDisconnected(peer),
-								&mut sender,
-							)
-							.await,
-					}
+			gum::debug!(
+				target: LOG_TARGET,
+				action = "PeerDisconnected",
+				?peer_set,
+				?peer
+			);
+
+			let was_connected = {
+				let mut shared = shared.0.lock();
+				let peer_map = &mut shared.validation_peers;
+
+				let w = peer_map.remove(&peer).is_some();
+
+				metrics.on_peer_disconnected(peer_set, version);
+				metrics.note_peer_count(peer_set, version, peer_map.len());
+
+				w
+			};
+
+			notification_sinks.lock().remove(&(peer_set, peer));
+
+			if was_connected && version == peer_set.get_main_version() {
+				dispatch_validation_event_to_all(
+					NetworkBridgeEvent::PeerDisconnected(peer),
+					sender,
+					&metrics,
+				)
+				.await;
+			}
+		},
+		NotificationEvent::NotificationReceived { peer, notification } => {
+			let expected_versions = {
+				let mut versions = PerPeerSet::