Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ pkg_check_modules(LIB60870 IMPORTED_TARGET lib60870>=2.3.1)
pkg_check_modules(LIBCONFIG IMPORTED_TARGET libconfig>=1.4.9)
pkg_check_modules(MOSQUITTO IMPORTED_TARGET libmosquitto>=1.6.9)
pkg_check_modules(MODBUS IMPORTED_TARGET libmodbus>=3.1.0)
pkg_check_modules(RDKAFKA IMPORTED_TARGET rdkafka>=1.5.0)
pkg_check_modules(RDKAFKAPP IMPORTED_TARGET rdkafka++>=1.5.0)
pkg_check_modules(HIREDIS IMPORTED_TARGET hiredis>=1.0.0)
pkg_check_modules(REDISPP IMPORTED_TARGET redis++>=1.2.0)
pkg_check_modules(RABBITMQ_C IMPORTED_TARGET librabbitmq>=0.8.0)
Expand Down Expand Up @@ -193,7 +193,7 @@ cmake_dependent_option(WITH_NODE_IEC60870 "Build with iec60870 node-types"
cmake_dependent_option(WITH_NODE_IEC61850 "Build with iec61850 node-types" "${WITH_DEFAULTS}" "LIBIEC61850_FOUND; NOT WITHOUT_GPL" OFF)
cmake_dependent_option(WITH_NODE_INFINIBAND "Build with infiniband node-type" "${WITH_DEFAULTS}" "IBVerbs_FOUND; RDMACM_FOUND" OFF) # Infiniband node-type is currenly broken
cmake_dependent_option(WITH_NODE_INFLUXDB "Build with influxdb node-type" "${WITH_DEFAULTS}" "" OFF)
cmake_dependent_option(WITH_NODE_KAFKA "Build with kafka node-type" "${WITH_DEFAULTS}" "RDKAFKA_FOUND" OFF)
cmake_dependent_option(WITH_NODE_KAFKA "Build with kafka node-type" "${WITH_DEFAULTS}" "RDKAFKAPP_FOUND" OFF)
cmake_dependent_option(WITH_NODE_LOOPBACK "Build with loopback node-type" "${WITH_DEFAULTS}" "" OFF)
cmake_dependent_option(WITH_NODE_MODBUS "Build with modbus node-type" "${WITH_DEFAULTS}" "MODBUS_FOUND" OFF)
cmake_dependent_option(WITH_NODE_MQTT "Build with mqtt node-type" "${WITH_DEFAULTS}" "MOSQUITTO_FOUND" OFF)
Expand Down Expand Up @@ -309,7 +309,7 @@ add_feature_info(NODE_MODBUS WITH_NODE_MODBUS "Build with
add_feature_info(NODE_MQTT WITH_NODE_MQTT "Build with mqtt node-type")
add_feature_info(NODE_NANOMSG WITH_NODE_NANOMSG "Build with nanomsg node-type")
add_feature_info(NODE_NGSI WITH_NODE_NGSI "Build with ngsi node-type")
add_feature_info(NODE_OPAL_AYSNC WITH_NODE_OPAL_ASYNC "Build with opal.async node-type")
add_feature_info(NODE_OPAL_ASYNC WITH_NODE_OPAL_ASYNC "Build with opal.async node-type")
add_feature_info(NODE_OPAL_ORCHESTRA WITH_NODE_OPAL_ORCHESTRA "Build with opal.orchestra node-type")
add_feature_info(NODE_OPENDSS WITH_NODE_OPENDSS "Build with opendss node-type")
add_feature_info(NODE_REDIS WITH_NODE_REDIS "Build with redis node-type")
Expand Down
4 changes: 2 additions & 2 deletions doc/openapi/components/schemas/config/nodes/kafka.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ allOf:
in:
type: object
properties:
consume:
topic:
type: string
description: The Kafka topic to which this node-type will subscribe for receiving messages.

Expand All @@ -67,7 +67,7 @@ allOf:
out:
type: object
properties:
produce:
topic:
type: string
description: The Kafka topic to which this node-type will publish messages.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,6 @@ allOf:
description: >-
If true, the DDF file provided in the 'dff' setting will be overwriting with settings and signals from the VILLASnode configuration.

ddf_overwrite_only:
type: boolean
default: false
description: >-
If true, VILLASnode will overwrite the file provided in the 'ddf' setting, and terminate immediately afterwards.

rate:
type: number
default: 1
Expand Down
36 changes: 24 additions & 12 deletions etc/examples/nodes/kafka.conf
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,39 @@ nodes = {

format = "json.kafka"

server = "localhost:9094"
protocol = "SASL_SSL"
server = "localhost:9092"
protocol = "PLAINTEXT"
client_id = "villas-node"

in = {
consume = "test-topic"
topic = "test-topic"
group_id = "villas-node"
}

out = {
produce = "test-topic"
topic = "test-topic"
}
}

ssl = {
ca = "/etc/ssl/certs/ca.pem"
}
siggen = {
type = "signal"

sasl = {
mechanisms = "SCRAM-SHA-512"
username = "scram-sha-512-usr"
password = "scram-sha-512-pwd"
}
rate = 20
values = 5
signal = "mixed"
}
}

paths = (
{
in = "siggen"
out = "kafka_node"
},
{
in = "kafka_node"

hooks = [
"print"
]
}
)
9 changes: 8 additions & 1 deletion include/villas/node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

#pragma once

#include <functional>
#include <iostream>

#include <fmt/ostream.h>
Expand Down Expand Up @@ -106,6 +107,13 @@ class Node {

virtual json_t *_readStatus() const { return nullptr; }

int parseCommon(
json_t *json,
std::function<Signal::Ptr(json_t *, NodeDirection::Direction)>
parse_signal = [](json_t *j, NodeDirection::Direction d) {
return Signal::fromJson(j);
});

public:
// Initialize node with default values
Node(const uuid_t &id = {}, const std::string &name = "");
Expand Down Expand Up @@ -277,7 +285,6 @@ class Node {
};

class NodeFactory : public villas::plugin::Plugin {

friend Node;

protected:
Expand Down
4 changes: 3 additions & 1 deletion include/villas/node_direction.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

#pragma once

#include <functional>

#include <jansson.h>

#include <villas/common.hpp>
Expand Down Expand Up @@ -50,7 +52,7 @@ class NodeDirection {

NodeDirection(enum NodeDirection::Direction dir, Node *n);

int parse(json_t *json);
int parse(json_t *json, std::function<Signal::Ptr(json_t *)> parse_signal);
void check();
int prepare();
int start();
Expand Down
6 changes: 4 additions & 2 deletions include/villas/signal.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,14 @@ class Signal {
// Parse signal description.
int parse(json_t *json);

std::string toString(const union SignalData *d = nullptr) const;
virtual std::string toString(const union SignalData *d = nullptr) const;

// Produce JSON representation of signal.
json_t *toJson() const;
virtual json_t *toJson() const;

bool isNext(const Signal &sig);

static Signal::Ptr fromJson(json_t *json);
};

} // namespace node
Expand Down
23 changes: 12 additions & 11 deletions include/villas/signal_list.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@

#pragma once

#include <functional>
#include <memory>
#include <string>
#include <string_view>

#include <jansson.h>

Expand All @@ -24,16 +27,10 @@ class SignalList : public std::vector<Signal::Ptr> {
using Ptr = std::shared_ptr<SignalList>;

SignalList() {}

SignalList(json_t *json, std::function<Signal::Ptr(json_t *)> parse_signal =
Signal::fromJson);
SignalList(std::string_view dt);
SignalList(unsigned len, enum SignalType fmt);
SignalList(const char *dt);
SignalList(json_t *json) {
int ret = parse(json);
if (ret)
throw RuntimeError("Failed to parse signal list");
}

int parse(json_t *json);

Ptr clone();

Expand All @@ -42,9 +39,13 @@ class SignalList : public std::vector<Signal::Ptr> {

json_t *toJson() const;

int getIndexByName(const std::string &name);
Signal::Ptr getByName(const std::string &name);
int getIndexByName(std::string_view name);
Signal::Ptr getByName(std::string_view name);
Signal::Ptr getByIndex(unsigned idx);

void
parse(json_t *json_signals,
std::function<Signal::Ptr(json_t *)> parse_signal = Signal::fromJson);
};

} // namespace node
Expand Down
8 changes: 3 additions & 5 deletions lib/hooks/lua.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -416,15 +416,13 @@ LuaHook::LuaHook(Path *p, Node *n, int fl, int prio, bool en)
LuaHook::~LuaHook() { lua_close(L); }

void LuaHook::parseExpressions(json_t *json_sigs) {
int ret;
size_t i;
json_t *json_sig;

expressions.clear();
signalsExpressions->clear();
ret = signalsExpressions->parse(json_sigs);
if (ret)
throw ConfigError(json_sigs, "node-config-hook-lua-signals",
"Setting 'signals' must be a list of dicts");

signalsExpressions->parse(json_sigs);

// cppcheck-suppress unknownMacro
json_array_foreach(json_sigs, i, json_sig)
Expand Down
39 changes: 25 additions & 14 deletions lib/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,12 @@ int Node::prepare() {
return 0;
}

int Node::parse(json_t *json) {
int Node::parse(json_t *json) { return parseCommon(json); }

int Node::parseCommon(
json_t *json,
std::function<Signal::Ptr(json_t *, NodeDirection::Direction d)>
parse_signal) {
assert(state == State::INITIALIZED || state == State::PARSED ||
state == State::CHECKED);

Expand Down Expand Up @@ -138,31 +143,37 @@ int Node::parse(json_t *json) {
#endif // WITH_NETEM
}

struct {
struct Direction {
const char *str;
struct NodeDirection *dir;
} dirs[] = {{"in", &in}, {"out", &out}};
struct NodeDirection *obj;
enum NodeDirection::Direction dir;
};
std::vector<Direction> dirs = {{"in", &in, NodeDirection::Direction::IN},
{"out", &out, NodeDirection::Direction::OUT}};

const char *fields[] = {"signals", "builtin", "vectorize", "hooks"};
std::vector<std::string> fields = {"signals", "builtin", "vectorize",
"hooks"};

for (unsigned j = 0; j < ARRAY_LEN(dirs); j++) {
json_t *json_dir = json_object_get(json, dirs[j].str);
for (auto &dir : dirs) {
json_t *json_dir = json_object_get(json, dir.str);

// Skip if direction is unused
if (!json_dir) {
json_dir = json_pack("{ s: b }", "enabled", 0);
}

// Copy missing fields from main node config to direction config
for (unsigned i = 0; i < ARRAY_LEN(fields); i++) {
json_t *json_field_dir = json_object_get(json_dir, fields[i]);
json_t *json_field_node = json_object_get(json, fields[i]);
// Copy missing fields from main node config to direction config.
for (auto &field : fields) {
json_t *json_field_dir = json_object_get(json_dir, field.c_str());
json_t *json_field_node = json_object_get(json, field.c_str());

if (json_field_node && !json_field_dir)
json_object_set(json_dir, fields[i], json_field_node);
if (json_field_node && !json_field_dir) {
json_object_set(json_dir, field.c_str(), json_field_node);
}
}

ret = dirs[j].dir->parse(json_dir);
ret = dir.obj->parse(json_dir,
[&](json_t *j) { return parse_signal(j, dir.dir); });
if (ret)
return ret;
}
Expand Down
2 changes: 1 addition & 1 deletion lib/node_compat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ int NodeCompat::prepare() {
}

int NodeCompat::parse(json_t *json) {
int ret = Node::parse(json);
int ret = Node::parseCommon(json);
if (ret)
return ret;

Expand Down
50 changes: 4 additions & 46 deletions lib/node_direction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ NodeDirection::NodeDirection(enum NodeDirection::Direction dir, Node *n)
: direction(dir), path(nullptr), node(n), enabled(1), builtin(1),
vectorize(1), config(nullptr) {}

int NodeDirection::parse(json_t *json) {
int NodeDirection::parse(json_t *json,
std::function<Signal::Ptr(json_t *)> parse_signal) {
int ret;

json_error_t err;
Expand All @@ -44,53 +45,10 @@ int NodeDirection::parse(json_t *json) {
signals = std::make_shared<SignalList>();
if (!signals)
throw MemoryAllocationError();
} else if (json_is_object(json_signals) || json_is_array(json_signals)) {
signals = std::make_shared<SignalList>();
} else if (json_signals) {
signals = std::make_shared<SignalList>(json_signals, parse_signal);
if (!signals)
throw MemoryAllocationError();

if (json_is_object(json_signals)) {
json_t *json_name, *json_signal = json_signals;
int count;

ret = json_unpack_ex(json_signal, &err, 0, "{ s: i }", "count", &count);
if (ret)
throw ConfigError(json_signals, "node-config-node-signals",
"Invalid signal definition");

json_signals = json_array();
for (int i = 0; i < count; i++) {
json_t *json_signal_copy = json_copy(json_signal);

json_object_del(json_signal, "count");

// Append signal index
json_name = json_object_get(json_signal_copy, "name");
if (json_name) {
const char *name = json_string_value(json_name);
char *name_new;

int ret __attribute__((unused));
ret = asprintf(&name_new, "%s%d", name, i);

json_string_set(json_name, name_new);
}

json_array_append_new(json_signals, json_signal_copy);
}
json_object_set_new(json, "signals", json_signals);
}

ret = signals->parse(json_signals);
if (ret)
throw ConfigError(json_signals, "node-config-node-signals",
"Failed to parse signal definition");
} else if (json_is_string(json_signals)) {
const char *dt = json_string_value(json_signals);

signals = std::make_shared<SignalList>(dt);
if (!signals)
return -1;
} else {
signals =
std::make_shared<SignalList>(DEFAULT_SAMPLE_LENGTH, SignalType::FLOAT);
Expand Down
2 changes: 1 addition & 1 deletion lib/nodes/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ endif()
# Enable Kafka support
if(WITH_NODE_KAFKA)
list(APPEND NODE_SRC kafka.cpp)
list(APPEND LIBRARIES PkgConfig::RDKAFKA)
list(APPEND LIBRARIES PkgConfig::RDKAFKAPP)
endif()

# Enable Comedi support
Expand Down
2 changes: 1 addition & 1 deletion lib/nodes/api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ int APINode::_write(struct Sample *smps[], unsigned cnt) {
}

int APINode::parse(json_t *json) {
int ret = Node::parse(json);
int ret = Node::parseCommon(json);
if (ret)
return ret;

Expand Down
Loading