Skip to content

Commit 7d26ea8

Browse files
committed
Rename and reorganize output namespace
Simplify naming for classes in the output namespace. Create dedicated namespaces for gprc and log implementations of IClient.
1 parent f78cff9 commit 7d26ea8

12 files changed

+162
-53
lines changed

collector/lib/CMakeLists.txt

+1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ file(GLOB COLLECTOR_LIB_SRC_FILES
22
${CMAKE_CURRENT_SOURCE_DIR}/*.cpp
33
${CMAKE_CURRENT_SOURCE_DIR}/system-inspector/*.cpp
44
${CMAKE_CURRENT_SOURCE_DIR}/output/*.cpp
5+
${CMAKE_CURRENT_SOURCE_DIR}/output/*/*.cpp
56
)
67

78
add_library(collector_lib ${DRIVER_HEADERS} ${COLLECTOR_LIB_SRC_FILES})

collector/lib/ProcessSignalHandler.h

+2-2
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
#include "ProcessSignalFormatter.h"
77
#include "RateLimit.h"
88
#include "SignalHandler.h"
9-
#include "output/SensorClientFormatter.h"
9+
#include "output/Formatter.h"
1010
#include "system-inspector/Service.h"
1111

1212
// forward declarations
@@ -50,7 +50,7 @@ class ProcessSignalHandler : public SignalHandler {
5050

5151
output::Output* client_;
5252
ProcessSignalFormatter signal_formatter_;
53-
output::SensorClientFormatter sensor_formatter_;
53+
output::Formatter sensor_formatter_;
5454
system_inspector::Stats* stats_;
5555
RateLimitCache rate_limiter_;
5656
};

collector/lib/output/SensorClientFormatter.cpp renamed to collector/lib/output/Formatter.cpp

+15-15
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
#include "SensorClientFormatter.h"
1+
#include "Formatter.h"
22

33
#include <uuid/uuid.h>
44

