Skip to content

Use the new collector internal service for process information #2063

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions collector/lib/CollectorConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ PathEnvVar tls_client_cert_path("ROX_COLLECTOR_TLS_CLIENT_CERT");
PathEnvVar tls_client_key_path("ROX_COLLECTOR_TLS_CLIENT_KEY");

BoolEnvVar disable_process_arguments("ROX_COLLECTOR_NO_PROCESS_ARGUMENTS", false);

BoolEnvVar use_stdout_output("ROX_COLLECTOR_USE_STDOUT", false);

BoolEnvVar use_legacy_services("ROX_COLLECTOR_USE_LEGACY_SERVICES", false);
} // namespace

constexpr bool CollectorConfig::kTurnOffScrape;
Expand Down Expand Up @@ -113,6 +117,8 @@ void CollectorConfig::InitCollectorConfig(CollectorArgs* args) {
enable_introspection_ = enable_introspection.value();
track_send_recv_ = track_send_recv.value();
disable_process_arguments_ = disable_process_arguments.value();
use_stdout_ = use_stdout_output.value();
use_legacy_services_ = use_legacy_services.value();

for (const auto& syscall : kSyscalls) {
syscalls_.emplace_back(syscall);
Expand Down
9 changes: 9 additions & 0 deletions collector/lib/CollectorConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <shared_mutex>
#include <vector>

#include <gtest/gtest_prod.h>
#include <json/json.h>
#include <yaml-cpp/yaml.h>

Expand Down Expand Up @@ -165,6 +166,8 @@ class CollectorConfig {
unsigned int GetSinspTotalBufferSize() const { return sinsp_total_buffer_size_; }
unsigned int GetSinspThreadCacheSize() const { return sinsp_thread_cache_size_; }
bool DisableProcessArguments() const { return disable_process_arguments_; }
bool UseStdout() const { return use_stdout_; }
bool UseLegacyServices() const { return use_legacy_services_; }

static std::pair<option::ArgStatus, std::string> CheckConfiguration(const char* config, Json::Value* root);

Expand Down Expand Up @@ -194,6 +197,8 @@ class CollectorConfig {
}

protected:
FRIEND_TEST(SensorClientFormatterTest, NoProcessArguments);

int scrape_interval_;
CollectionMethod collection_method_;
bool turn_off_scrape_;
Expand Down Expand Up @@ -230,6 +235,10 @@ class CollectorConfig {

bool disable_process_arguments_ = false;

bool use_stdout_ = false;

bool use_legacy_services_ = false;

// One ring buffer will be initialized for this many CPUs
unsigned int sinsp_cpu_per_buffer_ = 0;
// Size of one ring buffer, in bytes.
Expand Down
140 changes: 140 additions & 0 deletions collector/lib/CollectorOutput.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
#include "CollectorOutput.h"

#include "internalapi/sensor/collector_iservice.pb.h"

#include "GRPCUtil.h"
#include "HostInfo.h"

namespace collector {

CollectorOutput::CollectorOutput(const CollectorConfig& config)
: use_sensor_client_(!config.UseLegacyServices()) {
if (config.grpc_channel != nullptr) {
channel_ = config.grpc_channel;

if (use_sensor_client_) {
auto sensor_client = std::make_unique<SensorClient>(channel_);
sensor_clients_.emplace_back(std::move(sensor_client));
} else {
auto signal_client = std::make_unique<SignalServiceClient>(channel_);
signal_clients_.emplace_back(std::move(signal_client));
}
}

if (config.grpc_channel == nullptr || config.UseStdout()) {
if (use_sensor_client_) {
auto sensor_client = std::make_unique<SensorClientStdout>();
sensor_clients_.emplace_back(std::move(sensor_client));
} else {
auto signal_client = std::make_unique<StdoutSignalServiceClient>();
signal_clients_.emplace_back(std::move(signal_client));
}
}

thread_.Start([this] { EstablishGrpcStream(); });
}

void CollectorOutput::HandleOutputError() {
CLOG(ERROR) << "GRPC stream interrupted";
stream_active_.store(false, std::memory_order_release);
stream_interrupted_.notify_one();
}

SignalHandler::Result CollectorOutput::SensorOutput(const sensor::MsgFromCollector& msg) {
for (auto& client : sensor_clients_) {
auto res = client->SendMsg(msg);
switch (res) {
case SignalHandler::PROCESSED:
break;

case SignalHandler::ERROR:
HandleOutputError();
return res;

case SignalHandler::IGNORED:
case SignalHandler::NEEDS_REFRESH:
case SignalHandler::FINISHED:
return res;
}
}
return SignalHandler::PROCESSED;
}

SignalHandler::Result CollectorOutput::SignalOutput(const sensor::SignalStreamMessage& msg) {
for (auto& client : signal_clients_) {
auto res = client->PushSignals(msg);
switch (res) {
case SignalHandler::PROCESSED:
break;

case SignalHandler::ERROR:
HandleOutputError();
return res;

case SignalHandler::IGNORED:
case SignalHandler::NEEDS_REFRESH:
case SignalHandler::FINISHED:
return res;
}
}
return SignalHandler::PROCESSED;
}

SignalHandler::Result CollectorOutput::SendMsg(const MessageType& msg) {
auto visitor = [this](auto&& m) {
using T = std::decay_t<decltype(m)>;
if constexpr (std::is_same_v<T, sensor::MsgFromCollector>) {
return SensorOutput(m);
} else if constexpr (std::is_same_v<T, sensor::SignalStreamMessage>) {
return SignalOutput(m);
}

// Unknown type
return SignalHandler::ERROR;
};

return std::visit(visitor, msg);
}

void CollectorOutput::EstablishGrpcStream() {
while (EstablishGrpcStreamSingle()) {
}
CLOG(INFO) << "Service client terminating.";
}

bool CollectorOutput::EstablishGrpcStreamSingle() {
std::mutex mtx;
std::unique_lock<std::mutex> lock(mtx);
stream_interrupted_.wait(lock, [this]() { return !stream_active_.load(std::memory_order_acquire) || thread_.should_stop(); });
if (thread_.should_stop()) {
return false;
}

CLOG(INFO) << "Trying to establish GRPC stream...";

if (!WaitForChannelReady(channel_, [this]() { return thread_.should_stop(); })) {
return false;
}
if (thread_.should_stop()) {
return false;
}

// Refresh all clients
bool success = true;
for (const auto& client : signal_clients_) {
success &= client->Recreate();
}

for (const auto& client : sensor_clients_) {
success &= client->Recreate();
}

if (success) {
CLOG(INFO) << "Successfully established GRPC stream.";
stream_active_.store(true, std::memory_order_release);
} else {
CLOG(WARNING) << "Failed to establish GRPC stream, retrying...";
}
return true;
}
} // namespace collector
79 changes: 79 additions & 0 deletions collector/lib/CollectorOutput.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
#pragma once

#include <variant>

#include "internalapi/sensor/collector_iservice.pb.h"
#include "internalapi/sensor/signal_iservice.pb.h"

#include "CollectorConfig.h"
#include "SensorClient.h"
#include "SignalHandler.h"
#include "SignalServiceClient.h"
#include "StoppableThread.h"

namespace collector {

using MessageType = std::variant<sensor::MsgFromCollector, sensor::SignalStreamMessage>;

class CollectorOutput {
public:
CollectorOutput(const CollectorOutput&) = delete;
CollectorOutput(CollectorOutput&&) = delete;
CollectorOutput& operator=(const CollectorOutput&) = delete;
CollectorOutput& operator=(CollectorOutput&&) = delete;

CollectorOutput(const CollectorConfig& config);

~CollectorOutput() {
stream_interrupted_.notify_one();
if (thread_.running()) {
thread_.Stop();
}
}

// Constructor for tests
CollectorOutput(std::unique_ptr<ISensorClient>&& sensor_client,
std::unique_ptr<ISignalServiceClient>&& signal_client) {
sensor_clients_.emplace_back(std::move(sensor_client));
signal_clients_.emplace_back(std::move(signal_client));
}

/**
* Send a message to sensor.
*
* @param msg One of sensor::MsgFromCollector or
* sensor::SignalStreamMessage, the proper service to be
* used will be determined from the type held in msg.
* @returns A SignalHandler::Result with the outcome of the send
* operation
*/
SignalHandler::Result SendMsg(const MessageType& msg);

/**
* Whether we should use the new iservice or not.
*
* @returns true if configuration indicates we should use the new
* iservice, false otherwise.
*/
bool UseSensorClient() const { return use_sensor_client_; }

private:
void EstablishGrpcStream();
bool EstablishGrpcStreamSingle();

void HandleOutputError();
SignalHandler::Result SensorOutput(const sensor::MsgFromCollector& msg);
SignalHandler::Result SignalOutput(const sensor::SignalStreamMessage& msg);

std::vector<std::unique_ptr<ISensorClient>> sensor_clients_;
std::vector<std::unique_ptr<ISignalServiceClient>> signal_clients_;

bool use_sensor_client_ = true;

StoppableThread thread_;
std::atomic<bool> stream_active_ = false;
std::condition_variable stream_interrupted_;
std::shared_ptr<grpc::Channel> channel_;
};

} // namespace collector
3 changes: 2 additions & 1 deletion collector/lib/CollectorService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ static const std::string PROMETHEUS_PORT = "9090";
CollectorService::CollectorService(CollectorConfig& config, std::atomic<ControlValue>* control,
const std::atomic<int>* signum)
: config_(config),
system_inspector_(config_),
output_(config),
system_inspector_(config, &output_),
control_(control),
signum_(*signum),
server_(OPTIONS),
Expand Down
2 changes: 2 additions & 0 deletions collector/lib/CollectorService.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include "CivetWrapper.h"
#include "CollectorConfig.h"
#include "CollectorOutput.h"
#include "CollectorStatsExporter.h"
#include "ConfigLoader.h"
#include "Control.h"
Expand Down Expand Up @@ -32,6 +33,7 @@ class CollectorService {
bool WaitForGRPCServer();

CollectorConfig& config_;
CollectorOutput output_;
system_inspector::Service system_inspector_;

std::atomic<ControlValue>* control_;
Expand Down
3 changes: 1 addition & 2 deletions collector/lib/CollectorStatsExporter.cpp
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
#include "CollectorStatsExporter.h"

#include <chrono>
#include <iostream>
#include <math.h>

#include "CollectorStats.h"
#include "Containers.h"
#include "EventNames.h"
#include "Logging.h"
#include "Utility.h"
#include "prometheus/gauge.h"
#include "system-inspector/Service.h"

Expand Down
2 changes: 0 additions & 2 deletions collector/lib/CollectorStatsExporter.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
#include <memory>

#include "CollectorConfig.h"
#include "CollectorConnectionStats.h"
#include "CollectorStats.h"
#include "StoppableThread.h"
#include "prometheus/registry.h"
#include "system-inspector/Service.h"
Expand Down
20 changes: 10 additions & 10 deletions collector/lib/ProcessSignalFormatter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ ProcessSignalFormatter::ProcessSignalFormatter(
event_extractor_->Init(inspector);
}

ProcessSignalFormatter::~ProcessSignalFormatter() {}
ProcessSignalFormatter::~ProcessSignalFormatter() = default;

const SignalStreamMessage* ProcessSignalFormatter::ToProtoMessage(sinsp_evt* event) {
if (process_signals[event->get_type()] == ProcessSignalType::UNKNOWN_PROCESS_TYPE) {
Expand All @@ -80,11 +80,11 @@ const SignalStreamMessage* ProcessSignalFormatter::ToProtoMessage(sinsp_evt* eve
}

ProcessSignal* process_signal = CreateProcessSignal(event);
if (!process_signal) {
if (process_signal == nullptr) {
return nullptr;
}

Signal* signal = Allocate<Signal>();
auto* signal = Allocate<Signal>();
signal->set_allocated_process_signal(process_signal);

SignalStreamMessage* signal_stream_message = AllocateRoot();
Expand All @@ -101,11 +101,11 @@ const SignalStreamMessage* ProcessSignalFormatter::ToProtoMessage(sinsp_threadin
}

ProcessSignal* process_signal = CreateProcessSignal(tinfo);
if (!process_signal) {
if (process_signal == nullptr) {
return nullptr;
}

Signal* signal = Allocate<Signal>();
auto* signal = Allocate<Signal>();
signal->set_allocated_process_signal(process_signal);

SignalStreamMessage* signal_stream_message = AllocateRoot();
Expand Down Expand Up @@ -315,20 +315,20 @@ void ProcessSignalFormatter::CountLineage(const std::vector<LineageInfo>& lineag

void ProcessSignalFormatter::GetProcessLineage(sinsp_threadinfo* tinfo,
std::vector<LineageInfo>& lineage) {
if (tinfo == NULL) {
if (tinfo == nullptr) {
return;
}
sinsp_threadinfo* mt = NULL;
sinsp_threadinfo* mt = nullptr;
if (tinfo->is_main_thread()) {
mt = tinfo;
} else {
mt = tinfo->get_main_thread();
if (mt == NULL) {
if (mt == nullptr) {
return;
}
}
sinsp_threadinfo::visitor_func_t visitor = [this, &lineage](sinsp_threadinfo* pt) {
if (pt == NULL) {
sinsp_threadinfo::visitor_func_t visitor = [&lineage](sinsp_threadinfo* pt) {
if (pt == nullptr) {
return false;
}
if (pt->m_pid == 0) {
Expand Down
Loading
Loading