Skip to content

Commit 080c664

Browse files
committed
Use the new internal service for network connections
This is achieved by replacing the NetworkConnectionInfoServiceComm component with the Output component. TODO: - Receiving messages from sensor is not fully implemented, but the mechanisms are already in place for it. - NetworkStatusNotifierTest is broken.
1 parent 7d26ea8 commit 080c664

25 files changed

+679
-621
lines changed

collector/lib/Channel.h

+78
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
#ifndef COLLECTOR_CHANNEL_H
2+
#define COLLECTOR_CHANNEL_H
3+
4+
#include <condition_variable>
5+
#include <cstddef>
6+
#include <mutex>
7+
#include <optional>
8+
#include <queue>
9+
10+
namespace collector {
11+
template <typename T>
12+
class ChannelImpl {
13+
public:
14+
ChannelImpl(std::size_t capacity = 0)
15+
: capacity_(capacity) {
16+
}
17+
18+
ChannelImpl& Send(const T& data) {
19+
std::unique_lock<std::mutex> lock{mutex_};
20+
if (capacity_ > 0 && queue_.size() >= capacity_) {
21+
cv_.wait(lock, [this] { return queue_.size() < capacity_; });
22+
}
23+
24+
queue_.push(data);
25+
cv_.notify_one();
26+
27+
return *this;
28+
}
29+
30+
ChannelImpl& Send(T&& data) {
31+
std::unique_lock<std::mutex> lock{mutex_};
32+
if (capacity_ > 0 && queue_.size() >= capacity_) {
33+
cv_.wait(lock, [this] { return queue_.size() < capacity_; });
34+
}
35+
36+
queue_.push(std::move(data));
37+
cv_.notify_one();
38+
39+
return *this;
40+
}
41+
42+
T Recv() {
43+
std::unique_lock<std::mutex> lock{mutex_};
44+
cv_.wait(lock, [this] { return queue_.size() > 0; });
45+
46+
T out = std::move(queue_.front());
47+
queue_.pop();
48+
49+
cv_.notify_one();
50+
return out;
51+
}
52+
53+
std::optional<T> TryRecv() {
54+
std::unique_lock<std::mutex> lock{mutex_};
55+
if (queue_.empty()) {
56+
return {};
57+
}
58+
59+
return Recv();
60+
}
61+
62+
bool Empty() {
63+
std::unique_lock<std::mutex> lock{mutex_};
64+
return queue_.empty();
65+
}
66+
67+
private:
68+
std::size_t capacity_;
69+
std::queue<T> queue_;
70+
std::mutex mutex_;
71+
std::condition_variable cv_;
72+
};
73+
74+
template <typename T>
75+
using Channel = std::shared_ptr<ChannelImpl<T>>;
76+
} // namespace collector
77+
78+
#endif

collector/lib/CollectorService.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ CollectorService::CollectorService(CollectorConfig& config, std::atomic<ControlV
5252
conn_tracker_,
5353
config_,
5454
&system_inspector_,
55+
&output_,
5556
exporter_.GetRegistry().get());
5657

5758
auto network_signal_handler = std::make_unique<NetworkSignalHandler>(system_inspector_.GetInspector(), conn_tracker_, system_inspector_.GetUserspaceStats());

collector/lib/CollectorService.h

-1
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@ class CollectorService {
5353
std::shared_ptr<ConnectionTracker> conn_tracker_;
5454
std::unique_ptr<NetworkStatusNotifier> net_status_notifier_;
5555
std::shared_ptr<ProcessStore> process_store_;
56-
std::shared_ptr<NetworkConnectionInfoServiceComm> network_connection_info_service_comm_;
5756
};
5857

5958
} // namespace collector

collector/lib/NetworkConnectionInfoServiceComm.cpp

-62
This file was deleted.

collector/lib/NetworkConnectionInfoServiceComm.h

-63
This file was deleted.

collector/lib/NetworkStatusNotifier.cpp

+11-29
Original file line numberDiff line numberDiff line change
@@ -57,15 +57,12 @@ std::vector<IPNet> readNetworks(const std::string& networks, Address::Family fam
5757
return ip_nets;
5858
}
5959

