From 0edd287217ec95930743f17ed83096ef7cbffb73 Mon Sep 17 00:00:00 2001 From: Mauro Ezequiel Moltrasio Date: Wed, 23 Apr 2025 15:22:23 +0200 Subject: [PATCH] Use the new internal service for network connections This is achieved by replacing the NetworkConnectionInfoServiceComm component with the Output component. TODO: - Receiving messages from sensor is not fully implemented, but the mechanisms are already in place for it. - NetworkStatusNotifierTest is broken. --- .../workflows/integration-test-containers.yml | 1 + collector/lib/Channel.h | 123 +++++++ collector/lib/CollectorService.cpp | 19 +- collector/lib/CollectorService.h | 3 - .../lib/NetworkConnectionInfoServiceComm.cpp | 62 ---- .../lib/NetworkConnectionInfoServiceComm.h | 63 ---- collector/lib/NetworkStatusNotifier.cpp | 49 +-- collector/lib/NetworkStatusNotifier.h | 59 ++-- collector/lib/SignalServiceClient.cpp | 53 ---- collector/lib/SignalServiceClient.h | 73 ----- collector/lib/output/IClient.h | 24 +- collector/lib/output/Output.cpp | 130 ++------ collector/lib/output/Output.h | 77 +++-- collector/lib/output/SensorClient.h | 85 ----- collector/lib/output/grpc/Client.cpp | 81 +++-- collector/lib/output/grpc/Client.h | 56 +++- collector/lib/output/grpc/IGrpcClient.h | 28 ++ .../output/grpc/NetworkConnectionClient.cpp | 85 +++++ .../lib/output/grpc/NetworkConnectionClient.h | 63 ++++ collector/lib/output/grpc/ProcessClient.h | 69 ++++ .../lib/output/grpc/SignalServiceClient.h | 70 ++++ collector/lib/output/log/Client.h | 11 +- .../{ => system-inspector}/ContainerEngine.h | 1 + collector/test/ChannelTest.cpp | 139 ++++++++ collector/test/NetworkStatusNotifierTest.cpp | 300 +++++++----------- collector/test/OutputTest.cpp | 28 +- 26 files changed, 958 insertions(+), 794 deletions(-) create mode 100644 collector/lib/Channel.h delete mode 100644 collector/lib/NetworkConnectionInfoServiceComm.cpp delete mode 100644 collector/lib/NetworkConnectionInfoServiceComm.h delete mode 100644 collector/lib/SignalServiceClient.cpp delete mode 100644 collector/lib/SignalServiceClient.h delete mode 100644 collector/lib/output/SensorClient.h create mode 100644 collector/lib/output/grpc/IGrpcClient.h create mode 100644 collector/lib/output/grpc/NetworkConnectionClient.cpp create mode 100644 collector/lib/output/grpc/NetworkConnectionClient.h create mode 100644 collector/lib/output/grpc/ProcessClient.h create mode 100644 collector/lib/output/grpc/SignalServiceClient.h rename collector/lib/{ => system-inspector}/ContainerEngine.h (97%) create mode 100644 collector/test/ChannelTest.cpp diff --git a/.github/workflows/integration-test-containers.yml b/.github/workflows/integration-test-containers.yml index 1c2c3306c3..0545801a50 100644 --- a/.github/workflows/integration-test-containers.yml +++ b/.github/workflows/integration-test-containers.yml @@ -48,6 +48,7 @@ jobs: - integration-tests/images.yml - integration-tests/Dockerfile - .github/workflows/integration-test-containers.yml + - collector/proto/third_party/stackrox build-test-image: name: Build the integration test image diff --git a/collector/lib/Channel.h b/collector/lib/Channel.h new file mode 100644 index 0000000000..1aed9b2703 --- /dev/null +++ b/collector/lib/Channel.h @@ -0,0 +1,123 @@ +#ifndef COLLECTOR_CHANNEL_H +#define COLLECTOR_CHANNEL_H + +#include +#include +#include +#include +#include +#include + +namespace collector { +template +class Channel { + public: + using value_type = T; + + Channel(std::size_t capacity = 0) + : capacity_(capacity) { + } + + bool IsClosed() { + return closed_.load(); + } + + void Close() { + closed_.store(true); + cv_.notify_all(); + } + + friend Channel& operator<<(Channel& ch, const T& data) { + if (ch.IsClosed()) { + // If the channel is closed we simply drop messages + return ch; + } + + std::unique_lock lock{ch.mutex_}; + if (ch.capacity_ > 0 && ch.queue_.size() >= ch.capacity_) { + ch.cv_.wait(lock, [&ch] { return ch.queue_.size() < ch.capacity_; }); + } + + ch.queue_.push(data); + ch.cv_.notify_one(); + + return ch; + } + + friend Channel& operator<<(Channel& ch, T&& data) { + if (ch.IsClosed()) { + // If the channel is closed we simply drop messages + return ch; + } + + std::unique_lock lock{ch.mutex_}; + if (ch.capacity_ > 0 && ch.queue_.size() >= ch.capacity_) { + ch.cv_.wait(lock, [&ch] { return ch.queue_.size() < ch.capacity_; }); + } + + ch.queue_.push(std::move(data)); + ch.cv_.notify_one(); + + return ch; + } + + friend Channel& operator>>(Channel& ch, T& out) { + std::unique_lock lock{ch.mutex_}; + if (ch.IsClosed() && ch.queue_.empty()) { + return ch; + } + + ch.cv_.wait(lock, [&ch] { return !ch.queue_.empty() || ch.IsClosed(); }); + if (!ch.queue_.empty()) { + out = std::move(ch.queue_.front()); + ch.queue_.pop(); + } + + ch.cv_.notify_one(); + return ch; + } + + bool Empty() { + std::unique_lock lock{mutex_}; + return queue_.empty(); + } + + struct Iterator { + Iterator(Channel& ch) : ch_(ch) {} + + using iterator_category = std::input_iterator_tag; + using value_type = T; + using reference = T&; + using pointer = T*; + + Iterator operator++() { return *this; } + reference operator*() { + ch_ >> value_; + return value_; + } + + bool operator!=(Iterator& /*unused*/) const { + std::unique_lock lock{ch_.mutex_}; + ch_.cv_.wait(lock, [this] { return !ch_.queue_.empty() || ch_.IsClosed(); }); + + return !(ch_.IsClosed() && ch_.queue_.empty()); + } + + private: + Channel& ch_; + value_type value_; + }; + + Iterator begin() { return Iterator{*this}; } + Iterator end() { return Iterator{*this}; } + + private: + std::size_t capacity_; + std::queue queue_; + std::mutex mutex_; + std::condition_variable cv_; + std::atomic closed_{false}; +}; +} // namespace collector + +#endif diff --git a/collector/lib/CollectorService.cpp b/collector/lib/CollectorService.cpp index e24231782b..1bfb97fdc0 100644 --- a/collector/lib/CollectorService.cpp +++ b/collector/lib/CollectorService.cpp @@ -40,8 +40,7 @@ CollectorService::CollectorService(CollectorConfig& config, std::atomic(); UnorderedSet ignored_l4proto_port_pairs(config_.IgnoredL4ProtoPortPairs()); conn_tracker_->UpdateIgnoredL4ProtoPortPairs(std::move(ignored_l4proto_port_pairs)); @@ -52,6 +51,7 @@ CollectorService::CollectorService(CollectorConfig& config, std::atomic(system_inspector_.GetInspector(), conn_tracker_, system_inspector_.GetUserspaceStats()); @@ -98,6 +98,12 @@ void CollectorService::RunForever() { CLOG(INFO) << "Network scrape interval set to " << config_.ScrapeInterval() << " seconds"; + auto should_stop = [this] { + return control_->load(std::memory_order_relaxed) == STOP_COLLECTOR; + }; + + output_.WaitReady(should_stop); + if (net_status_notifier_) { net_status_notifier_->Start(); } @@ -108,8 +114,7 @@ void CollectorService::RunForever() { system_inspector_.Start(); - ControlValue cv; - while ((cv = control_->load(std::memory_order_relaxed)) != STOP_COLLECTOR) { + while (!should_stop()) { system_inspector_.Run(*control_); CLOG(DEBUG) << "Interrupted collector!"; } @@ -123,12 +128,6 @@ void CollectorService::RunForever() { CLOG(INFO) << "Shutting down collector."; } -bool CollectorService::WaitForGRPCServer() { - std::string error_str; - auto interrupt = [this] { return control_->load(std::memory_order_relaxed) == STOP_COLLECTOR; }; - return WaitForChannelReady(config_.grpc_channel, interrupt); -} - bool CollectorService::InitKernel() { auto& startup_diagnostics = StartupDiagnostics::GetInstance(); std::string cm_name(CollectionMethodName(config_.GetCollectionMethod())); diff --git a/collector/lib/CollectorService.h b/collector/lib/CollectorService.h index 2612a77e7b..61ac802678 100644 --- a/collector/lib/CollectorService.h +++ b/collector/lib/CollectorService.h @@ -30,8 +30,6 @@ class CollectorService { bool InitKernel(); private: - bool WaitForGRPCServer(); - CollectorConfig& config_; output::Output output_; system_inspector::Service system_inspector_; @@ -53,7 +51,6 @@ class CollectorService { std::shared_ptr conn_tracker_; std::unique_ptr net_status_notifier_; std::shared_ptr process_store_; - std::shared_ptr network_connection_info_service_comm_; }; } // namespace collector diff --git a/collector/lib/NetworkConnectionInfoServiceComm.cpp b/collector/lib/NetworkConnectionInfoServiceComm.cpp deleted file mode 100644 index a34447d771..0000000000 --- a/collector/lib/NetworkConnectionInfoServiceComm.cpp +++ /dev/null @@ -1,62 +0,0 @@ -#include "NetworkConnectionInfoServiceComm.h" - -#include "GRPCUtil.h" -#include "HostInfo.h" -#include "Utility.h" - -namespace collector { - -constexpr char NetworkConnectionInfoServiceComm::kHostnameMetadataKey[]; -constexpr char NetworkConnectionInfoServiceComm::kCapsMetadataKey[]; -constexpr char NetworkConnectionInfoServiceComm::kSupportedCaps[]; - -std::unique_ptr NetworkConnectionInfoServiceComm::CreateClientContext() const { - auto ctx = std::make_unique(); - ctx->AddMetadata(kHostnameMetadataKey, HostInfo::GetHostname()); - ctx->AddMetadata(kCapsMetadataKey, kSupportedCaps); - return ctx; -} - -NetworkConnectionInfoServiceComm::NetworkConnectionInfoServiceComm(std::shared_ptr channel) : channel_(std::move(channel)) { - if (channel_) { - stub_ = sensor::NetworkConnectionInfoService::NewStub(channel_); - } -} - -void NetworkConnectionInfoServiceComm::ResetClientContext() { - WITH_LOCK(context_mutex_) { - context_ = CreateClientContext(); - } -} - -bool NetworkConnectionInfoServiceComm::WaitForConnectionReady(const std::function& check_interrupted) { - if (!channel_) { - return true; - } - - return WaitForChannelReady(channel_, check_interrupted); -} - -void NetworkConnectionInfoServiceComm::TryCancel() { - WITH_LOCK(context_mutex_) { - if (context_) { - context_->TryCancel(); - } - } -} - -std::unique_ptr> NetworkConnectionInfoServiceComm::PushNetworkConnectionInfoOpenStream(std::function receive_func) { - if (!context_) { - ResetClientContext(); - } - - if (channel_) { - return DuplexClient::CreateWithReadCallback( - &sensor::NetworkConnectionInfoService::Stub::AsyncPushNetworkConnectionInfo, - channel_, context_.get(), std::move(receive_func)); - } else { - return std::make_unique>(); - } -} - -} // namespace collector diff --git a/collector/lib/NetworkConnectionInfoServiceComm.h b/collector/lib/NetworkConnectionInfoServiceComm.h deleted file mode 100644 index 0d2be286cf..0000000000 --- a/collector/lib/NetworkConnectionInfoServiceComm.h +++ /dev/null @@ -1,63 +0,0 @@ -#pragma once - -#include - -#include -#include -#include -#include -#include - -#include "internalapi/sensor/network_connection_iservice.grpc.pb.h" - -#include "DuplexGRPC.h" - -namespace collector { - -// Gathers all the communication routines targeted at NetworkConnectionInfoService. -// A simple gRPC mock is not sufficient for testing, since it doesn't abstract Streams. -class INetworkConnectionInfoServiceComm { - public: - virtual ~INetworkConnectionInfoServiceComm() {} - - virtual void ResetClientContext() = 0; - // return false on failure - virtual bool WaitForConnectionReady(const std::function& check_interrupted) = 0; - virtual void TryCancel() = 0; - - virtual sensor::NetworkConnectionInfoService::StubInterface* GetStub() = 0; - - virtual std::unique_ptr> PushNetworkConnectionInfoOpenStream(std::function receive_func) = 0; -}; - -class NetworkConnectionInfoServiceComm : public INetworkConnectionInfoServiceComm { - public: - NetworkConnectionInfoServiceComm(std::shared_ptr channel); - - void ResetClientContext() override; - bool WaitForConnectionReady(const std::function& check_interrupted) override; - void TryCancel() override; - - sensor::NetworkConnectionInfoService::StubInterface* GetStub() override { - return stub_.get(); - } - - std::unique_ptr> PushNetworkConnectionInfoOpenStream(std::function receive_func) override; - - private: - static constexpr char kHostnameMetadataKey[] = "rox-collector-hostname"; - static constexpr char kCapsMetadataKey[] = "rox-collector-capabilities"; - - // Keep this updated with all capabilities supported. Format it as a comma-separated list with NO spaces. - static constexpr char kSupportedCaps[] = "public-ips,network-graph-external-srcs"; - - std::unique_ptr CreateClientContext() const; - - std::shared_ptr channel_; - std::unique_ptr stub_; - - std::mutex context_mutex_; - std::unique_ptr context_; -}; - -} // namespace collector diff --git a/collector/lib/NetworkStatusNotifier.cpp b/collector/lib/NetworkStatusNotifier.cpp index 493c2668c2..749b73daef 100644 --- a/collector/lib/NetworkStatusNotifier.cpp +++ b/collector/lib/NetworkStatusNotifier.cpp @@ -57,15 +57,12 @@ std::vector readNetworks(const std::string& networks, Address::Family fam return ip_nets; } -void NetworkStatusNotifier::OnRecvControlMessage(const sensor::NetworkFlowsControlMessage* msg) { - if (!msg) { - return; - } - if (msg->has_ip_networks()) { - ReceivePublicIPs(msg->public_ip_addresses()); +void NetworkStatusNotifier::OnRecvControlMessage(const sensor::NetworkFlowsControlMessage& msg) { + if (msg.has_public_ip_addresses()) { + ReceivePublicIPs(msg.public_ip_addresses()); } - if (msg->has_ip_networks()) { - ReceiveIPNetworks(msg->ip_networks()); + if (msg.has_ip_networks()) { + ReceiveIPNetworks(msg.ip_networks()); } } @@ -111,28 +108,12 @@ void NetworkStatusNotifier::ReceiveIPNetworks(const sensor::IPNetworkList& netwo void NetworkStatusNotifier::Run() { Profiler::RegisterCPUThread(); - auto next_attempt = std::chrono::system_clock::now(); - - while (thread_.PauseUntil(next_attempt)) { - comm_->ResetClientContext(); - - if (!comm_->WaitForConnectionReady([this] { return thread_.should_stop(); })) { - break; - } - - auto client_writer = comm_->PushNetworkConnectionInfoOpenStream([this](const sensor::NetworkFlowsControlMessage* msg) { OnRecvControlMessage(msg); }); - RunSingle(client_writer.get()); + while (output_->WaitReady([this] { return thread_.should_stop(); })) { + RunSingle(); if (thread_.should_stop()) { return; } - auto status = client_writer->Finish(std::chrono::seconds(5)); - if (status.ok()) { - CLOG(ERROR) << "Error streaming network connection info: server hung up unexpectedly"; - } else { - CLOG(ERROR) << "Error streaming network connection info: " << status.error_message(); - } - next_attempt = std::chrono::system_clock::now() + std::chrono::seconds(10); } CLOG(INFO) << "Stopped network status notifier."; @@ -140,11 +121,12 @@ void NetworkStatusNotifier::Run() { void NetworkStatusNotifier::Start() { thread_.Start([this] { Run(); }); + receiver_.Start(); CLOG(INFO) << "Started network status notifier."; } void NetworkStatusNotifier::Stop() { - comm_->TryCancel(); + receiver_.Stop(); thread_.Stop(); } @@ -217,15 +199,18 @@ bool NetworkStatusNotifier::UpdateAllConnsAndEndpoints() { return true; } -void NetworkStatusNotifier::RunSingle(IDuplexClientWriter* writer) { - WaitUntilWriterStarted(writer, 10); - +void NetworkStatusNotifier::RunSingle() { ConnMap old_conn_state; AdvertisedEndpointMap old_cep_state; auto next_scrape = std::chrono::system_clock::now(); int64_t time_at_last_scrape = NowMicros(); - while (writer->Sleep(next_scrape)) { + while (thread_.PauseUntil(next_scrape)) { + // Check output is ready before attempting to send a network update + if (!output_->IsReady()) { + return; + } + CLOG(DEBUG) << "Starting network status notification"; next_scrape = std::chrono::system_clock::now() + std::chrono::seconds(config_.ScrapeInterval()); @@ -272,7 +257,7 @@ void NetworkStatusNotifier::RunSingle(IDuplexClientWriterWrite(*msg, next_scrape)) { + if (output_->SendMsg(*msg) != SignalHandler::PROCESSED) { CLOG(ERROR) << "Failed to write network connection info"; return; } diff --git a/collector/lib/NetworkStatusNotifier.h b/collector/lib/NetworkStatusNotifier.h index e61b57ba6d..8e30b14be8 100644 --- a/collector/lib/NetworkStatusNotifier.h +++ b/collector/lib/NetworkStatusNotifier.h @@ -8,10 +8,11 @@ #include "CollectorConfig.h" #include "CollectorConnectionStats.h" #include "ConnTracker.h" -#include "NetworkConnectionInfoServiceComm.h" +#include "DuplexGRPC.h" #include "ProcfsScraper.h" #include "ProtoAllocator.h" #include "StoppableThread.h" +#include "output/Output.h" namespace collector { @@ -20,11 +21,13 @@ class NetworkStatusNotifier : protected ProtoAllocator conn_tracker, const CollectorConfig& config, system_inspector::Service* inspector, + output::Output* output, prometheus::Registry* registry) : conn_scraper_(std::make_unique(config, inspector)), conn_tracker_(std::move(conn_tracker)), config_(config), - comm_(std::make_unique(config.grpc_channel)) { + output_(output), + receiver_(output_->GetNetworkControlChannel(), [this](const auto& msg) { OnRecvControlMessage(msg); }) { if (config_.EnableConnectionStats()) { connections_total_reporter_ = {{registry, "rox_connections_total", @@ -55,36 +58,25 @@ class NetworkStatusNotifier : protected ProtoAllocator&& comm) { - comm_ = std::move(comm); - } - private: + FRIEND_TEST(NetworkStatusNotifierTest, SimpleStartStop); FRIEND_TEST(NetworkStatusNotifierTest, RateLimitedConnections); sensor::NetworkConnectionInfoMessage* CreateInfoMessage(const ConnMap& conn_delta, const AdvertisedEndpointMap& cep_delta); void AddConnections(::google::protobuf::RepeatedPtrField* updates, const ConnMap& delta); void AddContainerEndpoints(::google::protobuf::RepeatedPtrField* updates, const AdvertisedEndpointMap& delta); - sensor::NetworkConnection* ConnToProto(const Connection& conn); + sensor::NetworkConnection* ConnToProto(const Connection& conn_proto); sensor::NetworkEndpoint* ContainerEndpointToProto(const ContainerEndpoint& cep); sensor::NetworkAddress* EndpointToProto(const Endpoint& endpoint); storage::NetworkProcessUniqueKey* ProcessToProto(const collector::IProcess& process); - void OnRecvControlMessage(const sensor::NetworkFlowsControlMessage* msg); + void OnRecvControlMessage(const sensor::NetworkFlowsControlMessage& msg); void Run(); void WaitUntilWriterStarted(IDuplexClientWriter* writer, int wait_time); bool UpdateAllConnsAndEndpoints(); - void RunSingle(IDuplexClientWriter* writer); + void RunSingle(); void ReceivePublicIPs(const sensor::IPAddressList& public_ips); void ReceiveIPNetworks(const sensor::IPNetworkList& networks); @@ -96,7 +88,38 @@ class NetworkStatusNotifier : protected ProtoAllocator conn_tracker_; const CollectorConfig& config_; - std::unique_ptr comm_; + output::Output* output_; + + class Receiver { + public: + using callback_t = std::function; + Receiver(Channel& channel, callback_t callback) + : channel_(channel), callback_(std::move(callback)) {} + + void Start() { + thread_.Start([this] { + sensor::NetworkFlowsControlMessage ctrl; + while (!thread_.should_stop()) { + for (const auto& ctrl : channel_) { + callback_(ctrl); + } + } + }); + } + + void Stop() { + channel_.Close(); + thread_.Stop(); + } + + private: + StoppableThread thread_; + Channel& channel_; + std::mutex mutex_; + std::condition_variable cv_; + + std::function callback_; + } receiver_; std::optional> connections_total_reporter_; std::optional> connections_rate_reporter_; diff --git a/collector/lib/SignalServiceClient.cpp b/collector/lib/SignalServiceClient.cpp deleted file mode 100644 index 6ae2abd8c2..0000000000 --- a/collector/lib/SignalServiceClient.cpp +++ /dev/null @@ -1,53 +0,0 @@ -#include "SignalServiceClient.h" - -#include "Logging.h" -#include "Utility.h" - -namespace collector { - -bool SignalServiceClient::Recreate() { - context_ = std::make_unique(); - writer_ = DuplexClient::CreateWithReadsIgnored(&SignalService::Stub::AsyncPushSignals, channel_, context_.get()); - if (!writer_->WaitUntilStarted(std::chrono::seconds(30))) { - CLOG(ERROR) << "Signal stream not ready after 30 seconds. Retrying ..."; - CLOG(ERROR) << "Error message: " << writer_->FinishNow().error_message(); - writer_.reset(); - return false; - } - - first_write_ = true; - stream_active_.store(true, std::memory_order_release); - return true; -} - -SignalHandler::Result SignalServiceClient::PushSignals(const SignalStreamMessage& msg) { - if (!stream_active_.load(std::memory_order_acquire)) { - CLOG_THROTTLED(ERROR, std::chrono::seconds(10)) - << "GRPC stream is not established"; - return SignalHandler::ERROR; - } - - if (first_write_) { - first_write_ = false; - return SignalHandler::NEEDS_REFRESH; - } - - if (!writer_->Write(msg)) { - auto status = writer_->FinishNow(); - if (!status.ok()) { - CLOG(ERROR) << "GRPC writes failed: " << status.error_message(); - } - writer_.reset(); - stream_active_.store(false, std::memory_order_release); - return SignalHandler::ERROR; - } - - return SignalHandler::PROCESSED; -} - -SignalHandler::Result StdoutSignalServiceClient::PushSignals(const SignalStreamMessage& msg) { - LogProtobufMessage(msg); - return SignalHandler::PROCESSED; -} - -} // namespace collector diff --git a/collector/lib/SignalServiceClient.h b/collector/lib/SignalServiceClient.h deleted file mode 100644 index 07b41e2726..0000000000 --- a/collector/lib/SignalServiceClient.h +++ /dev/null @@ -1,73 +0,0 @@ -#pragma once - -// SIGNAL_SERVICE_CLIENT.h -// This class defines our GRPC client abstraction - -#include -#include -#include - -#include "internalapi/sensor/signal_iservice.grpc.pb.h" - -#include "DuplexGRPC.h" -#include "SignalHandler.h" - -namespace collector { - -class ISignalServiceClient { - public: - using SignalStreamMessage = sensor::SignalStreamMessage; - - virtual bool Recreate() = 0; - virtual SignalHandler::Result PushSignals(const SignalStreamMessage& msg) = 0; - - virtual ~ISignalServiceClient() = default; -}; - -class SignalServiceClient : public ISignalServiceClient { - public: - using SignalService = sensor::SignalService; - using SignalStreamMessage = sensor::SignalStreamMessage; - - SignalServiceClient(const SignalServiceClient&) = delete; - SignalServiceClient(SignalServiceClient&&) = delete; - SignalServiceClient& operator=(const SignalServiceClient&) = delete; - SignalServiceClient& operator=(SignalServiceClient&&) = delete; - ~SignalServiceClient() override { - context_->TryCancel(); - } - - explicit SignalServiceClient(std::shared_ptr channel) - : channel_(std::move(channel)), stream_active_(false) {} - - bool Recreate() override; - - SignalHandler::Result PushSignals(const SignalStreamMessage& msg) override; - - private: - void EstablishGRPCStream(); - bool EstablishGRPCStreamSingle(); - - std::shared_ptr channel_; - - std::atomic stream_active_; - - // This needs to have the same lifetime as the class. - std::unique_ptr context_; - std::unique_ptr> writer_; - - bool first_write_ = false; -}; - -class StdoutSignalServiceClient : public ISignalServiceClient { - public: - using SignalStreamMessage = sensor::SignalStreamMessage; - - explicit StdoutSignalServiceClient() = default; - - bool Recreate() override { return true; } - - SignalHandler::Result PushSignals(const SignalStreamMessage& msg) override; -}; - -} // namespace collector diff --git a/collector/lib/output/IClient.h b/collector/lib/output/IClient.h index a4299d7da8..c80d9aa3f0 100644 --- a/collector/lib/output/IClient.h +++ b/collector/lib/output/IClient.h @@ -2,11 +2,17 @@ #define OUTPUT_ICLIENT_H #include "internalapi/sensor/collector_iservice.grpc.pb.h" +#include "internalapi/sensor/signal_iservice.pb.h" #include "SignalHandler.h" namespace collector::output { +using MsgToSensor = std::variant< + sensor::ProcessSignal, + sensor::SignalStreamMessage, + sensor::NetworkConnectionInfoMessage>; + class IClient { public: using Service = sensor::CollectorService; @@ -18,15 +24,6 @@ class IClient { IClient& operator=(IClient&&) = delete; virtual ~IClient() = default; - /** - * Recreate the internal state of the object to allow communication. - * - * Mostly useful for handling gRPC reconnections. - * - * @returns true if the refresh was succesful, false otherwise. - */ - virtual bool Recreate() = 0; - /** * Send a message to sensor through the iservice. * @@ -34,7 +31,14 @@ class IClient { * @returns A SignalHandler::Result with the outcome of the send * operation. */ - virtual SignalHandler::Result SendMsg(const sensor::ProcessSignal& msg) = 0; + virtual SignalHandler::Result SendMsg(const MsgToSensor& msg) = 0; + + /** + * Check if IClient is ready to send messages. + * + * @returns true if IClient is ready to send, false otherwise. + */ + virtual bool IsReady() = 0; }; } // namespace collector::output diff --git a/collector/lib/output/Output.cpp b/collector/lib/output/Output.cpp index 2c6d1db294..bd7b10bec5 100644 --- a/collector/lib/output/Output.cpp +++ b/collector/lib/output/Output.cpp @@ -1,6 +1,5 @@ #include "Output.h" -#include "GRPCUtil.h" #include "output/grpc/Client.h" #include "output/log/Client.h" @@ -9,131 +8,40 @@ namespace collector::output { Output::Output(const CollectorConfig& config) : use_sensor_client_(!config.UseLegacyServices()) { if (config.grpc_channel != nullptr) { - channel_ = config.grpc_channel; - - if (use_sensor_client_) { - auto sensor_client = std::make_unique(channel_); - sensor_clients_.emplace_back(std::move(sensor_client)); - } else { - auto signal_client = std::make_unique(channel_); - signal_clients_.emplace_back(std::move(signal_client)); - } + auto sensor_client = std::make_unique(config.grpc_channel, client_ready_, use_sensor_client_); + network_control_channel_ = &sensor_client->GetControlMessageChannel(); + clients_.emplace_back(std::move(sensor_client)); } if (config.grpc_channel == nullptr || config.UseStdout()) { - if (use_sensor_client_) { - auto sensor_client = std::make_unique(); - sensor_clients_.emplace_back(std::move(sensor_client)); - } else { - auto signal_client = std::make_unique(); - signal_clients_.emplace_back(std::move(signal_client)); - } + auto sensor_client = std::make_unique(); + clients_.emplace_back(std::move(sensor_client)); } - - thread_.Start([this] { EstablishGrpcStream(); }); } -void Output::HandleOutputError() { - CLOG(ERROR) << "GRPC stream interrupted"; - stream_active_.store(false, std::memory_order_release); - stream_interrupted_.notify_one(); -} - -SignalHandler::Result Output::SensorOutput(const sensor::ProcessSignal& msg) { - for (auto& client : sensor_clients_) { +SignalHandler::Result Output::SendMsg(const MsgToSensor& msg) { + for (auto& client : clients_) { auto res = client->SendMsg(msg); - switch (res) { - case SignalHandler::PROCESSED: - break; - - case SignalHandler::ERROR: - HandleOutputError(); - return res; - - case SignalHandler::IGNORED: - case SignalHandler::NEEDS_REFRESH: - case SignalHandler::FINISHED: - return res; + if (res != SignalHandler::PROCESSED) { + return res; } } - return SignalHandler::PROCESSED; -} - -SignalHandler::Result Output::SignalOutput(const sensor::SignalStreamMessage& msg) { - for (auto& client : signal_clients_) { - auto res = client->PushSignals(msg); - switch (res) { - case SignalHandler::PROCESSED: - break; - - case SignalHandler::ERROR: - HandleOutputError(); - return res; - case SignalHandler::IGNORED: - case SignalHandler::NEEDS_REFRESH: - case SignalHandler::FINISHED: - return res; - } - } return SignalHandler::PROCESSED; } -SignalHandler::Result Output::SendMsg(const MessageType& msg) { - auto visitor = [this](auto&& m) { - using T = std::decay_t; - if constexpr (std::is_same_v) { - return SensorOutput(m); - } else if constexpr (std::is_same_v) { - return SignalOutput(m); - } - - // Unknown type - return SignalHandler::ERROR; - }; - - return std::visit(visitor, msg); -} - -void Output::EstablishGrpcStream() { - while (EstablishGrpcStreamSingle()) { - } - CLOG(INFO) << "Service client terminating."; +bool Output::IsReady() { + return std::all_of(clients_.begin(), clients_.end(), [](const auto& client) { + return client->IsReady(); + }); } -bool Output::EstablishGrpcStreamSingle() { - std::mutex mtx; - std::unique_lock lock(mtx); - stream_interrupted_.wait(lock, [this]() { return !stream_active_.load(std::memory_order_acquire) || thread_.should_stop(); }); - if (thread_.should_stop()) { - return false; - } - - CLOG(INFO) << "Trying to establish GRPC stream..."; +bool Output::WaitReady(const std::function& predicate) { + std::unique_lock lock{wait_mutex_}; + client_ready_.wait(lock, [this, predicate] { + return IsReady() || predicate(); + }); - if (!WaitForChannelReady(channel_, [this]() { return thread_.should_stop(); })) { - return false; - } - if (thread_.should_stop()) { - return false; - } - - // Refresh all clients - bool success = true; - for (const auto& client : signal_clients_) { - success &= client->Recreate(); - } - - for (const auto& client : sensor_clients_) { - success &= client->Recreate(); - } - - if (success) { - CLOG(INFO) << "Successfully established GRPC stream."; - stream_active_.store(true, std::memory_order_release); - } else { - CLOG(WARNING) << "Failed to establish GRPC stream, retrying..."; - } - return true; + return IsReady(); } } // namespace collector::output diff --git a/collector/lib/output/Output.h b/collector/lib/output/Output.h index e30b88e79b..4285894398 100644 --- a/collector/lib/output/Output.h +++ b/collector/lib/output/Output.h @@ -1,40 +1,31 @@ #pragma once -#include - -#include "internalapi/sensor/signal_iservice.pb.h" - +#include "Channel.h" #include "CollectorConfig.h" #include "IClient.h" #include "SignalHandler.h" -#include "SignalServiceClient.h" -#include "StoppableThread.h" namespace collector::output { -using MessageType = std::variant; - class Output { public: Output(const Output&) = delete; Output(Output&&) = delete; Output& operator=(const Output&) = delete; Output& operator=(Output&&) = delete; - - Output(const CollectorConfig& config); - ~Output() { - stream_interrupted_.notify_one(); - if (thread_.running()) { - thread_.Stop(); - } + // In case we have someone waiting on the client to be ready, + // we notify them. + client_ready_.notify_all(); } + Output(const CollectorConfig& config); + // Constructor for tests - Output(std::unique_ptr&& sensor_client, - std::unique_ptr&& signal_client) { - sensor_clients_.emplace_back(std::move(sensor_client)); - signal_clients_.emplace_back(std::move(signal_client)); + Output(Channel& ch) + : network_control_channel_(&ch) {} + Output(std::unique_ptr&& sensor_client) { + clients_.emplace_back(std::move(sensor_client)); } /** @@ -46,7 +37,7 @@ class Output { * @returns A SignalHandler::Result with the outcome of the send * operation */ - SignalHandler::Result SendMsg(const MessageType& msg); + virtual SignalHandler::Result SendMsg(const MsgToSensor& msg); /** * Whether we should use the new iservice or not. @@ -56,23 +47,45 @@ class Output { */ bool UseSensorClient() const { return use_sensor_client_; } - private: - void EstablishGrpcStream(); - bool EstablishGrpcStreamSingle(); + /** + * Retrieve the channel where the network control messages from sensor + * are forwarded to. + * + * @returns A channel to retrieve control messages from. + */ + Channel& GetNetworkControlChannel() { + return *network_control_channel_; + } - void HandleOutputError(); - SignalHandler::Result SensorOutput(const sensor::ProcessSignal& msg); - SignalHandler::Result SignalOutput(const sensor::SignalStreamMessage& msg); + /** + * Check if Output is ready to send messages by querying all its + * clients. + * + * @returns true if Output is ready to send, false otherwise. + */ + bool IsReady(); - std::vector> sensor_clients_; - std::vector> signal_clients_; + /** + * Wait for all clients to be ready. + * + * This method will block until all clients are ready to send or the + * predicate returns true, whichever happens first. + * + * @param predicate A predicate for allowing unblocking in case caller + * does not want to keep waiting. + * @returns true if Output is ready, false if exited via the + * predicate. + */ + bool WaitReady(const std::function& predicate); + + private: + std::vector> clients_; + Channel* network_control_channel_ = nullptr; bool use_sensor_client_ = true; - StoppableThread thread_; - std::atomic stream_active_ = false; - std::condition_variable stream_interrupted_; - std::shared_ptr channel_; + std::mutex wait_mutex_; + std::condition_variable client_ready_; }; } // namespace collector::output diff --git a/collector/lib/output/SensorClient.h b/collector/lib/output/SensorClient.h deleted file mode 100644 index eac98b886e..0000000000 --- a/collector/lib/output/SensorClient.h +++ /dev/null @@ -1,85 +0,0 @@ -#pragma once - -#include - -#include - -#include "internalapi/sensor/collector_iservice.grpc.pb.h" - -#include "DuplexGRPC.h" -#include "SignalHandler.h" - -namespace collector::output { - -class ISensorClient { - public: - using Service = sensor::CollectorService; - - ISensorClient() = default; - ISensorClient(const ISensorClient&) = default; - ISensorClient(ISensorClient&&) = delete; - ISensorClient& operator=(const ISensorClient&) = default; - ISensorClient& operator=(ISensorClient&&) = delete; - virtual ~ISensorClient() = default; - - /** - * Recreate the internal state of the object to allow communication. - * - * Mostly useful for handling gRPC reconnections. - * - * @returns true if the refresh was succesful, false otherwise. - */ - virtual bool Recreate() = 0; - - /** - * Send a message to sensor through the iservice. - * - * @param msg The message to be sent to sensor. - * @returns A SignalHandler::Result with the outcome of the send - * operation. - */ - virtual SignalHandler::Result SendMsg(const sensor::ProcessSignal& msg) = 0; -}; - -class SensorClient : public ISensorClient { - public: - using Service = sensor::CollectorService; - - SensorClient(const SensorClient&) = delete; - SensorClient(SensorClient&&) = delete; - SensorClient& operator=(const SensorClient&) = delete; - SensorClient& operator=(SensorClient&&) = delete; - ~SensorClient() override { - context_->TryCancel(); - } - - explicit SensorClient(std::shared_ptr channel) - : channel_(std::move(channel)) { - } - - bool Recreate() override; - - SignalHandler::Result SendMsg(const sensor::ProcessSignal& msg) override; - - private: - std::shared_ptr channel_; - - std::atomic stream_active_ = false; - - // This needs to have the same lifetime as the class. - std::unique_ptr context_; - std::unique_ptr> writer_; - - bool first_write_ = false; -}; - -class SensorClientStdout : public ISensorClient { - bool Recreate() override { return true; } - - SignalHandler::Result SendMsg(const sensor::ProcessSignal& msg) override { - LogProtobufMessage(msg); - return SignalHandler::PROCESSED; - } -}; - -} // namespace collector::output diff --git a/collector/lib/output/grpc/Client.cpp b/collector/lib/output/grpc/Client.cpp index 5e584174e4..406d0e4339 100644 --- a/collector/lib/output/grpc/Client.cpp +++ b/collector/lib/output/grpc/Client.cpp @@ -1,47 +1,86 @@ #include "Client.h" +#include + +#include "GRPCUtil.h" #include "Logging.h" +#include "SignalHandler.h" namespace collector::output::grpc { + bool Client::Recreate() { - context_ = std::make_unique<::grpc::ClientContext>(); - writer_ = DuplexClient::CreateWithReadsIgnored(&sensor::CollectorService::Stub::AsyncPushProcesses, channel_, context_.get()); - if (!writer_->WaitUntilStarted(std::chrono::seconds(30))) { - CLOG(ERROR) << "Signal stream not ready after 30 seconds. Retrying ..."; - CLOG(ERROR) << "Error message: " << writer_->FinishNow().error_message(); - writer_.reset(); + if (use_sensor_client_) { + if (!process_client_->Recreate()) { + return false; + } + } else { + if (!signal_client_->Recreate()) { + return false; + } + } + + if (!network_connection_client_->Recreate()) { return false; } - first_write_ = true; stream_active_.store(true, std::memory_order_release); + client_ready_.notify_all(); return true; } -SignalHandler::Result Client::SendMsg(const sensor::ProcessSignal& msg) { +SignalHandler::Result Client::SendMsg(const MsgToSensor& msg) { if (!stream_active_.load(std::memory_order_acquire)) { CLOG_THROTTLED(ERROR, std::chrono::seconds(10)) << "GRPC stream is not established"; return SignalHandler::ERROR; } - if (first_write_) { - first_write_ = false; - return SignalHandler::NEEDS_REFRESH; + // Delegate to the proper SendMsg implementation depending on the type held. + if (const auto* m = std::get_if(&msg)) { + return process_client_->SendMsg(*m); } - auto res = writer_->Write(msg); - if (!res) { - auto status = writer_->FinishNow(); - if (!status.ok()) { - CLOG(ERROR) << "GRPC writes failed: (" << status.error_code() << ") " << status.error_message(); - } - writer_.reset(); + if (const auto* m = std::get_if(&msg)) { + return signal_client_->SendMsg(*m); + } - stream_active_.store(false, std::memory_order_release); - return SignalHandler::ERROR; + if (const auto* m = std::get_if(&msg)) { + return network_connection_client_->SendMsg(*m); } - return SignalHandler::PROCESSED; + CLOG(ERROR) << "Unknown type in MsgToSensor variant"; + return SignalHandler::Result::ERROR; +} + +void Client::EstablishGrpcStream() { + while (EstablishGrpcStreamSingle()) { + } + CLOG(INFO) << "Service client terminating."; +} + +bool Client::EstablishGrpcStreamSingle() { + std::mutex mtx; + std::unique_lock lock(mtx); + stream_interrupted_.wait(lock, [this]() { return !stream_active_.load(std::memory_order_acquire) || thread_.should_stop(); }); + if (thread_.should_stop()) { + return false; + } + + CLOG(INFO) << "Trying to establish GRPC stream..."; + + if (!WaitForChannelReady(channel_, [this]() { return thread_.should_stop(); })) { + return false; + } + if (thread_.should_stop()) { + return false; + } + + // Refresh all clients + if (Recreate()) { + CLOG(INFO) << "Successfully established GRPC stream."; + } else { + CLOG(WARNING) << "Failed to establish GRPC stream, retrying..."; + } + return true; } } // namespace collector::output::grpc diff --git a/collector/lib/output/grpc/Client.h b/collector/lib/output/grpc/Client.h index b7a8b851cd..adbbac54fc 100644 --- a/collector/lib/output/grpc/Client.h +++ b/collector/lib/output/grpc/Client.h @@ -3,13 +3,16 @@ #include -#include "DuplexGRPC.h" +#include "Channel.h" +#include "StoppableThread.h" #include "output/IClient.h" +#include "output/grpc/IGrpcClient.h" +#include "output/grpc/NetworkConnectionClient.h" +#include "output/grpc/ProcessClient.h" +#include "output/grpc/SignalServiceClient.h" namespace collector::output::grpc { -using Channel = ::grpc::Channel; - class Client : public IClient { public: using Service = sensor::CollectorService; @@ -19,27 +22,52 @@ class Client : public IClient { Client& operator=(const Client&) = delete; Client& operator=(Client&&) = delete; ~Client() override { - context_->TryCancel(); + stream_interrupted_.notify_one(); + thread_.Stop(); } - explicit Client(std::shared_ptr channel) - : channel_(std::move(channel)) { + explicit Client(std::shared_ptr channel, std::condition_variable& client_ready_cv, bool use_sensor_client) + : client_ready_(client_ready_cv), channel_(std::move(channel)), use_sensor_client_(use_sensor_client) { + if (use_sensor_client_) { + process_client_ = std::make_unique(channel_); + } else { + signal_client_ = std::make_unique(channel_); + } + network_connection_client_ = std::make_unique(channel_, use_sensor_client_); + + if (!EstablishGrpcStreamSingle()) { + CLOG(FATAL) << "Failed to establish grpc stream"; + } + + thread_.Start([this] { EstablishGrpcStream(); }); } - bool Recreate() override; + bool Recreate(); + SignalHandler::Result SendMsg(const MsgToSensor& msg) override; + bool IsReady() override { return stream_active_.load(std::memory_order_acquire); } - SignalHandler::Result SendMsg(const sensor::ProcessSignal& msg) override; + Channel& GetControlMessageChannel() { + return network_connection_client_->GetControlMessageChannel(); + } private: - std::shared_ptr channel_; + void EstablishGrpcStream(); + bool EstablishGrpcStreamSingle(); - std::atomic stream_active_ = false; + // New clients + std::unique_ptr process_client_; + std::unique_ptr network_connection_client_; + + // Legacy clients + std::unique_ptr signal_client_; - // This needs to have the same lifetime as the class. - std::unique_ptr<::grpc::ClientContext> context_; - std::unique_ptr> writer_; + std::atomic stream_active_ = false; + std::condition_variable stream_interrupted_; + std::condition_variable& client_ready_; + std::shared_ptr channel_; + StoppableThread thread_; - bool first_write_ = false; + bool use_sensor_client_; }; } // namespace collector::output::grpc diff --git a/collector/lib/output/grpc/IGrpcClient.h b/collector/lib/output/grpc/IGrpcClient.h new file mode 100644 index 0000000000..e0e71959ad --- /dev/null +++ b/collector/lib/output/grpc/IGrpcClient.h @@ -0,0 +1,28 @@ +#ifndef OUTPUT_IGRPC_CLIENT +#define OUTPUT_IGRPC_CLIENT + +#include + +namespace collector::output::grpc { +using GrpcChannel = ::grpc::Channel; +using ClientContext = ::grpc::ClientContext; + +class IGrpcClient { + public: + IGrpcClient() = default; + IGrpcClient(const IGrpcClient&) = default; + IGrpcClient(IGrpcClient&&) = delete; + IGrpcClient& operator=(const IGrpcClient&) = default; + IGrpcClient& operator=(IGrpcClient&&) = delete; + virtual ~IGrpcClient() = default; + + /** + * Recreate the internal state of the client. + * + * @returns true if the refresh was successful, false otherwise. + */ + virtual bool Recreate() = 0; +}; +} // namespace collector::output::grpc + +#endif diff --git a/collector/lib/output/grpc/NetworkConnectionClient.cpp b/collector/lib/output/grpc/NetworkConnectionClient.cpp new file mode 100644 index 0000000000..5c88d43bb8 --- /dev/null +++ b/collector/lib/output/grpc/NetworkConnectionClient.cpp @@ -0,0 +1,85 @@ +#include "NetworkConnectionClient.h" + +#include "internalapi/sensor/network_connection_iservice.grpc.pb.h" + +#include "DuplexGRPC.h" +#include "HostInfo.h" + +namespace collector::output::grpc { +bool NetworkConnectionClient::Recreate() { + context_ = NewContext(); + if (use_sensor_client_) { + writer_ = NewWriter(); + + if (!writer_->WaitUntilStarted(std::chrono::seconds(30))) { + CLOG(ERROR) << "Process stream not ready after 30 seconds. Retrying ..."; + CLOG(ERROR) << "Error message: " << writer_->FinishNow().error_message(); + writer_.reset(); + return false; + } + } else { + legacy_writer_ = NewLegacyWriter(); + + if (!legacy_writer_->WaitUntilStarted(std::chrono::seconds(30))) { + CLOG(ERROR) << "Process stream not ready after 30 seconds. Retrying ..."; + CLOG(ERROR) << "Error message: " << writer_->FinishNow().error_message(); + writer_.reset(); + return false; + } + } + + return true; +} + +SignalHandler::Result NetworkConnectionClient::SendMsg(const sensor::NetworkConnectionInfoMessage& msg) { + grpc_duplex_impl::Result res(grpc_duplex_impl::Status::OK); + + if (use_sensor_client_) { + res = writer_->Write(msg.info()); + } else { + res = legacy_writer_->Write(msg); + } + + if (!res) { + auto status = writer_->FinishNow(); + if (!status.ok()) { + CLOG(ERROR) << "GRPC writes failed: (" << status.error_code() << ") " << status.error_message(); + } + writer_.reset(); + return SignalHandler::ERROR; + } + + return SignalHandler::PROCESSED; +} + +std::unique_ptr<::grpc::ClientContext> NetworkConnectionClient::NewContext() { + auto ctx = std::make_unique<::grpc::ClientContext>(); + ctx->AddMetadata(static_cast(kHostnameMetadataKey), HostInfo::GetHostname()); + ctx->AddMetadata(static_cast(kCapsMetadataKey), static_cast(kSupportedCaps)); + return ctx; +} + +std::unique_ptr> NetworkConnectionClient::NewWriter() { + return DuplexClient::CreateWithReadCallback( + &sensor::CollectorService::Stub::AsyncPushNetworkConnectionInfo, + channel_, + context_.get(), + {[this](const auto* msg) { + // Creates a copy of the message. We are not expecting to receive + // the control message too often, so it should be fine for now. + network_control_channel_ << *msg; + }}); +} + +std::unique_ptr> NetworkConnectionClient::NewLegacyWriter() { + return DuplexClient::CreateWithReadCallback( + &sensor::NetworkConnectionInfoService::Stub::AsyncPushNetworkConnectionInfo, + channel_, + context_.get(), + {[this](const auto* msg) { + // Creates a copy of the message. We are not expecting to receive + // the control message too often, so it should be fine for now. + network_control_channel_ << *msg; + }}); +} +} // namespace collector::output::grpc diff --git a/collector/lib/output/grpc/NetworkConnectionClient.h b/collector/lib/output/grpc/NetworkConnectionClient.h new file mode 100644 index 0000000000..e634620fd9 --- /dev/null +++ b/collector/lib/output/grpc/NetworkConnectionClient.h @@ -0,0 +1,63 @@ +#ifndef OUTPUT_GRPC_NETWORK_CONNECTION_CLIENT_H +#define OUTPUT_GRPC_NETWORK_CONNECTION_CLIENT_H + +#include "internalapi/sensor/collector_iservice.grpc.pb.h" + +#include "Channel.h" +#include "DuplexGRPC.h" +#include "IGrpcClient.h" +#include "SignalHandler.h" + +namespace collector::output::grpc { + +class NetworkConnectionClient : public IGrpcClient { + public: + NetworkConnectionClient(const NetworkConnectionClient&) = delete; + NetworkConnectionClient(NetworkConnectionClient&&) = delete; + NetworkConnectionClient& operator=(const NetworkConnectionClient&) = delete; + NetworkConnectionClient& operator=(NetworkConnectionClient&&) = delete; + ~NetworkConnectionClient() override { + if (context_) { + context_->TryCancel(); + } + } + + NetworkConnectionClient(std::shared_ptr channel, bool use_sensor_client) + : channel_(std::move(channel)), + use_sensor_client_(use_sensor_client) {} + + bool Recreate() override; + + SignalHandler::Result SendMsg(const sensor::NetworkConnectionInfoMessage& msg); + + Channel& GetControlMessageChannel() { + return network_control_channel_; + } + + private: + static std::unique_ptr<::grpc::ClientContext> NewContext(); + std::unique_ptr> NewWriter(); + std::unique_ptr> NewLegacyWriter(); + + static constexpr char kHostnameMetadataKey[] = "rox-collector-hostname"; + static constexpr char kCapsMetadataKey[] = "rox-collector-capabilities"; + + // Keep this updated with all capabilities supported. Format it as a comma-separated list with NO spaces. + static constexpr char kSupportedCaps[] = "public-ips,network-graph-external-srcs"; + + std::shared_ptr channel_; + std::unique_ptr<::grpc::ClientContext> context_; + std::unique_ptr> writer_; + std::unique_ptr> legacy_writer_; + + // A channel used for receiving events from the network connection + // client. It needs to be blocking, since the read side will + // overwrite the buffer on every call. + Channel network_control_channel_; + + bool use_sensor_client_; +}; + +} // namespace collector::output::grpc + +#endif diff --git a/collector/lib/output/grpc/ProcessClient.h b/collector/lib/output/grpc/ProcessClient.h new file mode 100644 index 0000000000..3026c43358 --- /dev/null +++ b/collector/lib/output/grpc/ProcessClient.h @@ -0,0 +1,69 @@ +#ifndef OUTPUT_GRPC_PROCESS_CLIENT +#define OUTPUT_GRPC_PROCESS_CLIENT + +#include "internalapi/sensor/collector_iservice.grpc.pb.h" + +#include "DuplexGRPC.h" +#include "IGrpcClient.h" +#include "Logging.h" +#include "SignalHandler.h" + +namespace collector::output::grpc { +class ProcessClient : IGrpcClient { + public: + ProcessClient(const ProcessClient&) = delete; + ProcessClient(ProcessClient&&) = delete; + ProcessClient& operator=(const ProcessClient&) = delete; + ProcessClient& operator=(ProcessClient&&) = delete; + ~ProcessClient() override { + if (context_) { + context_->TryCancel(); + } + } + + ProcessClient(std::shared_ptr channel) + : channel_(std::move(channel)) {} + + bool Recreate() override { + context_ = std::make_unique<::grpc::ClientContext>(); + writer_ = DuplexClient::CreateWithReadsIgnored(&sensor::CollectorService::Stub::AsyncPushProcesses, channel_, context_.get()); + + if (!writer_->WaitUntilStarted(std::chrono::seconds(30))) { + CLOG(ERROR) << "Process stream not ready after 30 seconds. Retrying ..."; + CLOG(ERROR) << "Error message: " << writer_->FinishNow().error_message(); + writer_.reset(); + return false; + } + + return true; + } + + SignalHandler::Result SendMsg(const sensor::ProcessSignal& msg) { + if (first_write_) { + first_write_ = false; + return SignalHandler::NEEDS_REFRESH; + } + + auto res = writer_->Write(msg); + if (!res) { + auto status = writer_->FinishNow(); + if (!status.ok()) { + CLOG(ERROR) << "GRPC writes failed: (" << status.error_code() << ") " << status.error_message(); + } + writer_.reset(); + return SignalHandler::ERROR; + } + + return SignalHandler::PROCESSED; + } + + private: + bool first_write_ = false; + + std::shared_ptr channel_; + std::unique_ptr<::grpc::ClientContext> context_; + std::unique_ptr> writer_; +}; +} // namespace collector::output::grpc + +#endif diff --git a/collector/lib/output/grpc/SignalServiceClient.h b/collector/lib/output/grpc/SignalServiceClient.h new file mode 100644 index 0000000000..79a2c34ce9 --- /dev/null +++ b/collector/lib/output/grpc/SignalServiceClient.h @@ -0,0 +1,70 @@ +#ifndef OUTPUT_GRPC_SIGNAL_SERVICE_CLIENT +#define OUTPUT_GRPC_SIGNAL_SERVICE_CLIENT + +#include "internalapi/sensor/signal_iservice.grpc.pb.h" + +#include "DuplexGRPC.h" +#include "IGrpcClient.h" +#include "Logging.h" +#include "SignalHandler.h" + +namespace collector::output::grpc { + +class SignalServiceClient : public IGrpcClient { + public: + SignalServiceClient(const SignalServiceClient&) = delete; + SignalServiceClient(SignalServiceClient&&) = delete; + SignalServiceClient& operator=(const SignalServiceClient&) = delete; + SignalServiceClient& operator=(SignalServiceClient&&) = delete; + ~SignalServiceClient() override { + if (context_) { + context_->TryCancel(); + } + } + + SignalServiceClient(std::shared_ptr channel) + : channel_(std::move(channel)) {} + + bool Recreate() override { + context_ = std::make_unique<::grpc::ClientContext>(); + writer_ = DuplexClient::CreateWithReadsIgnored(&sensor::SignalService::Stub::AsyncPushSignals, channel_, context_.get()); + + if (!writer_->WaitUntilStarted(std::chrono::seconds(30))) { + CLOG(ERROR) << "Process stream not ready after 30 seconds. Retrying ..."; + CLOG(ERROR) << "Error message: " << writer_->FinishNow().error_message(); + writer_.reset(); + return false; + } + return true; + } + + SignalHandler::Result SendMsg(const sensor::SignalStreamMessage& msg) { + if (first_write_) { + first_write_ = false; + return SignalHandler::NEEDS_REFRESH; + } + + auto res = writer_->Write(msg); + if (!res) { + auto status = writer_->FinishNow(); + if (!status.ok()) { + CLOG(ERROR) << "GRPC writes failed: (" << status.error_code() << ") " << status.error_message(); + } + writer_.reset(); + return SignalHandler::ERROR; + } + + return SignalHandler::PROCESSED; + } + + private: + bool first_write_ = false; + + std::shared_ptr channel_; + std::unique_ptr<::grpc::ClientContext> context_; + std::unique_ptr> writer_; +}; + +} // namespace collector::output::grpc + +#endif diff --git a/collector/lib/output/log/Client.h b/collector/lib/output/log/Client.h index dac07bce4a..ff1c5b5d37 100644 --- a/collector/lib/output/log/Client.h +++ b/collector/lib/output/log/Client.h @@ -8,12 +8,15 @@ namespace collector::output::log { class Client : public IClient { - bool Recreate() override { return true; } - - SignalHandler::Result SendMsg(const sensor::ProcessSignal& msg) override { - LogProtobufMessage(msg); + SignalHandler::Result SendMsg(const MsgToSensor& msg) override { + // This works because all variants of MsgToSensor inherit from + // google::protobuf::Message, make sure it stays that way! + std::visit([](const auto& v) { LogProtobufMessage(v); }, msg); return SignalHandler::PROCESSED; } + + // Always ready to send + bool IsReady() override { return true; } }; } // namespace collector::output::log #endif diff --git a/collector/lib/ContainerEngine.h b/collector/lib/system-inspector/ContainerEngine.h similarity index 97% rename from collector/lib/ContainerEngine.h rename to collector/lib/system-inspector/ContainerEngine.h index 63978528c9..93677055bd 100644 --- a/collector/lib/ContainerEngine.h +++ b/collector/lib/system-inspector/ContainerEngine.h @@ -1,5 +1,6 @@ #pragma once +#include "Utility.h" #include "container_engine/container_cache_interface.h" #include "container_engine/container_engine_base.h" #include "threadinfo.h" diff --git a/collector/test/ChannelTest.cpp b/collector/test/ChannelTest.cpp new file mode 100644 index 0000000000..f4629a3213 --- /dev/null +++ b/collector/test/ChannelTest.cpp @@ -0,0 +1,139 @@ +#include + +#include +#include + +#include "Channel.h" + +namespace collector { +TEST(Basic, Basic) { + auto channel = Channel(); + channel << 4; + int out = 0; + channel >> out; + EXPECT_EQ(out, 4); +} + +TEST(Basic, Multiple) { + auto channel = Channel(); + channel + << 4 + << 3 + << 2 + << 1 + << 0; + for (int i = 4; i > 0; i--) { + int out = 0; + channel >> out; + EXPECT_EQ(out, i); + } +} + +TEST(Basic, Iterleaved) { + auto channel = Channel(8); + channel + << 4 + << 3; + int out = 0; + channel >> out; + EXPECT_EQ(out, 4); + channel + << 2 + << 1; + + for (int i = 3; i > 0; i--) { + channel >> out; + EXPECT_EQ(out, i); + } + + channel + << 10 + << 20; + + channel >> out; + EXPECT_EQ(out, 10); + + channel >> out; + EXPECT_EQ(out, 20); +} + +TEST(Basic, String) { + auto channel = Channel(8); + std::string should_move = "This should be moved"; + std::string should_copy = "This should be copied"; + channel + << std::move(should_move) + << should_copy; + + std::string out; + channel >> out; + EXPECT_EQ(out, "This should be moved"); + channel >> out; + EXPECT_EQ(out, "This should be copied"); +} + +TEST(Multithread, NonBlocking) { + auto channel = Channel(); + auto th = std::thread([&channel] { + for (int i = 0; i < 10; i++) { + channel << i; + } + + channel.Close(); + }); + + int j = 0; + for (auto i : channel) { + EXPECT_EQ(i, j); + j++; + } + + EXPECT_EQ(j, 10); + + th.join(); +} + +TEST(Multithread, Blocking) { + auto channel = Channel(1); + auto th = std::thread([&channel] { + for (int i = 0; i < 10; i++) { + channel << i; + } + + channel.Close(); + }); + + int j = 0; + for (auto i : channel) { + EXPECT_EQ(i, j); + j++; + } + + EXPECT_EQ(j, 10); + + th.join(); +} + +TEST(Multithread, CloseOnReader) { + auto channel = Channel(1); + auto th = std::thread([&channel] { + for (int i = 0; !channel.IsClosed(); i++) { + channel << i; + } + }); + + int j = 0; + for (auto i : channel) { + EXPECT_EQ(i, j); + j++; + + if (j == 10) { + channel.Close(); + } + } + + EXPECT_EQ(j, 10); + + th.join(); +} +} // namespace collector diff --git a/collector/test/NetworkStatusNotifierTest.cpp b/collector/test/NetworkStatusNotifierTest.cpp index d26a4426cd..9448c2271a 100644 --- a/collector/test/NetworkStatusNotifierTest.cpp +++ b/collector/test/NetworkStatusNotifierTest.cpp @@ -1,3 +1,4 @@ +#include #include #include #include @@ -5,12 +6,17 @@ #include #include "internalapi/sensor/network_connection_iservice.grpc.pb.h" +#include "internalapi/sensor/network_connection_iservice.pb.h" +#include "internalapi/sensor/network_enums.pb.h" #include "CollectorConfig.h" #include "DuplexGRPC.h" +#include "NetworkConnection.h" #include "NetworkStatusNotifier.h" #include "gmock/gmock.h" #include "gtest/gtest.h" +#include "output/IClient.h" +#include "output/grpc/IGrpcClient.h" #include "system-inspector/Service.h" namespace collector { @@ -24,52 +30,9 @@ using ::testing::Return; using ::testing::ReturnPointee; using ::testing::UnorderedElementsAre; -/* Semaphore: A subset of the semaphore feature. - We use it to wait (with a timeout) for the NetworkStatusNotifier service thread to achieve the test scenario. - (C++20 comes with a compatible class (std::counting_semaphore) and we probably want to use this once we - update the C++ version). */ -class Semaphore { - public: - Semaphore(int value = 1) : value_(value) {} - - void release() { - std::unique_lock lock(mutex_); - value_++; - cond_.notify_one(); - } - - void acquire() { - std::unique_lock lock(mutex_); - - while (value_ <= 0) { - cond_.wait(lock); - } - value_--; - } - - template - bool try_acquire_for(const std::chrono::duration& rel_time) { - auto deadline = std::chrono::steady_clock::now() + rel_time; - std::unique_lock lock(mutex_); - - while (value_ <= 0) { - if (cond_.wait_until(lock, deadline) == std::cv_status::timeout) { - return false; - } - } - value_--; - return true; - } - - private: - int value_; - std::condition_variable cond_; - std::mutex mutex_; -}; - class MockCollectorConfig : public collector::CollectorConfig { public: - MockCollectorConfig() : collector::CollectorConfig() {} + MockCollectorConfig() = default; void DisableAfterglow() { enable_afterglow_ = false; @@ -78,6 +41,10 @@ class MockCollectorConfig : public collector::CollectorConfig { void SetMaxConnectionsPerMinute(int64_t limit) { max_connections_per_minute_ = limit; } + + void SetScrapInterval(int interval) { + scrape_interval_ = interval; + } }; class MockConnScraper : public IConnScraper { @@ -85,29 +52,19 @@ class MockConnScraper : public IConnScraper { MOCK_METHOD(bool, Scrape, (std::vector * connections, std::vector* listen_endpoints), (override)); }; -class MockDuplexClientWriter : public IDuplexClientWriter { +class MockOutput : public output::Output { public: - MOCK_METHOD(grpc_duplex_impl::Result, Write, (const sensor::NetworkConnectionInfoMessage& obj, const gpr_timespec& deadline), (override)); - MOCK_METHOD(grpc_duplex_impl::Result, WriteAsync, (const sensor::NetworkConnectionInfoMessage& obj), (override)); - MOCK_METHOD(grpc_duplex_impl::Result, WaitUntilStarted, (const gpr_timespec& deadline), (override)); - MOCK_METHOD(bool, Sleep, (const gpr_timespec& deadline), (override)); - MOCK_METHOD(grpc_duplex_impl::Result, WritesDoneAsync, (), (override)); - MOCK_METHOD(grpc_duplex_impl::Result, WritesDone, (const gpr_timespec& deadline), (override)); - MOCK_METHOD(grpc_duplex_impl::Result, FinishAsync, (), (override)); - MOCK_METHOD(grpc_duplex_impl::Result, WaitUntilFinished, (const gpr_timespec& deadline), (override)); - MOCK_METHOD(grpc_duplex_impl::Result, Finish, (grpc::Status * status, const gpr_timespec& deadline), (override)); - MOCK_METHOD(grpc::Status, Finish, (const gpr_timespec& deadline), (override)); - MOCK_METHOD(void, TryCancel, (), (override)); - MOCK_METHOD(grpc_duplex_impl::Result, Shutdown, (), (override)); -}; + MockOutput(const MockOutput&) = delete; + MockOutput(MockOutput&&) = delete; + MockOutput& operator=(const MockOutput&) = delete; + MockOutput& operator=(MockOutput&&) = delete; + virtual ~MockOutput() = default; -class MockNetworkConnectionInfoServiceComm : public INetworkConnectionInfoServiceComm { - public: - MOCK_METHOD(void, ResetClientContext, (), (override)); - MOCK_METHOD(bool, WaitForConnectionReady, (const std::function& check_interrupted), (override)); - MOCK_METHOD(void, TryCancel, (), (override)); - MOCK_METHOD(sensor::NetworkConnectionInfoService::StubInterface*, GetStub, (), (override)); - MOCK_METHOD(std::unique_ptr>, PushNetworkConnectionInfoOpenStream, (std::function receive_func), (override)); + MockOutput(Channel& ch) + : Output(ch) {} + + MOCK_METHOD(SignalHandler::Result, SendMsg, (const output::MsgToSensor& msg)); + MOCK_METHOD(bool, IsReady, ()); }; /* gRPC payload objects are not strictly the ones of our internal model. @@ -143,7 +100,7 @@ class NetworkConnectionInfoMessageParser { switch (bytes_stream.length()) { case 0: - return IPNet(); + return {}; case 4: case 5: { uint32_t ip; @@ -165,10 +122,9 @@ class NetworkConnectionInfoMessageParser { if (is_net) { const char* prefix_len = bytes_stream.c_str() + (bytes_stream.length() - 1); - return IPNet(addr, *prefix_len); - } else { - return IPNet(addr); + return {addr, static_cast(*prefix_len)}; } + return IPNet(addr); } static L4Proto L4ProtocolFromProto(storage::L4Protocol proto) { @@ -185,166 +141,152 @@ class NetworkConnectionInfoMessageParser { } }; +MATCHER_P(MsgToSensorMatcher, expected, "Match elements in a MsgToSensor to a Connection") { + if (const auto* msg = std::get_if(&arg)) { + const auto connections = NetworkConnectionInfoMessageParser(*msg).get_updated_connections(); + EXPECT_THAT(connections, expected); + return true; + } + return false; +} + } // namespace class NetworkStatusNotifierTest : public testing::Test { public: NetworkStatusNotifierTest() - : inspector(config, nullptr), - conn_tracker(std::make_shared()), + : conn_tracker(std::make_shared()), conn_scraper(std::make_unique()), - comm(std::make_unique()), - net_status_notifier(conn_tracker, config, &inspector, nullptr) { + output(ch), + inspector(config, &output), + net_status_notifier(conn_tracker, config, &inspector, &output, nullptr) { } protected: MockCollectorConfig config; - system_inspector::Service inspector; std::shared_ptr conn_tracker; std::unique_ptr conn_scraper; - std::unique_ptr comm; + Channel ch; + MockOutput output; + system_inspector::Service inspector; NetworkStatusNotifier net_status_notifier; + + // Used for waiting on test done. + std::mutex mutex; + std::condition_variable cv; }; /* Simple validation that the service starts and sends at least one event */ TEST_F(NetworkStatusNotifierTest, SimpleStartStop) { - bool running = true; - Semaphore sem(0); // to wait for the service to accomplish its job. - - // the connection is always ready - EXPECT_CALL(*comm, WaitForConnectionReady).WillRepeatedly(Return(true)); - // gRPC shuts down the loop, so we will want writer->Sleep to return with false - EXPECT_CALL(*comm, TryCancel).Times(1).WillOnce([&running] { running = false; }); - - /* This is what NetworkStatusNotifier calls to create streams - receive_func is a callback that we can use to simulate messages coming from the sensor - We return an object that will get called when connections and endpoints are reported */ - EXPECT_CALL(*comm, PushNetworkConnectionInfoOpenStream) - .Times(1) - .WillOnce([&sem, &running](std::function receive_func) -> std::unique_ptr> { - auto duplex_writer = std::make_unique(); - - // the service is sending Sensor a message - EXPECT_CALL(*duplex_writer, Write).WillRepeatedly([&sem, &running](const sensor::NetworkConnectionInfoMessage& msg, const gpr_timespec& deadline) -> Result { - for (auto cnx : msg.info().updated_connections()) { - std::cout << cnx.container_id() << std::endl; - } - sem.release(); // notify that the test should end - return Result(Status::OK); - }); - EXPECT_CALL(*duplex_writer, Sleep).WillRepeatedly(ReturnPointee(&running)); - EXPECT_CALL(*duplex_writer, WaitUntilStarted).WillRepeatedly(Return(Result(Status::OK))); - - return duplex_writer; - }); + Connection conn{ + "containerId", + Endpoint{Address{10, 0, 1, 32}, 1024}, + Endpoint{Address{139, 45, 27, 4}, 999}, + L4Proto::TCP, + true}; + // Expect the connection to be normalized + Connection expected{ + "containerId", + Endpoint{Address{}, 1024}, + Endpoint{Address{255, 255, 255, 255}, 0}, + L4Proto::TCP, + true}; // Connections/Endpoints returned by the scrapper - EXPECT_CALL(*conn_scraper, Scrape).WillRepeatedly([&sem](std::vector* connections, std::vector* listen_endpoints) -> bool { + EXPECT_CALL(*conn_scraper, Scrape).WillRepeatedly([&conn](std::vector* connections, std::vector* listen_endpoints) -> bool { // this is the data which will trigger NetworkStatusNotifier to create a connection event (purpose of the test) - connections->emplace_back("containerId", Endpoint(Address(10, 0, 1, 32), 1024), Endpoint(Address(139, 45, 27, 4), 999), L4Proto::TCP, true); + connections->push_back(conn); return true; }); + EXPECT_CALL(output, SendMsg(MsgToSensorMatcher(UnorderedElementsAre(std::make_pair(expected, true))))) + .WillOnce([this](const output::MsgToSensor& a) { + cv.notify_one(); + return SignalHandler::PROCESSED; + }); + net_status_notifier.ReplaceConnScraper(std::move(conn_scraper)); - net_status_notifier.ReplaceComm(std::move(comm)); net_status_notifier.Start(); - EXPECT_TRUE(sem.try_acquire_for(std::chrono::seconds(5))); + std::unique_lock lock{mutex}; + EXPECT_EQ(cv.wait_for(lock, std::chrono::seconds(5)), std::cv_status::no_timeout); net_status_notifier.Stop(); } /* This test checks whether deltas are computed appropriately in case the "known network" list is received after a connection is already reported (and matches one of the networks). - - scrapper initialy reports a connection + - scrapper initially reports a connection - the connection is reported - we feed a "known network" into the NetworkStatusNotifier - we check that the former declared connection is deleted and redeclared as part of this network */ TEST_F(NetworkStatusNotifierTest, UpdateIPnoAfterglow) { - bool running = true; config.DisableAfterglow(); - std::function network_flows_callback; - Semaphore sem(0); // to wait for the service to accomplish its job. + config.SetScrapInterval(1); // Immediately scrape + std::function + network_flows_callback; // the connection as scrapped (public) - Connection conn1("containerId", Endpoint(Address(10, 0, 1, 32), 1024), Endpoint(Address(139, 45, 27, 4), 999), L4Proto::TCP, true); + Connection connection_scrapped{ + "containerId", + Endpoint{Address{10, 0, 1, 32}, 1024}, + Endpoint{Address{139, 45, 27, 4}, 999}, + L4Proto::TCP, + true}; // the same server connection normalized - Connection conn2("containerId", Endpoint(Address(), 1024), Endpoint(Address(255, 255, 255, 255), 0), L4Proto::TCP, true); + Connection connection_normalized{ + "containerId", + Endpoint{Address{}, 1024}, + Endpoint{Address{255, 255, 255, 255}, 0}, + L4Proto::TCP, + true}; // the same server connection normalized and grouped in a known subnet - Connection conn3("containerId", Endpoint(Address(), 1024), Endpoint(IPNet(Address(139, 45, 0, 0), 16), 0), L4Proto::TCP, true); - - // the connection is always ready - EXPECT_CALL(*comm, WaitForConnectionReady).WillRepeatedly(Return(true)); - // gRPC shuts down the loop, so we will want writer->Sleep to return with false - EXPECT_CALL(*comm, TryCancel).Times(1).WillOnce([&running] { running = false; }); - - /* This is what NetworkStatusNotifier calls to create streams - receive_func is a callback that we can use to simulate messages coming from the sensor - We return an object that will get called when connections and endpoints are reported */ - EXPECT_CALL(*comm, PushNetworkConnectionInfoOpenStream) - .Times(1) - .WillOnce([&sem, - &running, - &conn2, - &conn3, - &network_flows_callback](std::function receive_func) -> std::unique_ptr> { - auto duplex_writer = std::make_unique(); - network_flows_callback = receive_func; - - // the service is sending Sensor a message - EXPECT_CALL(*duplex_writer, Write) - .WillOnce([&conn2, &sem](const sensor::NetworkConnectionInfoMessage& msg, const gpr_timespec& deadline) -> Result { - // the connection reported by the scrapper is annouced as generic public - EXPECT_THAT(NetworkConnectionInfoMessageParser(msg).get_updated_connections(), UnorderedElementsAre(std::make_pair(conn2, true))); - return Result(Status::OK); - }) - .WillOnce([&conn2, &conn3, &sem](const sensor::NetworkConnectionInfoMessage& msg, const gpr_timespec& deadline) -> Result { - // after the network is declared, the connection switches to the new state - // conn3 appears and conn2 is destroyed - EXPECT_THAT(NetworkConnectionInfoMessageParser(msg).get_updated_connections(), UnorderedElementsAre(std::make_pair(conn3, true), std::make_pair(conn2, false))); - - // Done - sem.release(); - - return Result(Status::OK); - }) - .WillRepeatedly(Return(Result(Status::OK))); - - EXPECT_CALL(*duplex_writer, Sleep) - .WillOnce(ReturnPointee(&running)) // first time, we let the scrapper do its job - .WillOnce([&running, &network_flows_callback](const gpr_timespec& deadline) { - // The connection is known now, let's declare a "known network" - sensor::NetworkFlowsControlMessage msg; - unsigned char content[] = {139, 45, 0, 0, 16}; // address in network order, plus prefix length - std::string network((char*)content, sizeof(content)); - - auto ip_networks = msg.mutable_ip_networks(); - ip_networks->set_ipv4_networks(network); - - network_flows_callback(&msg); - return running; - }) - .WillRepeatedly(ReturnPointee(&running)); - - EXPECT_CALL(*duplex_writer, WaitUntilStarted).WillRepeatedly(Return(Result(Status::OK))); - - return duplex_writer; - }); + Connection connection_grouped{ + "containerId", + Endpoint{Address{}, 1024}, + Endpoint{IPNet{Address{139, 45, 0, 0}, 16}, 0}, + L4Proto::TCP, + true}; - // Connections/Endpoints returned by the scrapper (first detection of the connection) - EXPECT_CALL(*conn_scraper, Scrape).WillRepeatedly([&conn1](std::vector* connections, std::vector* listen_endpoints) -> bool { - connections->emplace_back(conn1); + // Connections/Endpoints returned by the scrapper + EXPECT_CALL(*conn_scraper, Scrape).WillRepeatedly([&connection_scrapped](std::vector* connections, std::vector* listen_endpoints) -> bool { + // this is the data which will trigger NetworkStatusNotifier to create a connection event (purpose of the test) + connections->push_back(connection_scrapped); return true; }); + EXPECT_CALL(output, SendMsg(MsgToSensorMatcher(testing::AnyOf( + UnorderedElementsAre(std::make_pair(connection_normalized, true)), + UnorderedElementsAre( + std::make_pair(connection_normalized, false), + std::make_pair(connection_grouped, true)))))) + .Times(2) + .WillOnce([this](const output::MsgToSensor& a) { + // Feed a message from Sensor to group the IP in + sensor::NetworkFlowsControlMessage msg; + std::array content{139, 45, 0, 0, 16}; // address in network order, plus prefix length + std::string network(reinterpret_cast(content.data()), content.size()); + + auto* ip_networks = msg.mutable_ip_networks(); + ip_networks->set_ipv4_networks(network); + + ch << msg; + + return SignalHandler::PROCESSED; + }) + .WillOnce([this](const output::MsgToSensor& a) { + // Done + cv.notify_one(); + return SignalHandler::PROCESSED; + }); + net_status_notifier.ReplaceConnScraper(std::move(conn_scraper)); - net_status_notifier.ReplaceComm(std::move(comm)); net_status_notifier.Start(); - // Wait for the first scrape to occur - EXPECT_TRUE(sem.try_acquire_for(std::chrono::seconds(5))); + std::unique_lock lock{mutex}; + EXPECT_EQ(cv.wait_for(lock, std::chrono::seconds(5)), std::cv_status::no_timeout); net_status_notifier.Stop(); } diff --git a/collector/test/OutputTest.cpp b/collector/test/OutputTest.cpp index 4f7a184557..b58ce56993 100644 --- a/collector/test/OutputTest.cpp +++ b/collector/test/OutputTest.cpp @@ -1,7 +1,6 @@ #include #include -#include "SignalServiceClient.h" #include "output/IClient.h" #include "output/Output.h" @@ -9,24 +8,17 @@ namespace collector::output { class MockSensorClient : public IClient { public: MOCK_METHOD(bool, Recreate, ()); - MOCK_METHOD(SignalHandler::Result, SendMsg, (const sensor::ProcessSignal&)); -}; - -class MockSignalClient : public ISignalServiceClient { - public: - MOCK_METHOD(bool, Recreate, ()); - MOCK_METHOD(SignalHandler::Result, PushSignals, (const sensor::SignalStreamMessage&)); + MOCK_METHOD(SignalHandler::Result, SendMsg, (const MsgToSensor&)); + MOCK_METHOD(bool, IsReady, ()); }; class CollectorOutputTest : public testing::Test { public: CollectorOutputTest() - : sensor_client(std::make_unique()), - signal_client(std::make_unique()) {} + : sensor_client(std::make_unique()) {} protected: std::unique_ptr sensor_client; - std::unique_ptr signal_client; }; TEST_F(CollectorOutputTest, SensorClient) { @@ -34,19 +26,7 @@ TEST_F(CollectorOutputTest, SensorClient) { EXPECT_CALL(*sensor_client, SendMsg).Times(1).WillOnce(testing::Return(SignalHandler::PROCESSED)); - Output output{std::move(sensor_client), std::move(signal_client)}; - auto result = output.SendMsg(msg); - - EXPECT_EQ(result, SignalHandler::PROCESSED); -} - -TEST_F(CollectorOutputTest, SignalClient) { - sensor::SignalStreamMessage msg; - - EXPECT_CALL(*signal_client, PushSignals).Times(1).WillOnce(testing::Return(SignalHandler::PROCESSED)); - - Output output{std::move(sensor_client), std::move(signal_client)}; - + Output output{std::move(sensor_client)}; auto result = output.SendMsg(msg); EXPECT_EQ(result, SignalHandler::PROCESSED);