Skip to content

Commit 51fdfaa

Browse files
committed
ROX-28981: Split methods for collector iservice
Due to a limitation on collector side with the C++ gRPC API, we have decided to split the RPC method into one per message type collector can send. This patch changes and regenerates the protobuf definitions and reworks the collector service to comply with the new definition.
1 parent 81a7b0c commit 51fdfaa

13 files changed

+145
-137
lines changed

collector/lib/CollectorOutput.cpp

+2-4
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
#include "CollectorOutput.h"
22

3-
#include "internalapi/sensor/collector_iservice.pb.h"
4-
53
#include "GRPCUtil.h"
64
#include "HostInfo.h"
75

@@ -40,7 +38,7 @@ void CollectorOutput::HandleOutputError() {
4038
stream_interrupted_.notify_one();
4139
}
4240

43-
SignalHandler::Result CollectorOutput::SensorOutput(const sensor::MsgFromCollector& msg) {
41+
SignalHandler::Result CollectorOutput::SensorOutput(const sensor::ProcessSignal& msg) {
4442
for (auto& client : sensor_clients_) {
4543
auto res = client->SendMsg(msg);
4644
switch (res) {
@@ -83,7 +81,7 @@ SignalHandler::Result CollectorOutput::SignalOutput(const sensor::SignalStreamMe
8381
SignalHandler::Result CollectorOutput::SendMsg(const MessageType& msg) {
8482
auto visitor = [this](auto&& m) {
8583
using T = std::decay_t<decltype(m)>;
86-
if constexpr (std::is_same_v<T, sensor::MsgFromCollector>) {
84+
if constexpr (std::is_same_v<T, sensor::ProcessSignal>) {
8785
return SensorOutput(m);
8886
} else if constexpr (std::is_same_v<T, sensor::SignalStreamMessage>) {
8987
return SignalOutput(m);

collector/lib/CollectorOutput.h

+2-3
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
#include <variant>
44

5-
#include "internalapi/sensor/collector_iservice.pb.h"
65
#include "internalapi/sensor/signal_iservice.pb.h"
76

87
#include "CollectorConfig.h"
@@ -13,7 +12,7 @@
1312

1413
namespace collector {
1514

16-
using MessageType = std::variant<sensor::MsgFromCollector, sensor::SignalStreamMessage>;
15+
using MessageType = std::variant<sensor::ProcessSignal, sensor::SignalStreamMessage>;
1716

1817
class CollectorOutput {
1918
public:
@@ -62,7 +61,7 @@ class CollectorOutput {
6261
bool EstablishGrpcStreamSingle();
6362

6463
void HandleOutputError();
65-
SignalHandler::Result SensorOutput(const sensor::MsgFromCollector& msg);
64+
SignalHandler::Result SensorOutput(const sensor::ProcessSignal& msg);
6665
SignalHandler::Result SignalOutput(const sensor::SignalStreamMessage& msg);
6766

6867
std::vector<std::unique_ptr<ISensorClient>> sensor_clients_;

collector/lib/ProcessSignalHandler.cpp

+3-3
Original file line numberDiff line numberDiff line change
@@ -104,9 +104,9 @@ SignalHandler::Result ProcessSignalHandler::HandleSensorSignal(sinsp_evt* evt) {
104104
return IGNORED;
105105
}
106106

107-
dtrace_probe(signal_msg->process_signal());
107+
dtrace_probe(*signal_msg);
108108

109-
if (!rate_limiter_.Allow(compute_process_key(signal_msg->process_signal()))) {
109+
if (!rate_limiter_.Allow(compute_process_key(*signal_msg))) {
110110
++(stats_->nProcessRateLimitCount);
111111
return IGNORED;
112112
}
@@ -128,7 +128,7 @@ SignalHandler::Result ProcessSignalHandler::HandleExistingProcessSensor(sinsp_th
128128
return IGNORED;
129129
}
130130

131-
if (!rate_limiter_.Allow(compute_process_key(signal_msg->process_signal()))) {
131+
if (!rate_limiter_.Allow(compute_process_key(*signal_msg))) {
132132
++(stats_->nProcessRateLimitCount);
133133
return IGNORED;
134134
}

collector/lib/SensorClient.cpp

+3-3
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
namespace collector {
66
bool SensorClient::Recreate() {
77
context_ = std::make_unique<grpc::ClientContext>();
8-
writer_ = DuplexClient::CreateWithReadsIgnored(&sensor::CollectorService::Stub::AsyncCommunicate, channel_, context_.get());
8+
writer_ = DuplexClient::CreateWithReadsIgnored(&sensor::CollectorService::Stub::AsyncPushProcesses, channel_, context_.get());
99
if (!writer_->WaitUntilStarted(std::chrono::seconds(30))) {
1010
CLOG(ERROR) << "Signal stream not ready after 30 seconds. Retrying ...";
1111
CLOG(ERROR) << "Error message: " << writer_->FinishNow().error_message();
@@ -18,14 +18,14 @@ bool SensorClient::Recreate() {
1818
return true;
1919
}
2020

21-
SignalHandler::Result SensorClient::SendMsg(const sensor::MsgFromCollector& msg) {
21+
SignalHandler::Result SensorClient::SendMsg(const sensor::ProcessSignal& msg) {
2222
if (!stream_active_.load(std::memory_order_acquire)) {
2323
CLOG_THROTTLED(ERROR, std::chrono::seconds(10))
2424
<< "GRPC stream is not established";
2525
return SignalHandler::ERROR;
2626
}
2727

28-
if (first_write_ && msg.has_process_signal()) {
28+
if (first_write_) {
2929
first_write_ = false;
3030
return SignalHandler::NEEDS_REFRESH;
3131
}

collector/lib/SensorClient.h

+4-5
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
#include <grpcpp/channel.h>
66

77
#include "internalapi/sensor/collector_iservice.grpc.pb.h"
8-
#include "internalapi/sensor/collector_iservice.pb.h"
98

109
#include "DuplexGRPC.h"
1110
#include "SignalHandler.h"
@@ -39,7 +38,7 @@ class ISensorClient {
3938
* @returns A SignalHandler::Result with the outcome of the send
4039
* operation.
4140
*/
42-
virtual SignalHandler::Result SendMsg(const sensor::MsgFromCollector& msg) = 0;
41+
virtual SignalHandler::Result SendMsg(const sensor::ProcessSignal& msg) = 0;
4342
};
4443

4544
class SensorClient : public ISensorClient {
@@ -60,7 +59,7 @@ class SensorClient : public ISensorClient {
6059

6160
bool Recreate() override;
6261

63-
SignalHandler::Result SendMsg(const sensor::MsgFromCollector& msg) override;
62+
SignalHandler::Result SendMsg(const sensor::ProcessSignal& msg) override;
6463

6564
private:
6665
std::shared_ptr<grpc::Channel> channel_;
@@ -69,15 +68,15 @@ class SensorClient : public ISensorClient {
6968

7069
// This needs to have the same lifetime as the class.
7170
std::unique_ptr<grpc::ClientContext> context_;
72-
std::unique_ptr<IDuplexClientWriter<sensor::MsgFromCollector>> writer_;
71+
std::unique_ptr<IDuplexClientWriter<sensor::ProcessSignal>> writer_;
7372

7473
bool first_write_ = false;
7574
};
7675

7776
class SensorClientStdout : public ISensorClient {
7877
bool Recreate() override { return true; }
7978

80-
SignalHandler::Result SendMsg(const sensor::MsgFromCollector& msg) override {
79+
SignalHandler::Result SendMsg(const sensor::ProcessSignal& msg) override {
8180
LogProtobufMessage(msg);
8281
return SignalHandler::PROCESSED;
8382
}

collector/lib/SensorClientFormatter.cpp

+7-25
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
#include <google/protobuf/util/time_util.h>
66

7-
#include "internalapi/sensor/collector_iservice.pb.h"
7+
#include "internalapi/sensor/collector.pb.h"
88
#include "internalapi/sensor/signal_iservice.pb.h"
99

1010
#include "CollectorStats.h"
@@ -67,7 +67,7 @@ SensorClientFormatter::SensorClientFormatter(sinsp* inspector, const CollectorCo
6767

6868
SensorClientFormatter::~SensorClientFormatter() = default;
6969

70-
const sensor::MsgFromCollector* SensorClientFormatter::ToProtoMessage(sinsp_evt* event) {
70+
const sensor::ProcessSignal* SensorClientFormatter::ToProtoMessage(sinsp_evt* event) {
7171
if (process_signals[event->get_type()] == ProcessSignalType::UNKNOWN_PROCESS_TYPE) {
7272
return nullptr;
7373
}
@@ -79,39 +79,21 @@ const sensor::MsgFromCollector* SensorClientFormatter::ToProtoMessage(sinsp_evt*
7979
return nullptr;
8080
}
8181

82-
ProcessSignal* process_signal = CreateProcessSignal(event);
83-
if (process_signal == nullptr) {
84-
return nullptr;
85-
}
86-
87-
auto* msg = AllocateRoot();
88-
msg->clear_info();
89-
msg->clear_register_();
90-
msg->set_allocated_process_signal(process_signal);
91-
return msg;
82+
return CreateProcessSignal(event);
9283
}
9384

94-
const sensor::MsgFromCollector* SensorClientFormatter::ToProtoMessage(sinsp_threadinfo* tinfo) {
85+
const sensor::ProcessSignal* SensorClientFormatter::ToProtoMessage(sinsp_threadinfo* tinfo) {
9586
Reset();
9687
if (!ValidateProcessDetails(tinfo)) {
9788
CLOG(INFO) << "Dropping process event: " << tinfo;
9889
return nullptr;
9990
}
10091

101-
ProcessSignal* signal = CreateProcessSignal(tinfo);
102-
if (signal == nullptr) {
103-
return nullptr;
104-
}
105-
106-
auto* msg = AllocateRoot();
107-
msg->clear_register_();
108-
msg->clear_info();
109-
msg->set_allocated_process_signal(signal);
110-
return msg;
92+
return CreateProcessSignal(tinfo);
11193
}
11294

11395
ProcessSignal* SensorClientFormatter::CreateProcessSignal(sinsp_evt* event) {
114-
auto signal = Allocate<ProcessSignal>();
96+
auto signal = AllocateRoot();
11597

11698
// set id
11799
signal->set_id(UUIDStr());
@@ -193,7 +175,7 @@ ProcessSignal* SensorClientFormatter::CreateProcessSignal(sinsp_evt* event) {
193175
}
194176

195177
ProcessSignal* SensorClientFormatter::CreateProcessSignal(sinsp_threadinfo* tinfo) {
196-
auto signal = Allocate<ProcessSignal>();
178+
auto signal = AllocateRoot();
197179

198180
// set id
199181
signal->set_id(UUIDStr());

collector/lib/SensorClientFormatter.h

+3-5
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
#include <gtest/gtest_prod.h>
66

77
#include "api/v1/signal.pb.h"
8-
#include "internalapi/sensor/collector_iservice.pb.h"
98

109
#include "CollectorConfig.h"
1110
#include "ContainerMetadata.h"
@@ -22,7 +21,7 @@ class EventExtractor;
2221

2322
namespace collector {
2423

25-
class SensorClientFormatter : public ProtoSignalFormatter<sensor::MsgFromCollector> {
24+
class SensorClientFormatter : public ProtoSignalFormatter<sensor::ProcessSignal> {
2625
public:
2726
SensorClientFormatter(const SensorClientFormatter&) = delete;
2827
SensorClientFormatter(SensorClientFormatter&&) = delete;
@@ -35,10 +34,9 @@ class SensorClientFormatter : public ProtoSignalFormatter<sensor::MsgFromCollect
3534
using Signal = v1::Signal;
3635
using ProcessSignal = sensor::ProcessSignal;
3736
using LineageInfo = sensor::ProcessSignal_LineageInfo;
38-
using MsgFromCollector = sensor::MsgFromCollector;
3937

40-
const MsgFromCollector* ToProtoMessage(sinsp_evt* event) override;
41-
const MsgFromCollector* ToProtoMessage(sinsp_threadinfo* tinfo) override;
38+
const ProcessSignal* ToProtoMessage(sinsp_evt* event) override;
39+
const ProcessSignal* ToProtoMessage(sinsp_threadinfo* tinfo) override;
4240

4341
/**
4442
* Get the list of processes that spawned the current one.

collector/proto/third_party/stackrox

Submodule stackrox updated 1659 files

collector/test/CollectorOutputTest.cpp

+2-4
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
#include <gmock/gmock.h>
22
#include <gtest/gtest.h>
33

4-
#include "internalapi/sensor/collector_iservice.pb.h"
5-
64
#include "CollectorOutput.h"
75
#include "SensorClient.h"
86
#include "SignalServiceClient.h"
@@ -11,7 +9,7 @@ namespace collector {
119
class MockSensorClient : public ISensorClient {
1210
public:
1311
MOCK_METHOD(bool, Recreate, ());
14-
MOCK_METHOD(SignalHandler::Result, SendMsg, (const sensor::MsgFromCollector&));
12+
MOCK_METHOD(SignalHandler::Result, SendMsg, (const sensor::ProcessSignal&));
1513
};
1614

1715
class MockSignalClient : public ISignalServiceClient {
@@ -32,7 +30,7 @@ class CollectorOutputTest : public testing::Test {
3230
};
3331

3432
TEST_F(CollectorOutputTest, SensorClient) {
35-
sensor::MsgFromCollector msg;
33+
sensor::ProcessSignal msg;
3634

3735
EXPECT_CALL(*sensor_client, SendMsg).Times(1).WillOnce(testing::Return(SignalHandler::PROCESSED));
3836

integration-tests/go.mod

+14-15
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,10 @@ require (
1616
github.com/stackrox/rox v0.0.0-20210914215712-9ac265932e28
1717
github.com/stretchr/testify v1.10.0
1818
github.com/thoas/go-funk v0.9.3
19-
go.opentelemetry.io/otel/trace v1.34.0
19+
go.opentelemetry.io/otel/trace v1.35.0
2020
golang.org/x/exp v0.0.0-20241217172543-b2144cdd0a67
21-
golang.org/x/sys v0.31.0
22-
google.golang.org/grpc v1.71.0
21+
golang.org/x/sys v0.32.0
22+
google.golang.org/grpc v1.71.1
2323
gopkg.in/yaml.v3 v3.0.1
2424
k8s.io/api v0.32.3
2525
k8s.io/apimachinery v0.32.3
@@ -46,7 +46,7 @@ require (
4646
github.com/go-logr/stdr v1.2.2 // indirect
4747
github.com/go-openapi/jsonpointer v0.21.0 // indirect
4848
github.com/go-openapi/jsonreference v0.21.0 // indirect
49-
github.com/go-openapi/swag v0.23.0 // indirect
49+
github.com/go-openapi/swag v0.23.1 // indirect
5050
github.com/gogo/protobuf v1.3.2 // indirect
5151
github.com/golang/protobuf v1.5.4 // indirect
5252
github.com/gonum/blas v0.0.0-20181208220705-f22b278b28ac // indirect
@@ -67,7 +67,6 @@ require (
6767
github.com/inconshreveable/mousetrap v1.1.0 // indirect
6868
github.com/josharian/intern v1.0.0 // indirect
6969
github.com/json-iterator/go v1.1.12 // indirect
70-
github.com/klauspost/compress v1.18.0 // indirect
7170
github.com/mailru/easyjson v0.9.0 // indirect
7271
github.com/mitchellh/go-wordwrap v1.0.1 // indirect
7372
github.com/moby/docker-image-spec v1.3.1 // indirect
@@ -81,7 +80,7 @@ require (
8180
github.com/opencontainers/image-spec v1.1.1 // indirect
8281
github.com/planetscale/vtprotobuf v0.6.1-0.20240409071808-615f978279ca // indirect
8382
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
84-
github.com/prometheus/client_golang v1.21.1 // indirect
83+
github.com/prometheus/client_golang v1.22.0 // indirect
8584
github.com/prometheus/client_model v0.6.1 // indirect
8685
github.com/prometheus/procfs v0.15.1 // indirect
8786
github.com/sirupsen/logrus v1.9.3 // indirect
@@ -93,24 +92,24 @@ require (
9392
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
9493
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.59.0 // indirect
9594
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.59.0 // indirect
96-
go.opentelemetry.io/otel v1.34.0 // indirect
95+
go.opentelemetry.io/otel v1.35.0 // indirect
9796
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.32.0 // indirect
9897
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.32.0 // indirect
9998
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.28.0 // indirect
100-
go.opentelemetry.io/otel/metric v1.34.0 // indirect
101-
go.opentelemetry.io/otel/sdk v1.34.0 // indirect
99+
go.opentelemetry.io/otel/metric v1.35.0 // indirect
100+
go.opentelemetry.io/otel/sdk v1.35.0 // indirect
102101
go.opentelemetry.io/proto/otlp v1.3.1 // indirect
103102
go.uber.org/atomic v1.11.0 // indirect
104103
go.uber.org/multierr v1.11.0 // indirect
105104
go.uber.org/zap v1.27.0 // indirect
106-
golang.org/x/net v0.37.0 // indirect
107-
golang.org/x/oauth2 v0.28.0 // indirect
108-
golang.org/x/term v0.30.0 // indirect
109-
golang.org/x/text v0.23.0 // indirect
105+
golang.org/x/net v0.39.0 // indirect
106+
golang.org/x/oauth2 v0.29.0 // indirect
107+
golang.org/x/term v0.31.0 // indirect
108+
golang.org/x/text v0.24.0 // indirect
110109
golang.org/x/time v0.11.0 // indirect
111110
google.golang.org/genproto/googleapis/api v0.0.0-20250303144028-a0af3efb3deb // indirect
112111
google.golang.org/genproto/googleapis/rpc v0.0.0-20250313205543-e70fdf4c4cb4 // indirect
113-
google.golang.org/protobuf v1.36.5 // indirect
112+
google.golang.org/protobuf v1.36.6 // indirect
114113
gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect
115114
gopkg.in/inf.v0 v0.9.1 // indirect
116115
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
@@ -129,7 +128,7 @@ replace (
129128
github.com/fullsailor/pkcs7 => github.com/stackrox/pkcs7 v0.0.0-20220914154527-cfdb0aa47179
130129
github.com/heroku/docker-registry-client => github.com/stackrox/docker-registry-client v0.0.0-20230714151239-78b1f5f70b8a
131130
github.com/operator-framework/helm-operator-plugins => github.com/stackrox/helm-operator v0.0.10-0.20220919093109-89f9785764c6
132-
github.com/stackrox/rox => github.com/stackrox/stackrox v0.0.0-20250325114232-8e68c6a26ed1
131+
github.com/stackrox/rox => github.com/stackrox/stackrox v0.0.0-20250415091607-159cf2aea233
133132
go.uber.org/zap => github.com/stackrox/zap v1.18.2-0.20240314134248-5f932edd0404
134133

135134
)

0 commit comments

Comments
 (0)