1
- #include " CollectorOutput .h"
1
+ #include " Output .h"
2
2
3
3
#include " GRPCUtil.h"
4
4
#include " HostInfo.h"
5
5
6
- namespace collector {
6
+ namespace collector ::output {
7
7
8
- CollectorOutput::CollectorOutput (const CollectorConfig& config)
8
+ Output::Output (const CollectorConfig& config)
9
9
: use_sensor_client_(!config.UseLegacyServices()) {
10
10
if (config.grpc_channel != nullptr ) {
11
11
channel_ = config.grpc_channel ;
@@ -32,13 +32,13 @@ CollectorOutput::CollectorOutput(const CollectorConfig& config)
32
32
thread_.Start ([this ] { EstablishGrpcStream (); });
33
33
}
34
34
35
- void CollectorOutput ::HandleOutputError () {
35
+ void Output ::HandleOutputError () {
36
36
CLOG (ERROR) << " GRPC stream interrupted" ;
37
37
stream_active_.store (false , std::memory_order_release);
38
38
stream_interrupted_.notify_one ();
39
39
}
40
40
41
- SignalHandler::Result CollectorOutput ::SensorOutput (const sensor::ProcessSignal& msg) {
41
+ SignalHandler::Result Output ::SensorOutput (const sensor::ProcessSignal& msg) {
42
42
for (auto & client : sensor_clients_) {
43
43
auto res = client->SendMsg (msg);
44
44
switch (res) {
@@ -58,7 +58,7 @@ SignalHandler::Result CollectorOutput::SensorOutput(const sensor::ProcessSignal&
58
58
return SignalHandler::PROCESSED;
59
59
}
60
60
61
- SignalHandler::Result CollectorOutput ::SignalOutput (const sensor::SignalStreamMessage& msg) {
61
+ SignalHandler::Result Output ::SignalOutput (const sensor::SignalStreamMessage& msg) {
62
62
for (auto & client : signal_clients_) {
63
63
auto res = client->PushSignals (msg);
64
64
switch (res) {
@@ -78,7 +78,7 @@ SignalHandler::Result CollectorOutput::SignalOutput(const sensor::SignalStreamMe
78
78
return SignalHandler::PROCESSED;
79
79
}
80
80
81
- SignalHandler::Result CollectorOutput ::SendMsg (const MessageType& msg) {
81
+ SignalHandler::Result Output ::SendMsg (const MessageType& msg) {
82
82
auto visitor = [this ](auto && m) {
83
83
using T = std::decay_t <decltype (m)>;
84
84
if constexpr (std::is_same_v<T, sensor::ProcessSignal>) {
@@ -94,13 +94,13 @@ SignalHandler::Result CollectorOutput::SendMsg(const MessageType& msg) {
94
94
return std::visit (visitor, msg);
95
95
}
96
96
97
- void CollectorOutput ::EstablishGrpcStream () {
97
+ void Output ::EstablishGrpcStream () {
98
98
while (EstablishGrpcStreamSingle ()) {
99
99
}
100
100
CLOG (INFO) << " Service client terminating." ;
101
101
}
102
102
103
- bool CollectorOutput ::EstablishGrpcStreamSingle () {
103
+ bool Output ::EstablishGrpcStreamSingle () {
104
104
std::mutex mtx;
105
105
std::unique_lock<std::mutex> lock (mtx);
106
106
stream_interrupted_.wait (lock, [this ]() { return !stream_active_.load (std::memory_order_acquire) || thread_.should_stop (); });
@@ -135,4 +135,4 @@ bool CollectorOutput::EstablishGrpcStreamSingle() {
135
135
}
136
136
return true ;
137
137
}
138
- } // namespace collector
138
+ } // namespace collector::output
0 commit comments