Skip to content

Commit 5770ec0

Browse files
committed
Basic implementation using the new collector internal service
1 parent 5218c15 commit 5770ec0

16 files changed

+226
-68
lines changed

collector/collector.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#include <sys/wait.h>
1313

1414
#include "ConfigLoader.h"
15+
#include "SensorClient.h"
1516

1617
extern "C" {
1718
#include <assert.h>

collector/lib/CollectorService.cpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,13 @@ void CollectorService::RunForever() {
119119
server.addHandler(collector_config_inspector->kBaseRoute, collector_config_inspector.get());
120120
}
121121

122-
system_inspector_.Init(config_, conn_tracker);
122+
if (config_.grpc_channel != nullptr) {
123+
client_ = std::make_unique<SensorClient>(config_.grpc_channel);
124+
} else {
125+
client_ = std::make_unique<SensorClientStdout>();
126+
}
127+
128+
system_inspector_.Init(config_, conn_tracker, client_.get());
123129
system_inspector_.Start();
124130

125131
ControlValue cv;

collector/lib/CollectorService.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ class CollectorService {
2424
const std::atomic<int>& signum_;
2525

2626
system_inspector::Service system_inspector_;
27+
std::unique_ptr<ISensorClient> client_;
2728
};
2829

2930
bool SetupKernelDriver(CollectorService& collector, const CollectorConfig& config);

collector/lib/ProcessSignalFormatter.cpp

Lines changed: 24 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
#include <google/protobuf/util/time_util.h>
66

7+
#include "internalapi/sensor/collector_iservice.pb.h"
78
#include "internalapi/sensor/signal_iservice.pb.h"
89

910
#include "CollectorStats.h"
@@ -65,9 +66,9 @@ ProcessSignalFormatter::ProcessSignalFormatter(
6566
event_extractor_->Init(inspector);
6667
}
6768

68-
ProcessSignalFormatter::~ProcessSignalFormatter() {}
69+
ProcessSignalFormatter::~ProcessSignalFormatter() = default;
6970

70-
const SignalStreamMessage* ProcessSignalFormatter::ToProtoMessage(sinsp_evt* event) {
71+
const sensor::MsgFromCollector* ProcessSignalFormatter::ToProtoMessage(sinsp_evt* event) {
7172
if (process_signals[event->get_type()] == ProcessSignalType::UNKNOWN_PROCESS_TYPE) {
7273
return nullptr;
7374
}
@@ -80,38 +81,34 @@ const SignalStreamMessage* ProcessSignalFormatter::ToProtoMessage(sinsp_evt* eve
8081
}
8182

8283
ProcessSignal* process_signal = CreateProcessSignal(event);
83-
if (!process_signal) {
84+
if (process_signal == nullptr) {
8485
return nullptr;
8586
}
8687

87-
Signal* signal = Allocate<Signal>();
88-
signal->set_allocated_process_signal(process_signal);
89-
90-
SignalStreamMessage* signal_stream_message = AllocateRoot();
91-
signal_stream_message->clear_collector_register_request();
92-
signal_stream_message->set_allocated_signal(signal);
93-
return signal_stream_message;
88+
auto* msg = AllocateRoot();
89+
msg->clear_info();
90+
msg->clear_register_();
91+
msg->set_allocated_process_signal(process_signal);
92+
return msg;
9493
}
9594

96-
const SignalStreamMessage* ProcessSignalFormatter::ToProtoMessage(sinsp_threadinfo* tinfo) {
95+
const sensor::MsgFromCollector* ProcessSignalFormatter::ToProtoMessage(sinsp_threadinfo* tinfo) {
9796
Reset();
9897
if (!ValidateProcessDetails(tinfo)) {
9998
CLOG(INFO) << "Dropping process event: " << tinfo;
10099
return nullptr;
101100
}
102101

103-
ProcessSignal* process_signal = CreateProcessSignal(tinfo);
104-
if (!process_signal) {
102+
ProcessSignal* signal = CreateProcessSignal(tinfo);
103+
if (signal == nullptr) {
105104
return nullptr;
106105
}
107106

108-
Signal* signal = Allocate<Signal>();
109-
signal->set_allocated_process_signal(process_signal);
110-
111-
SignalStreamMessage* signal_stream_message = AllocateRoot();
112-
signal_stream_message->clear_collector_register_request();
113-
signal_stream_message->set_allocated_signal(signal);
114-
return signal_stream_message;
107+
auto* msg = AllocateRoot();
108+
msg->clear_register_();
109+
msg->clear_info();
110+
msg->set_allocated_process_signal(signal);
111+
return msg;
115112
}
116113

117114
ProcessSignal* ProcessSignalFormatter::CreateProcessSignal(sinsp_evt* event) {
@@ -173,7 +170,7 @@ ProcessSignal* ProcessSignalFormatter::CreateProcessSignal(sinsp_evt* event) {
173170
// set time
174171
auto timestamp = Allocate<Timestamp>();
175172
*timestamp = TimeUtil::NanosecondsToTimestamp(event->get_ts());
176-
signal->set_allocated_time(timestamp);
173+
signal->set_allocated_creation_time(timestamp);
177174

178175
// set container_id
179176
if (const std::string* container_id = event_extractor_->get_container_id(event)) {
@@ -238,7 +235,7 @@ ProcessSignal* ProcessSignalFormatter::CreateProcessSignal(sinsp_threadinfo* tin
238235
// set time
239236
auto timestamp = Allocate<Timestamp>();
240237
*timestamp = TimeUtil::NanosecondsToTimestamp(tinfo->m_clone_ts);
241-
signal->set_allocated_time(timestamp);
238+
signal->set_allocated_creation_time(timestamp);
242239

243240
// set container_id
244241
signal->set_container_id(tinfo->m_container_id);
@@ -315,20 +312,20 @@ void ProcessSignalFormatter::CountLineage(const std::vector<LineageInfo>& lineag
315312

316313
void ProcessSignalFormatter::GetProcessLineage(sinsp_threadinfo* tinfo,
317314
std::vector<LineageInfo>& lineage) {
318-
if (tinfo == NULL) {
315+
if (tinfo == nullptr) {
319316
return;
320317
}
321-
sinsp_threadinfo* mt = NULL;
318+
sinsp_threadinfo* mt = nullptr;
322319
if (tinfo->is_main_thread()) {
323320
mt = tinfo;
324321
} else {
325322
mt = tinfo->get_main_thread();
326-
if (mt == NULL) {
323+
if (mt == nullptr) {
327324
return;
328325
}
329326
}
330-
sinsp_threadinfo::visitor_func_t visitor = [this, &lineage](sinsp_threadinfo* pt) {
331-
if (pt == NULL) {
327+
sinsp_threadinfo::visitor_func_t visitor = [&lineage](sinsp_threadinfo* pt) {
328+
if (pt == nullptr) {
332329
return false;
333330
}
334331
if (pt->m_pid == 0) {

collector/lib/ProcessSignalFormatter.h

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,37 +6,35 @@
66
#include <gtest/gtest_prod.h>
77

88
#include "api/v1/signal.pb.h"
9-
#include "internalapi/sensor/signal_iservice.pb.h"
10-
#include "storage/process_indicator.pb.h"
9+
#include "internalapi/sensor/collector_iservice.pb.h"
1110

1211
#include "CollectorConfig.h"
13-
#include "CollectorStats.h"
1412
#include "ContainerMetadata.h"
1513
#include "EventNames.h"
1614
#include "ProtoSignalFormatter.h"
1715

1816
// forward definitions
1917
class sinsp;
2018
class sinsp_threadinfo;
21-
namespace collector {
22-
namespace system_inspector {
19+
20+
namespace collector::system_inspector {
2321
class EventExtractor;
2422
}
25-
} // namespace collector
2623

2724
namespace collector {
2825

29-
class ProcessSignalFormatter : public ProtoSignalFormatter<sensor::SignalStreamMessage> {
26+
class ProcessSignalFormatter : public ProtoSignalFormatter<sensor::MsgFromCollector> {
3027
public:
3128
ProcessSignalFormatter(sinsp* inspector, const CollectorConfig& config);
3229
~ProcessSignalFormatter();
3330

3431
using Signal = v1::Signal;
35-
using ProcessSignal = storage::ProcessSignal;
36-
using LineageInfo = storage::ProcessSignal_LineageInfo;
32+
using ProcessSignal = sensor::ProcessSignal;
33+
using LineageInfo = sensor::ProcessSignal_LineageInfo;
34+
using MsgFromCollector = sensor::MsgFromCollector;
3735

38-
const sensor::SignalStreamMessage* ToProtoMessage(sinsp_evt* event) override;
39-
const sensor::SignalStreamMessage* ToProtoMessage(sinsp_threadinfo* tinfo);
36+
const MsgFromCollector* ToProtoMessage(sinsp_evt* event) override;
37+
const MsgFromCollector* ToProtoMessage(sinsp_threadinfo* tinfo);
4038

4139
void GetProcessLineage(sinsp_threadinfo* tinfo, std::vector<LineageInfo>& lineage);
4240

collector/lib/ProcessSignalHandler.cpp

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
namespace collector {
1515

16-
std::string compute_process_key(const ::storage::ProcessSignal& s) {
16+
std::string compute_process_key(const ::sensor::ProcessSignal& s) {
1717
std::stringstream ss;
1818
ss << s.container_id() << " " << s.name() << " ";
1919
if (s.args().length() <= 256) {
@@ -39,21 +39,21 @@ bool ProcessSignalHandler::Stop() {
3939
SignalHandler::Result ProcessSignalHandler::HandleSignal(sinsp_evt* evt) {
4040
const auto* signal_msg = formatter_.ToProtoMessage(evt);
4141

42-
if (!signal_msg) {
42+
if (signal_msg == nullptr) {
4343
++(stats_->nProcessResolutionFailuresByEvt);
4444
return IGNORED;
4545
}
4646

47-
const char* name = signal_msg->signal().process_signal().name().c_str();
48-
const int pid = signal_msg->signal().process_signal().pid();
47+
const char* name = signal_msg->process_signal().name().c_str();
48+
const uint32_t pid = signal_msg->process_signal().pid();
4949
DTRACE_PROBE2(collector, process_signal_handler, name, pid);
5050

51-
if (!rate_limiter_.Allow(compute_process_key(signal_msg->signal().process_signal()))) {
51+
if (!rate_limiter_.Allow(compute_process_key(signal_msg->process_signal()))) {
5252
++(stats_->nProcessRateLimitCount);
5353
return IGNORED;
5454
}
5555

56-
auto result = client_->PushSignals(*signal_msg);
56+
auto result = client_->SendMsg(*signal_msg);
5757
if (result == SignalHandler::PROCESSED) {
5858
++(stats_->nProcessSent);
5959
} else if (result == SignalHandler::ERROR) {
@@ -65,17 +65,17 @@ SignalHandler::Result ProcessSignalHandler::HandleSignal(sinsp_evt* evt) {
6565

6666
SignalHandler::Result ProcessSignalHandler::HandleExistingProcess(sinsp_threadinfo* tinfo) {
6767
const auto* signal_msg = formatter_.ToProtoMessage(tinfo);
68-
if (!signal_msg) {
68+
if (signal_msg == nullptr) {
6969
++(stats_->nProcessResolutionFailuresByTinfo);
7070
return IGNORED;
7171
}
7272

73-
if (!rate_limiter_.Allow(compute_process_key(signal_msg->signal().process_signal()))) {
73+
if (!rate_limiter_.Allow(compute_process_key(signal_msg->process_signal()))) {
7474
++(stats_->nProcessRateLimitCount);
7575
return IGNORED;
7676
}
7777

78-
auto result = client_->PushSignals(*signal_msg);
78+
auto result = client_->SendMsg(*signal_msg);
7979
if (result == SignalHandler::PROCESSED) {
8080
++(stats_->nProcessSent);
8181
} else if (result == SignalHandler::ERROR) {

collector/lib/ProcessSignalHandler.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#include "CollectorConfig.h"
99
#include "ProcessSignalFormatter.h"
1010
#include "RateLimit.h"
11+
#include "SensorClient.h"
1112
#include "SignalHandler.h"
1213
#include "system-inspector/Service.h"
1314

@@ -22,7 +23,7 @@ class ProcessSignalHandler : public SignalHandler {
2223
public:
2324
ProcessSignalHandler(
2425
sinsp* inspector,
25-
ISignalServiceClient* client,
26+
ISensorClient* client,
2627
system_inspector::Stats* stats,
2728
const CollectorConfig& config)
2829
: client_(client),
@@ -38,7 +39,7 @@ class ProcessSignalHandler : public SignalHandler {
3839
std::vector<std::string> GetRelevantEvents() override;
3940

4041
private:
41-
ISignalServiceClient* client_;
42+
ISensorClient* client_;
4243
ProcessSignalFormatter formatter_;
4344
system_inspector::Stats* stats_;
4445
RateLimitCache rate_limiter_;

collector/lib/SensorClient.cpp

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
#include "SensorClient.h"
2+
3+
#include "GRPCUtil.h"
4+
#include "Logging.h"
5+
6+
namespace collector {
7+
bool SensorClient::EstablishGRPCStreamSingle() {
8+
std::mutex mtx;
9+
std::unique_lock<std::mutex> lock(mtx);
10+
stream_interrupted_.wait(lock, [this]() { return !stream_active_.load(std::memory_order_acquire) || thread_.should_stop(); });
11+
if (thread_.should_stop()) {
12+
return false;
13+
}
14+
15+
CLOG(INFO) << "Trying to establish GRPC stream for signals ...";
16+
17+
if (!WaitForChannelReady(channel_, [this]() { return thread_.should_stop(); })) {
18+
return false;
19+
}
20+
if (thread_.should_stop()) {
21+
return false;
22+
}
23+
24+
// stream writer
25+
context_ = std::make_unique<grpc::ClientContext>();
26+
writer_ = DuplexClient::CreateWithReadsIgnored(&sensor::CollectorService::Stub::AsyncCommunicate, channel_, context_.get());
27+
if (!writer_->WaitUntilStarted(std::chrono::seconds(30))) {
28+
CLOG(ERROR) << "Signal stream not ready after 30 seconds. Retrying ...";
29+
CLOG(ERROR) << "Error message: " << writer_->FinishNow().error_message();
30+
writer_.reset();
31+
return true;
32+
}
33+
CLOG(INFO) << "Successfully established GRPC stream for signals.";
34+
35+
first_write_ = true;
36+
stream_active_.store(true, std::memory_order_release);
37+
return true;
38+
}
39+
40+
void SensorClient::EstablishGRPCStream() {
41+
while (EstablishGRPCStreamSingle());
42+
CLOG(INFO) << "Signal service client terminating.";
43+
}
44+
45+
void SensorClient::Start() {
46+
thread_.Start([this] { EstablishGRPCStream(); });
47+
}
48+
49+
void SensorClient::Stop() {
50+
stream_interrupted_.notify_one();
51+
thread_.Stop();
52+
context_->TryCancel();
53+
context_.reset();
54+
}
55+
56+
SignalHandler::Result SensorClient::SendMsg(const sensor::MsgFromCollector& msg) {
57+
if (!stream_active_.load(std::memory_order_acquire)) {
58+
CLOG_THROTTLED(ERROR, std::chrono::seconds(10))
59+
<< "GRPC stream is not established";
60+
return SignalHandler::ERROR;
61+
}
62+
63+
if (first_write_) {
64+
first_write_ = false;
65+
return SignalHandler::NEEDS_REFRESH;
66+
}
67+
68+
if (!writer_->Write(msg)) {
69+
auto status = writer_->FinishNow();
70+
if (!status.ok()) {
71+
CLOG(ERROR) << "GRPC writes failed: (" << status.error_code() << ") " << status.error_message();
72+
}
73+
writer_.reset();
74+
75+
stream_active_.store(false, std::memory_order_release);
76+
CLOG(ERROR) << "GRPC stream interrupted";
77+
stream_interrupted_.notify_one();
78+
return SignalHandler::ERROR;
79+
}
80+
81+
return SignalHandler::PROCESSED;
82+
}
83+
} // namespace collector

0 commit comments

Comments
 (0)