Skip to content

Commit 43ff59e

Browse files
committed
Use the new internal service for network connections
This is achieved by replacing the NetworkConnectionInfoServiceComm component with the Output component. TODO: - Receiving messages from sensor is not fully implemented, but the mechanisms are already in place for it. - NetworkStatusNotifierTest is broken.
1 parent 7d26ea8 commit 43ff59e

24 files changed

+575
-528
lines changed

collector/lib/Channel.h

+58
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
#ifndef COLLECTOR_CHANNEL_H
2+
#define COLLECTOR_CHANNEL_H
3+
4+
#include <condition_variable>
5+
#include <cstddef>
6+
#include <mutex>
7+
#include <queue>
8+
9+
namespace collector {
10+
template <typename T>
11+
class Channel {
12+
public:
13+
Channel(std::size_t capacity = 0)
14+
: capacity_(capacity) {
15+
}
16+
17+
friend Channel<T>& operator<<(Channel<T>& ch, const T& data) {
18+
std::unique_lock<std::mutex> lock{ch.mutex_};
19+
if (ch.capacity_ > 0 && ch.queue_.size() >= ch.capacity_) {
20+
ch.cv_.wait(lock, [&ch]() { return ch.queue_.size() < ch.capacity_; });
21+
}
22+
23+
ch.queue_.push(data);
24+
ch.cv_.notify_one();
25+
return ch;
26+
}
27+
28+
friend Channel<T>& operator<<(Channel<T>& ch, T&& data) {
29+
std::unique_lock<std::mutex> lock{ch.mutex_};
30+
if (ch.capacity_ > 0 && ch.queue_.size() >= ch.capacity_) {
31+
ch.cv_.wait(lock, [&ch]() { return ch.queue_.size() < ch.capacity_; });
32+
}
33+
34+
ch.queue_.emplace(std::move(data));
35+
ch.cv_.notify_one();
36+
return ch;
37+
}
38+
39+
friend Channel<T>& operator>>(Channel<T>& ch, T& out) {
40+
std::unique_lock<std::mutex> lock{ch.mutex_};
41+
ch.cv_.wait(lock, [&ch]() { return ch.queue_.size() > 0; });
42+
43+
out = std::move(ch.queue_.front());
44+
ch.queue_.pop();
45+
46+
ch.cv_.notify_one();
47+
return ch;
48+
}
49+
50+
private:
51+
std::size_t capacity_;
52+
std::queue<T> queue_;
53+
std::mutex mutex_;
54+
std::condition_variable cv_;
55+
};
56+
} // namespace collector
57+
58+
#endif

collector/lib/CollectorService.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ CollectorService::CollectorService(CollectorConfig& config, std::atomic<ControlV
5252
conn_tracker_,
5353
config_,
5454
&system_inspector_,
55+
&output_,
5556
exporter_.GetRegistry().get());
5657

5758
auto network_signal_handler = std::make_unique<NetworkSignalHandler>(system_inspector_.GetInspector(), conn_tracker_, system_inspector_.GetUserspaceStats());

collector/lib/CollectorService.h

-1
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@ class CollectorService {
5353
std::shared_ptr<ConnectionTracker> conn_tracker_;
5454
std::unique_ptr<NetworkStatusNotifier> net_status_notifier_;
5555
std::shared_ptr<ProcessStore> process_store_;
56-
std::shared_ptr<NetworkConnectionInfoServiceComm> network_connection_info_service_comm_;
5756
};
5857

5958
} // namespace collector

collector/lib/NetworkConnectionInfoServiceComm.cpp

-62
This file was deleted.

collector/lib/NetworkConnectionInfoServiceComm.h

-63
This file was deleted.

collector/lib/NetworkStatusNotifier.cpp