60-
void NetworkStatusNotifier::OnRecvControlMessage(const sensor::NetworkFlowsControlMessage* msg) {
61-
if (!msg) {
62-
return;
63-
}
64-
if (msg->has_ip_networks()) {
65-
ReceivePublicIPs(msg->public_ip_addresses());
60+
void NetworkStatusNotifier::OnRecvControlMessage(const sensor::NetworkFlowsControlMessage& msg) {
61+
if (msg.has_ip_networks()) {
62+
ReceivePublicIPs(msg.public_ip_addresses());
6663
}
67-
if (msg->has_ip_networks()) {
68-
ReceiveIPNetworks(msg->ip_networks());
64+
if (msg.has_ip_networks()) {
65+
ReceiveIPNetworks(msg.ip_networks());
6966
}
7067
}
7168

@@ -114,24 +111,10 @@ void NetworkStatusNotifier::Run() {
114111
auto next_attempt = std::chrono::system_clock::now();
115112

116113
while (thread_.PauseUntil(next_attempt)) {
117-
comm_->ResetClientContext();
118-
119-
if (!comm_->WaitForConnectionReady([this] { return thread_.should_stop(); })) {
120-
break;
121-
}
122-
123-
auto client_writer = comm_->PushNetworkConnectionInfoOpenStream([this](const sensor::NetworkFlowsControlMessage* msg) { OnRecvControlMessage(msg); });
124-
125-
RunSingle(client_writer.get());
114+
RunSingle();
126115
if (thread_.should_stop()) {
127116
return;
128117
}
129-
auto status = client_writer->Finish(std::chrono::seconds(5));
130-
if (status.ok()) {
131-
CLOG(ERROR) << "Error streaming network connection info: server hung up unexpectedly";
132-
} else {
133-
CLOG(ERROR) << "Error streaming network connection info: " << status.error_message();
134-
}
135118
next_attempt = std::chrono::system_clock::now() + std::chrono::seconds(10);
136119
}
137120

@@ -140,12 +123,13 @@ void NetworkStatusNotifier::Run() {
140123

141124
void NetworkStatusNotifier::Start() {
142125
thread_.Start([this] { Run(); });
126+
receiver_.Start();
143127
CLOG(INFO) << "Started network status notifier.";
144128
}
145129

146130
void NetworkStatusNotifier::Stop() {
147-
comm_->TryCancel();
148131
thread_.Stop();
132+
receiver_.Stop();
149133
}
150134

151135
void NetworkStatusNotifier::WaitUntilWriterStarted(IDuplexClientWriter<sensor::NetworkConnectionInfoMessage>* writer, int wait_time_seconds) {
@@ -217,15 +201,13 @@ bool NetworkStatusNotifier::UpdateAllConnsAndEndpoints() {
217201
return true;
218202
}
219203

220-
void NetworkStatusNotifier::RunSingle(IDuplexClientWriter<sensor::NetworkConnectionInfoMessage>* writer) {
221-
WaitUntilWriterStarted(writer, 10);
222-
204+
void NetworkStatusNotifier::RunSingle() {
223205
ConnMap old_conn_state;
224206
AdvertisedEndpointMap old_cep_state;
225207
auto next_scrape = std::chrono::system_clock::now();
226208
int64_t time_at_last_scrape = NowMicros();
227209

228-
while (writer->Sleep(next_scrape)) {
210+
while (thread_.PauseUntil(next_scrape)) {
229211
CLOG(DEBUG) << "Starting network status notification";
230212
next_scrape = std::chrono::system_clock::now() + std::chrono::seconds(config_.ScrapeInterval());
231213

@@ -272,7 +254,7 @@ void NetworkStatusNotifier::RunSingle(IDuplexClientWriter<sensor::NetworkConnect
272254
}
273255

274256
WITH_TIMER(CollectorStats::net_write_message) {
275-
if (!writer->Write(*msg, next_scrape)) {
257+
if (output_->SendMsg(*msg) != SignalHandler::PROCESSED) {
276258
CLOG(ERROR) << "Failed to write network connection info";
277259
return;
278260
}

0 commit comments

Comments
 (0)