@@ -16,9 +16,9 @@
1616
namespace collector::output {
1717

1818
using SignalStreamMessage = sensor::SignalStreamMessage;
19-
using Signal = SensorClientFormatter::Signal;
20-
using ProcessSignal = SensorClientFormatter::ProcessSignal;
21-
using LineageInfo = SensorClientFormatter::LineageInfo;
19+
using Signal = Formatter::Signal;
20+
using ProcessSignal = Formatter::ProcessSignal;
21+
using LineageInfo = Formatter::LineageInfo;
2222

2323
using Timestamp = google::protobuf::Timestamp;
2424
using TimeUtil = google::protobuf::util::TimeUtil;
@@ -57,17 +57,17 @@ std::string extract_proc_args(sinsp_threadinfo* tinfo) {
5757

5858
} // namespace
5959

60-
SensorClientFormatter::SensorClientFormatter(sinsp* inspector, const CollectorConfig& config)
60+
Formatter::Formatter(sinsp* inspector, const CollectorConfig& config)
6161
: event_names_(EventNames::GetInstance()),
6262
event_extractor_(std::make_unique<system_inspector::EventExtractor>()),
6363
container_metadata_(inspector),
6464
config_(config) {
6565
event_extractor_->Init(inspector);
6666
}
6767

68-
SensorClientFormatter::~SensorClientFormatter() = default;
68+
Formatter::~Formatter() = default;
6969

70-
const sensor::ProcessSignal* SensorClientFormatter::ToProtoMessage(sinsp_evt* event) {
70+
const sensor::ProcessSignal* Formatter::ToProtoMessage(sinsp_evt* event) {
7171
if (process_signals[event->get_type()] == ProcessSignalType::UNKNOWN_PROCESS_TYPE) {
7272
return nullptr;
7373
}
@@ -82,7 +82,7 @@ const sensor::ProcessSignal* SensorClientFormatter::ToProtoMessage(sinsp_evt* ev
8282
return CreateProcessSignal(event);
8383
}
8484

85-
const sensor::ProcessSignal* SensorClientFormatter::ToProtoMessage(sinsp_threadinfo* tinfo) {
85+
const sensor::ProcessSignal* Formatter::ToProtoMessage(sinsp_threadinfo* tinfo) {
8686
Reset();
8787
if (!ValidateProcessDetails(tinfo)) {
8888
CLOG(INFO) << "Dropping process event: " << tinfo;
@@ -92,7 +92,7 @@ const sensor::ProcessSignal* SensorClientFormatter::ToProtoMessage(sinsp_threadi
9292
return CreateProcessSignal(tinfo);
9393
}
9494

95-
ProcessSignal* SensorClientFormatter::CreateProcessSignal(sinsp_evt* event) {
95+
ProcessSignal* Formatter::CreateProcessSignal(sinsp_evt* event) {
9696
auto signal = AllocateRoot();
9797

9898
// set id
@@ -174,7 +174,7 @@ ProcessSignal* SensorClientFormatter::CreateProcessSignal(sinsp_evt* event) {
174174
return signal;
175175
}
176176

177-
ProcessSignal* SensorClientFormatter::CreateProcessSignal(sinsp_threadinfo* tinfo) {
177+
ProcessSignal* Formatter::CreateProcessSignal(sinsp_threadinfo* tinfo) {
178178
auto signal = AllocateRoot();
179179

180180
// set id
@@ -237,7 +237,7 @@ ProcessSignal* SensorClientFormatter::CreateProcessSignal(sinsp_threadinfo* tinf
237237
return signal;
238238
}
239239

240-
std::string SensorClientFormatter::ToString(sinsp_evt* event) {
240+
std::string Formatter::ToString(sinsp_evt* event) {
241241
std::stringstream ss;
242242
const std::string* path = event_extractor_->get_exepath(event);
243243
const std::string* name = event_extractor_->get_comm(event);
@@ -254,7 +254,7 @@ std::string SensorClientFormatter::ToString(sinsp_evt* event) {
254254
return ss.str();
255255
}
256256

257-
bool SensorClientFormatter::ValidateProcessDetails(const sinsp_threadinfo* tinfo) {
257+
bool Formatter::ValidateProcessDetails(const sinsp_threadinfo* tinfo) {
258258
if (tinfo == nullptr) {
259259
return false;
260260
}
@@ -266,11 +266,11 @@ bool SensorClientFormatter::ValidateProcessDetails(const sinsp_threadinfo* tinfo
266266
return true;
267267
}
268268

269-
bool SensorClientFormatter::ValidateProcessDetails(sinsp_evt* event) {
269+
bool Formatter::ValidateProcessDetails(sinsp_evt* event) {
270270
return ValidateProcessDetails(event->get_thread_info());
271271
}
272272

273-
void SensorClientFormatter::UpdateLineageStats(const std::vector<LineageInfo>& lineage) {
273+
void Formatter::UpdateLineageStats(const std::vector<LineageInfo>& lineage) {
274274
int string_total = std::accumulate(lineage.cbegin(), lineage.cend(), 0, [](int acc, const auto& l) {
275275
return acc + l.parent_exec_file_path().size();
276276
});
@@ -281,7 +281,7 @@ void SensorClientFormatter::UpdateLineageStats(const std::vector<LineageInfo>& l
281281
COUNTER_ADD(CollectorStats::process_lineage_string_total, string_total);
282282
}
283283

284-
std::vector<LineageInfo> SensorClientFormatter::GetProcessLineage(sinsp_threadinfo* tinfo) {
284+
std::vector<LineageInfo> Formatter::GetProcessLineage(sinsp_threadinfo* tinfo) {
285285
std::vector<LineageInfo> lineage;
286286
if (tinfo == nullptr) {
287287
return lineage;

collector/lib/output/SensorClientFormatter.h renamed to collector/lib/output/Formatter.h

+7-7
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,15 @@ class EventExtractor;
2121

2222
namespace collector::output {
2323

24-
class SensorClientFormatter : public ProtoSignalFormatter<sensor::ProcessSignal> {
24+
class Formatter : public ProtoSignalFormatter<sensor::ProcessSignal> {
2525
public:
26-
SensorClientFormatter(const SensorClientFormatter&) = delete;
27-
SensorClientFormatter(SensorClientFormatter&&) = delete;
28-
SensorClientFormatter& operator=(const SensorClientFormatter&) = delete;
29-
SensorClientFormatter& operator=(SensorClientFormatter&&) = delete;
30-
virtual ~SensorClientFormatter();
26+
Formatter(const Formatter&) = delete;
27+
Formatter(Formatter&&) = delete;
28+
Formatter& operator=(const Formatter&) = delete;
29+
Formatter& operator=(Formatter&&) = delete;
30+
virtual ~Formatter();
3131

32-
SensorClientFormatter(sinsp* inspector, const CollectorConfig& config);
32+
Formatter(sinsp* inspector, const CollectorConfig& config);
3333

3434
using Signal = v1::Signal;
3535
using ProcessSignal = sensor::ProcessSignal;

collector/lib/output/IClient.h

+42
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
#ifndef OUTPUT_ICLIENT_H
2+
#define OUTPUT_ICLIENT_H
3+
4+
#include "internalapi/sensor/collector_iservice.grpc.pb.h"
5+
6+
#include "SignalHandler.h"
7+
8+
namespace collector::output {
9+
10+
class IClient {
11+
public:
12+
using Service = sensor::CollectorService;
13+
14+
IClient() = default;
15+
IClient(const IClient&) = default;
16+
IClient(IClient&&) = delete;
17+
IClient& operator=(const IClient&) = default;
18+
IClient& operator=(IClient&&) = delete;
19+
virtual ~IClient() = default;
20+
21+
/**
22+
* Recreate the internal state of the object to allow communication.
23+
*
24+
* Mostly useful for handling gRPC reconnections.
25+
*
26+
* @returns true if the refresh was succesful, false otherwise.
27+
*/
28+
virtual bool Recreate() = 0;
29+
30+
/**
31+
* Send a message to sensor through the iservice.
32+
*
33+
* @param msg The message to be sent to sensor.
34+
* @returns A SignalHandler::Result with the outcome of the send
35+
* operation.
36+
*/
37+
virtual SignalHandler::Result SendMsg(const sensor::ProcessSignal& msg) = 0;
38+
};
39+
40+
} // namespace collector::output
41+
42+
#endif // OUTPUT_ICLIENT_H

collector/lib/output/Output.cpp

+4-3
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
#include "Output.h"
22

33
#include "GRPCUtil.h"
4-
#include "HostInfo.h"
4+
#include "output/grpc/Client.h"
5+
#include "output/log/Client.h"
56

67
namespace collector::output {
78

@@ -11,7 +12,7 @@ Output::Output(const CollectorConfig& config)
1112
channel_ = config.grpc_channel;
1213

1314
if (use_sensor_client_) {
14-
auto sensor_client = std::make_unique<SensorClient>(channel_);
15+
auto sensor_client = std::make_unique<grpc::Client>(channel_);
1516
sensor_clients_.emplace_back(std::move(sensor_client));
1617
} else {
1718
auto signal_client = std::make_unique<SignalServiceClient>(channel_);
@@ -21,7 +22,7 @@ Output::Output(const CollectorConfig& config)
2122

2223
if (config.grpc_channel == nullptr || config.UseStdout()) {
2324
if (use_sensor_client_) {
24-
auto sensor_client = std::make_unique<SensorClientStdout>();
25+
auto sensor_client = std::make_unique<log::Client>();
2526
sensor_clients_.emplace_back(std::move(sensor_client));
2627
} else {
2728
auto signal_client = std::make_unique<StdoutSignalServiceClient>();

collector/lib/output/Output.h

+3-3
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
#include "internalapi/sensor/signal_iservice.pb.h"
66

77
#include "CollectorConfig.h"
8-
#include "SensorClient.h"
8+
#include "IClient.h"
99
#include "SignalHandler.h"
1010
#include "SignalServiceClient.h"
1111
#include "StoppableThread.h"
@@ -31,7 +31,7 @@ class Output {
3131
}
3232

3333
// Constructor for tests
34-
Output(std::unique_ptr<ISensorClient>&& sensor_client,
34+
Output(std::unique_ptr<IClient>&& sensor_client,
3535
std::unique_ptr<ISignalServiceClient>&& signal_client) {
3636
sensor_clients_.emplace_back(std::move(sensor_client));
3737
signal_clients_.emplace_back(std::move(signal_client));
@@ -64,7 +64,7 @@ class Output {
6464
SignalHandler::Result SensorOutput(const sensor::ProcessSignal& msg);
6565
SignalHandler::Result SignalOutput(const sensor::SignalStreamMessage& msg);
6666

67-
std::vector<std::unique_ptr<ISensorClient>> sensor_clients_;
67+
std::vector<std::unique_ptr<IClient>> sensor_clients_;
6868
std::vector<std::unique_ptr<ISignalServiceClient>> signal_clients_;
6969

7070
bool use_sensor_client_ = true;

collector/lib/output/SensorClient.cpp renamed to collector/lib/output/grpc/Client.cpp

+6-6
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
1-
#include "SensorClient.h"
1+
#include "Client.h"
22

33
#include "Logging.h"
44

5-
namespace collector::output {
6-
bool SensorClient::Recreate() {
7-
context_ = std::make_unique<grpc::ClientContext>();
5+
namespace collector::output::grpc {
6+
bool Client::Recreate() {
7+
context_ = std::make_unique<::grpc::ClientContext>();
88
writer_ = DuplexClient::CreateWithReadsIgnored(&sensor::CollectorService::Stub::AsyncPushProcesses, channel_, context_.get());
99
if (!writer_->WaitUntilStarted(std::chrono::seconds(30))) {
1010
CLOG(ERROR) << "Signal stream not ready after 30 seconds. Retrying ...";
@@ -18,7 +18,7 @@ bool SensorClient::Recreate() {
1818
return true;
1919
}
2020

21-
SignalHandler::Result SensorClient::SendMsg(const sensor::ProcessSignal& msg) {
21+
SignalHandler::Result Client::SendMsg(const sensor::ProcessSignal& msg) {
2222
if (!stream_active_.load(std::memory_order_acquire)) {
2323
CLOG_THROTTLED(ERROR, std::chrono::seconds(10))
2424
<< "GRPC stream is not established";
@@ -44,4 +44,4 @@ SignalHandler::Result SensorClient::SendMsg(const sensor::ProcessSignal& msg) {
4444

4545
return SignalHandler::PROCESSED;
4646
}
47-
} // namespace collector::output
47+
} // namespace collector::output::grpc

collector/lib/output/grpc/Client.h

+46
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
#ifndef OUTPUT_GRPC_CLIENT
2+
#define OUTPUT_GRPC_CLIENT
3+
4+
#include <grpcpp/channel.h>
5+
6+
#include "DuplexGRPC.h"
7+
#include "output/IClient.h"
8+
9+
namespace collector::output::grpc {
10+
11+
using Channel = ::grpc::Channel;
12+
13+
class Client : public IClient {
14+
public:
15+
using Service = sensor::CollectorService;
16+
17+
Client(const Client&) = delete;
18+
Client(Client&&) = delete;
19+
Client& operator=(const Client&) = delete;
20+
Client& operator=(Client&&) = delete;
21+
~Client() override {
22+
context_->TryCancel();
23+
}
24+
25+
explicit Client(std::shared_ptr<Channel> channel)
26+
: channel_(std::move(channel)) {
27+
}
28+
29+
bool Recreate() override;
30+
31+
SignalHandler::Result SendMsg(const sensor::ProcessSignal& msg) override;
32+
33+
private:
34+
std::shared_ptr<Channel> channel_;
35+
36+
std::atomic<bool> stream_active_ = false;
37+
38+
// This needs to have the same lifetime as the class.
39+
std::unique_ptr<::grpc::ClientContext> context_;
40+
std::unique_ptr<IDuplexClientWriter<sensor::ProcessSignal>> writer_;
41+
42+
bool first_write_ = false;
43+
};
44+
} // namespace collector::output::grpc
45+
46+
#endif

collector/lib/output/log/Client.h

+19
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
#ifndef OUTPUT_LOG_CLIENT_H
2+
#define OUTPUT_LOG_CLIENT_H
3+
4+
#include <google/protobuf/message.h>
5+
6+
#include "Utility.h"
7+
#include "output/IClient.h"
8+
9+
namespace collector::output::log {
10+
class Client : public IClient {
11+
bool Recreate() override { return true; }
12+
13+
SignalHandler::Result SendMsg(const sensor::ProcessSignal& msg) override {
14+
LogProtobufMessage(msg);
15+
return SignalHandler::PROCESSED;
16+
}
17+
};
18+
} // namespace collector::output::log
19+
#endif

0 commit comments

Comments
 (0)