+4-21
Original file line numberDiff line numberDiff line change
@@ -114,24 +114,10 @@ void NetworkStatusNotifier::Run() {
114114
auto next_attempt = std::chrono::system_clock::now();
115115

116116
while (thread_.PauseUntil(next_attempt)) {
117-
comm_->ResetClientContext();
118-
119-
if (!comm_->WaitForConnectionReady([this] { return thread_.should_stop(); })) {
120-
break;
121-
}
122-
123-
auto client_writer = comm_->PushNetworkConnectionInfoOpenStream([this](const sensor::NetworkFlowsControlMessage* msg) { OnRecvControlMessage(msg); });
124-
125-
RunSingle(client_writer.get());
117+
RunSingle();
126118
if (thread_.should_stop()) {
127119
return;
128120
}
129-
auto status = client_writer->Finish(std::chrono::seconds(5));
130-
if (status.ok()) {
131-
CLOG(ERROR) << "Error streaming network connection info: server hung up unexpectedly";
132-
} else {
133-
CLOG(ERROR) << "Error streaming network connection info: " << status.error_message();
134-
}
135121
next_attempt = std::chrono::system_clock::now() + std::chrono::seconds(10);
136122
}
137123

@@ -144,7 +130,6 @@ void NetworkStatusNotifier::Start() {
144130
}
145131

146132
void NetworkStatusNotifier::Stop() {
147-
comm_->TryCancel();
148133
thread_.Stop();
149134
}
150135

@@ -217,15 +202,13 @@ bool NetworkStatusNotifier::UpdateAllConnsAndEndpoints() {
217202
return true;
218203
}
219204

220-
void NetworkStatusNotifier::RunSingle(IDuplexClientWriter<sensor::NetworkConnectionInfoMessage>* writer) {
221-
WaitUntilWriterStarted(writer, 10);
222-
205+
void NetworkStatusNotifier::RunSingle() {
223206
ConnMap old_conn_state;
224207
AdvertisedEndpointMap old_cep_state;
225208
auto next_scrape = std::chrono::system_clock::now();
226209
int64_t time_at_last_scrape = NowMicros();
227210

228-
while (writer->Sleep(next_scrape)) {
211+
while (thread_.PauseUntil(next_scrape)) {
229212
CLOG(DEBUG) << "Starting network status notification";
230213
next_scrape = std::chrono::system_clock::now() + std::chrono::seconds(config_.ScrapeInterval());
231214

@@ -272,7 +255,7 @@ void NetworkStatusNotifier::RunSingle(IDuplexClientWriter<sensor::NetworkConnect
272255
}
273256

274257
WITH_TIMER(CollectorStats::net_write_message) {
275-
if (!writer->Write(*msg, next_scrape)) {
258+
if (output_->SendMsg(*msg) != SignalHandler::PROCESSED) {
276259
CLOG(ERROR) << "Failed to write network connection info";
277260
return;
278261
}

collector/lib/NetworkStatusNotifier.h

+6-16
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,11 @@
88
#include "CollectorConfig.h"
99
#include "CollectorConnectionStats.h"
1010
#include "ConnTracker.h"
11-
#include "NetworkConnectionInfoServiceComm.h"
11+
#include "DuplexGRPC.h"
1212
#include "ProcfsScraper.h"
1313
#include "ProtoAllocator.h"
1414
#include "StoppableThread.h"
15+
#include "output/Output.h"
1516

1617
namespace collector {
1718

@@ -20,11 +21,12 @@ class NetworkStatusNotifier : protected ProtoAllocator<sensor::NetworkConnection
2021
NetworkStatusNotifier(std::shared_ptr<ConnectionTracker> conn_tracker,
2122
const CollectorConfig& config,
2223
system_inspector::Service* inspector,
24+
output::Output* output,
2325
prometheus::Registry* registry)
2426
: conn_scraper_(std::make_unique<ConnScraper>(config, inspector)),
2527
conn_tracker_(std::move(conn_tracker)),
2628
config_(config),
27-
comm_(std::make_unique<NetworkConnectionInfoServiceComm>(config.grpc_channel)) {
29+
output_(output) {
2830
if (config_.EnableConnectionStats()) {
2931
connections_total_reporter_ = {{registry,
3032
"rox_connections_total",
@@ -55,18 +57,6 @@ class NetworkStatusNotifier : protected ProtoAllocator<sensor::NetworkConnection
5557
conn_scraper_ = std::move(cs);
5658
}
5759

58-
/**
59-
* Replace the communications object.
60-
*
61-
* This is meant to make testing easier by swapping in a mock object.
62-
*
63-
* @params comm A unique pointer to the new instance of communications
64-
* to use.
65-
*/
66-
void ReplaceComm(std::unique_ptr<INetworkConnectionInfoServiceComm>&& comm) {
67-
comm_ = std::move(comm);
68-
}
69-
7060
private:
7161
FRIEND_TEST(NetworkStatusNotifierTest, RateLimitedConnections);
7262

@@ -84,7 +74,7 @@ class NetworkStatusNotifier : protected ProtoAllocator<sensor::NetworkConnection
8474
void Run();
8575
void WaitUntilWriterStarted(IDuplexClientWriter<sensor::NetworkConnectionInfoMessage>* writer, int wait_time);
8676
bool UpdateAllConnsAndEndpoints();
87-
void RunSingle(IDuplexClientWriter<sensor::NetworkConnectionInfoMessage>* writer);
77+
void RunSingle();
8878
void ReceivePublicIPs(const sensor::IPAddressList& public_ips);
8979
void ReceiveIPNetworks(const sensor::IPNetworkList& networks);
9080

@@ -96,7 +86,7 @@ class NetworkStatusNotifier : protected ProtoAllocator<sensor::NetworkConnection
9686
std::shared_ptr<ConnectionTracker> conn_tracker_;
9787

9888
const CollectorConfig& config_;
99-
std::unique_ptr<INetworkConnectionInfoServiceComm> comm_;
89+
output::Output* output_;
10090

10191
std::optional<CollectorConnectionStats<unsigned int>> connections_total_reporter_;
10292
std::optional<CollectorConnectionStats<float>> connections_rate_reporter_;

collector/lib/SignalServiceClient.cpp

-53
This file was deleted.

0 commit comments

Comments
 (0)