From fa0e5acff7539016828177f0c630e7e104c6e16b Mon Sep 17 00:00:00 2001 From: Jonas Karlsson Date: Thu, 7 Jun 2018 14:47:57 +0200 Subject: [PATCH 1/8] Inital support for zeromq relay functionality --- metadata_exporter.c | 16 ++++ metadata_exporter.h | 9 ++ metadata_input_zeromq_relay.c | 174 ++++++++++++++++++++++++++++++++++ metadata_input_zeromq_relay.h | 44 +++++++++ metadata_writer_zeromq.c | 15 +++ 5 files changed, 258 insertions(+) create mode 100644 metadata_input_zeromq_relay.c create mode 100644 metadata_input_zeromq_relay.h diff --git a/metadata_exporter.c b/metadata_exporter.c index 0070b2d..87d1ef8 100644 --- a/metadata_exporter.c +++ b/metadata_exporter.c @@ -47,6 +47,9 @@ #ifdef ZEROMQ_SUPPORT_INPUT #include "metadata_input_zeromq.h" #endif +#ifdef ZEROMQ_SUPPORT_INPUT_RELAY + #include "metadata_input_zeromq_relay.h" +#endif #ifdef MUNIN_SUPPORT #include "metadata_input_munin.h" #endif @@ -799,6 +802,19 @@ int main(int argc, char *argv[]) num_inputs++; } #endif +#ifdef ZEROMQ_SUPPORT_INPUT_RELAY + else if (!strcmp(key, "zmq_input_relay")) { + mde->md_inputs[MD_INPUT_ZEROMQ_RELAY] = calloc(sizeof(struct md_input_zeromq), 1); + + if (mde->md_inputs[MD_INPUT_ZEROMQ_RELAY] == NULL) { + META_PRINT_SYSLOG(mde, LOG_ERR, "Could not allocate ZeroMQ Relay input\n"); + exit(EXIT_FAILURE); + } + + md_zeromq_input_setup(mde, (struct md_input_zeromq*) mde->md_inputs[MD_INPUT_ZEROMQ_RELAY]); + num_inputs++; + } +#endif #ifdef NNE_SUPPORT else if (!strcmp(key, "nne")) { mde->md_writers[MD_WRITER_NNE] = calloc(sizeof(struct md_writer_nne), 1); diff --git a/metadata_exporter.h b/metadata_exporter.h index 46066b4..027c2a0 100644 --- a/metadata_exporter.h +++ b/metadata_exporter.h @@ -54,6 +54,7 @@ #define META_TYPE_SYSEVENT 0x06 #define META_TYPE_RADIO 0x08 #define META_TYPE_SYSTEM 0x10 +#define META_TYPE_ZEROMQ 0x11 enum iface_event { IFACE_EVENT_DEV_STATE=1, @@ -112,6 +113,7 @@ enum md_inputs { MD_INPUT_MUNIN, MD_INPUT_SYSEVENT, MD_INPUT_ZEROMQ, + MD_INPUT_ZEROMQ_RELAY, __MD_INPUT_MAX }; @@ -238,6 +240,13 @@ struct md_munin_event { json_object* json_blob; }; +struct md_zeromq_event { + MD_EVENT; + const char *msg; + //const char *topic; + //json_object* json_blob; +}; + struct md_radio_event { MD_RADIO_EVENT; }; diff --git a/metadata_input_zeromq_relay.c b/metadata_input_zeromq_relay.c new file mode 100644 index 0000000..b875566 --- /dev/null +++ b/metadata_input_zeromq_relay.c @@ -0,0 +1,174 @@ +/* Copyright (c) 2015, Celerway, Kristian Evensen + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include +#include +#include +#include +#include +#include JSON_LOC +#include +#include +#include + +#include "metadata_exporter.h" +#include "metadata_input_nl_zmq_common.h" +#include "metadata_input_zeromq_relay.h" +#include "backend_event_loop.h" + +#include "lib/minmea.h" +#include "metadata_exporter_log.h" + +static void md_input_zeromq_relay_handle_event(void *ptr, int32_t fd, uint32_t events) +{ + struct md_input_zeromq_relay *miz = ptr; + int zmq_events = 0; + size_t events_len = sizeof(zmq_events); + json_object *zmqh_obj = NULL; + const char *json_msg; + + zmq_getsockopt(miz->zmq_socket, ZMQ_EVENTS, &zmq_events, &events_len); + + while (zmq_events & ZMQ_POLLIN) + { + char buf[2048] = {0}; + zmq_recv(miz->zmq_socket, buf, 2048, 0); + + json_msg = strchr(buf, '{'); + // Sanity checks + // Do we even have a json object + if (json_msg == NULL) + { + zmq_getsockopt(miz->zmq_socket, ZMQ_EVENTS, &zmq_events, &events_len); + continue; + } + + // Is the json object valid ? + zmqh_obj = json_tokener_parse(json_msg); + if (!zmqh_obj) { + META_PRINT_SYSLOG(miz->parent, LOG_ERR, "Received invalid JSON object on ZMQ socket\n"); + zmq_getsockopt(miz->zmq_socket, ZMQ_EVENTS, &zmq_events, &events_len); + continue; + } + //TODO: Check so we also have a topic + + META_PRINT(miz->parent->logfile, "Got JSON %s\n", json_object_to_json_string(zmqh_obj)); + json_object_put(zmqh_obj); + + //Yay we have a valid object lets store that and publish it. + + //Create a zeromq event + memset(miz->mse, 0, sizeof(struct md_zeromq_event)); + miz->mse->md_type = META_TYPE_ZEROMQ; + miz->mse->msg = buf; + mde_publish_event_obj(miz->parent, (struct md_event*) miz->mse); + + zmq_getsockopt(miz->zmq_socket, ZMQ_EVENTS, &zmq_events, &events_len); + } +} + +static uint8_t md_input_zeromq_relay_config(struct md_input_zeromq_relay *miz) +{ + int zmq_fd = -1; + size_t len = 0; + + // Connect to ZMQ publisher(s) + // first version connects to one publisher, next version should accept multiple publishers + miz->zmq_ctx = zmq_ctx_new(); + if (miz->zmq_ctx == NULL) { + META_PRINT_SYSLOG(miz->parent, LOG_ERR, "Can't create ZMQ context\n"); + return RETVAL_FAILURE; + } + + miz->zmq_socket = zmq_socket(miz->zmq_ctx, ZMQ_SUB); + if (miz->zmq_socket == NULL) { + META_PRINT_SYSLOG(miz->parent, LOG_ERR, "Can't create ZMQ socket\n"); + return RETVAL_FAILURE; + } + + //Connect to user defined publiser $URL + if (zmq_connect(miz->zmq_socket, miz->zmq_pub_url) == -1) + { + char buf[1024] = {0}; + snprintf(buf. sizeof(buf), "Can't connect to %s ZMQ publisher\n",miz->zmq_pub_url ) + META_PRINT_SYSLOG(miz->parent, LOG_ERR, buf); + return RETVAL_FAILURE; + } + + // subscribe to all topics (of this publisher) + const char *topic = ""; + zmq_setsockopt(miz->zmq_socket, ZMQ_SUBSCRIBE, topic, strlen(topic)); + + len = sizeof(zmq_fd); + if (zmq_getsockopt(miz->zmq_socket, ZMQ_FD, &zmq_fd, &len) == -1) { + META_PRINT_SYSLOG(miz->parent, LOG_ERR, "Can't get ZMQ file descriptor\n"); + return RETVAL_FAILURE; + } + + if(!(miz->event_handle = backend_create_epoll_handle(miz, + zmq_fd, md_input_zeromq_relay_handle_event))) + return RETVAL_FAILURE; + + backend_event_loop_update(miz->parent->event_loop, EPOLLIN, EPOLL_CTL_ADD, + zmq_fd, miz->event_handle); + + return RETVAL_SUCCESS; +} + +static uint8_t md_input_zeromq_relay_init(void *ptr, json_object* config) +{ + struct md_input_zeromq_relay *miz = ptr; + char url[256] = {0}; + + json_object* subconfig; + if (json_object_object_get_ex(config, "zmq_input_relay", &subconfig)) { + json_object_object_foreach(subconfig, key, val) { + if (!strcmp(key, "url")) + miz->zmq_pub_url = strncpy(url,val, sizeof(url)); + } + } + + + if (!miz->zmq_pub_url) { + META_PRINT_SYSLOG(miz->parent, LOG_ERR, "At least one publisher must be present\n"); + return RETVAL_FAILURE; + } + + return md_input_zeromq_relay_config(miz); +} + +void md_zeromq_relay_usage() +{ + fprintf(stderr, "\"zmq_input\": {\t\tZeroMQ input (at least one url must be present)\n"); + fprintf(stderr, " \"url\":\t\tListen to ZeroMQ events on this URL\n"); + fprintf(stderr, "},\n"); +} + +void md_zeromq_relay_setup(struct md_exporter *mde, struct md_input_zeromq_relay *miz) +{ + miz->parent = mde; + miz->init = md_input_zeromq_relay_init; +} diff --git a/metadata_input_zeromq_relay.h b/metadata_input_zeromq_relay.h new file mode 100644 index 0000000..33fab3d --- /dev/null +++ b/metadata_input_zeromq_relay.h @@ -0,0 +1,44 @@ +/* Copyright (c) 2018, Karlstad Universitet, Jonas Karlsson + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#pragma once +#include "metadata_exporter.h" + +struct backend_epoll_handle; + +struct md_input_zeromq_relay { + MD_INPUT; + struct backend_epoll_handle *event_handle; + uint32_t md_zmq_mask; + const char *zmq_pub_url; + void* zmq_ctx; + void* zmq_socket; + int zmq_fd; + struct md_zeromq_event *mse; +}; + +void md_zeromq_relay_usage(); +void md_zeromq_relay_setup(struct md_exporter *mde, struct md_input_zeromq_relay *miz); diff --git a/metadata_writer_zeromq.c b/metadata_writer_zeromq.c index 33bdd03..61ac052 100644 --- a/metadata_writer_zeromq.c +++ b/metadata_writer_zeromq.c @@ -219,6 +219,18 @@ static void md_zeromq_writer_handle_munin(struct md_writer_zeromq *mwz, } } +static void md_zeromq_writer_handle_zeromq(struct md_writer_zeromq *mwz, + struct md_zeromq_event *mge) +{ + char topic[8192]; //In reality this is the message not the topic + int retval; + + retval = snprintf(topic, sizeof(topic), "%s", mge->msg); + if (retval < sizeof(topic)) { + zmq_send(mwz->zmq_publisher, topic, strlen(topic), 0); + } +} + static void md_zeromq_writer_handle_sysevent(struct md_writer_zeromq *mwz, struct md_sysevent *mge) @@ -1024,6 +1036,9 @@ static void md_zeromq_writer_handle(struct md_writer *writer, struct md_event *e case META_TYPE_RADIO: md_zeromq_writer_handle_radio(mwz, (struct md_radio_event*) event); break; + case META_TYPE_ZEROMQ: + md_zeromq_writer_handle_zeromq(mwz, (struct md_zeromq_event*) event); + break; default: META_PRINT_SYSLOG(mwz->parent, LOG_INFO, "ZMQ writer does not support event %u\n", event->md_type); From 8a6e4ec6d9c978a0a60c7b51d1c8313172ae7138 Mon Sep 17 00:00:00 2001 From: Jonas Karlsson Date: Thu, 7 Jun 2018 14:52:26 +0200 Subject: [PATCH 2/8] Removed unused variable --- metadata_input_zeromq_relay.h | 1 - 1 file changed, 1 deletion(-) diff --git a/metadata_input_zeromq_relay.h b/metadata_input_zeromq_relay.h index 33fab3d..5a99b2d 100644 --- a/metadata_input_zeromq_relay.h +++ b/metadata_input_zeromq_relay.h @@ -32,7 +32,6 @@ struct backend_epoll_handle; struct md_input_zeromq_relay { MD_INPUT; struct backend_epoll_handle *event_handle; - uint32_t md_zmq_mask; const char *zmq_pub_url; void* zmq_ctx; void* zmq_socket; From ca31a25f8c4da4abf171fc68d8462b448bc296ab Mon Sep 17 00:00:00 2001 From: Jonas Karlsson Date: Thu, 7 Jun 2018 14:56:41 +0200 Subject: [PATCH 3/8] Fixed license clause --- metadata_input_zeromq_relay.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metadata_input_zeromq_relay.c b/metadata_input_zeromq_relay.c index b875566..8e4bba0 100644 --- a/metadata_input_zeromq_relay.c +++ b/metadata_input_zeromq_relay.c @@ -1,4 +1,4 @@ -/* Copyright (c) 2015, Celerway, Kristian Evensen +/* Copyright (c) 2018, Karlstad Universitet, Jonas Karlsson * All rights reserved. * * Redistribution and use in source and binary forms, with or without From 8b857728b6215816480c6f6ab27baad14e67bc02 Mon Sep 17 00:00:00 2001 From: Jonas Karlsson Date: Fri, 8 Jun 2018 08:32:33 +0200 Subject: [PATCH 4/8] Fixed compilations bugs --- CMakeLists.txt | 7 +++++++ metadata_exporter.c | 4 ++-- metadata_input_zeromq_relay.c | 6 ++---- 3 files changed, 11 insertions(+), 6 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index a1bc893..d1814f4 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -74,6 +74,13 @@ if (ZEROMQ_INPUT) add_definitions("-DZEROMQ_SUPPORT_INPUT") endif() +if (ZEROMQ_RELAY) + set(LIBS ${LIBS} zmq) + set(SOURCE ${SOURCE} + metadata_input_zeromq_relay.c) + add_definitions("-DZEROMQ_SUPPORT_INPUT_RELAY") +endif() + if (GPSD) set(LIBS ${LIBS} gps) set(SOURCE ${SOURCE} diff --git a/metadata_exporter.c b/metadata_exporter.c index 87d1ef8..f5ece3f 100644 --- a/metadata_exporter.c +++ b/metadata_exporter.c @@ -804,14 +804,14 @@ int main(int argc, char *argv[]) #endif #ifdef ZEROMQ_SUPPORT_INPUT_RELAY else if (!strcmp(key, "zmq_input_relay")) { - mde->md_inputs[MD_INPUT_ZEROMQ_RELAY] = calloc(sizeof(struct md_input_zeromq), 1); + mde->md_inputs[MD_INPUT_ZEROMQ_RELAY] = calloc(sizeof(struct md_input_zeromq_relay), 1); if (mde->md_inputs[MD_INPUT_ZEROMQ_RELAY] == NULL) { META_PRINT_SYSLOG(mde, LOG_ERR, "Could not allocate ZeroMQ Relay input\n"); exit(EXIT_FAILURE); } - md_zeromq_input_setup(mde, (struct md_input_zeromq*) mde->md_inputs[MD_INPUT_ZEROMQ_RELAY]); + md_zeromq_relay_setup(mde, (struct md_input_zeromq_relay*) mde->md_inputs[MD_INPUT_ZEROMQ_RELAY]); num_inputs++; } #endif diff --git a/metadata_input_zeromq_relay.c b/metadata_input_zeromq_relay.c index 8e4bba0..28f4936 100644 --- a/metadata_input_zeromq_relay.c +++ b/metadata_input_zeromq_relay.c @@ -112,9 +112,7 @@ static uint8_t md_input_zeromq_relay_config(struct md_input_zeromq_relay *miz) //Connect to user defined publiser $URL if (zmq_connect(miz->zmq_socket, miz->zmq_pub_url) == -1) { - char buf[1024] = {0}; - snprintf(buf. sizeof(buf), "Can't connect to %s ZMQ publisher\n",miz->zmq_pub_url ) - META_PRINT_SYSLOG(miz->parent, LOG_ERR, buf); + META_PRINT_SYSLOG(miz->parent, LOG_ERR, "Can't connect to %s ZMQ publisher\n",miz->zmq_pub_url ); return RETVAL_FAILURE; } @@ -147,7 +145,7 @@ static uint8_t md_input_zeromq_relay_init(void *ptr, json_object* config) if (json_object_object_get_ex(config, "zmq_input_relay", &subconfig)) { json_object_object_foreach(subconfig, key, val) { if (!strcmp(key, "url")) - miz->zmq_pub_url = strncpy(url,val, sizeof(url)); + miz->zmq_pub_url = strncpy(url,json_object_get_string(val), sizeof(url)); } } From 3cb59b39679746610cce58289a016b5e8410093d Mon Sep 17 00:00:00 2001 From: Jonas Karlsson Date: Tue, 12 Jun 2018 10:28:47 +0200 Subject: [PATCH 5/8] Multiple ZeroMQ publisher support --- metadata_exporter.c | 3 + metadata_exporter.h | 4 +- metadata_input_zeromq_relay.c | 166 +++++++++++++++++++++++++--------- metadata_input_zeromq_relay.h | 25 ++++- 4 files changed, 148 insertions(+), 50 deletions(-) diff --git a/metadata_exporter.c b/metadata_exporter.c index f5ece3f..c4a4737 100644 --- a/metadata_exporter.c +++ b/metadata_exporter.c @@ -673,6 +673,9 @@ static void print_usage() #ifdef MUNIN_SUPPORT md_munin_usage(); #endif +#ifdef ZEROMQ_SUPPORT_INPUT_RELAY + md_zeromq_relay_usage(); +#endif #ifdef GPS_NSB_SUPPORT md_gps_nsb_usage(); #endif diff --git a/metadata_exporter.h b/metadata_exporter.h index 027c2a0..f0e91e0 100644 --- a/metadata_exporter.h +++ b/metadata_exporter.h @@ -54,7 +54,7 @@ #define META_TYPE_SYSEVENT 0x06 #define META_TYPE_RADIO 0x08 #define META_TYPE_SYSTEM 0x10 -#define META_TYPE_ZEROMQ 0x11 +#define META_TYPE_ZEROMQ 0xA enum iface_event { IFACE_EVENT_DEV_STATE=1, @@ -243,8 +243,6 @@ struct md_munin_event { struct md_zeromq_event { MD_EVENT; const char *msg; - //const char *topic; - //json_object* json_blob; }; struct md_radio_event { diff --git a/metadata_input_zeromq_relay.c b/metadata_input_zeromq_relay.c index 28f4936..1eb3158 100644 --- a/metadata_input_zeromq_relay.c +++ b/metadata_input_zeromq_relay.c @@ -42,6 +42,53 @@ #include "lib/minmea.h" #include "metadata_exporter_log.h" +static int hashCode(struct table *t,int key){ + if(key<0) + return -(key%t->size); + return key%t->size; +} + +void insert(struct table *t,int key, struct zmq_connection *val){ + int pos = hashCode(t,key); + struct node *list = t->list[pos]; + struct node *newNode = (struct node*)malloc(sizeof(struct node)); + struct node *temp = list; + while(temp){ + if(temp->key==key){ + temp->val = val; + return; + } + temp = temp->next; + } + newNode->key = key; + newNode->val = val; + newNode->next = list; + t->list[pos] = newNode; +} + +struct table *createTable(int size){ + struct table *t = (struct table*)malloc(sizeof(struct table)); + t->size = size; + t->list = (struct node**)malloc(sizeof(struct node*)*size); + int i; + for(i=0;ilist[i] = NULL; + return t; +}; + +static struct zmq_connection* lookup(struct table *t,int key){ + int pos = hashCode(t,key); + struct node *list = t->list[pos]; + struct node *temp = list; + while(temp){ + if(temp->key==key){ + return temp->val; + } + temp = temp->next; + } + return NULL; +} + static void md_input_zeromq_relay_handle_event(void *ptr, int32_t fd, uint32_t events) { struct md_input_zeromq_relay *miz = ptr; @@ -49,20 +96,33 @@ static void md_input_zeromq_relay_handle_event(void *ptr, int32_t fd, uint32_t e size_t events_len = sizeof(zmq_events); json_object *zmqh_obj = NULL; const char *json_msg; + struct zmq_connection *zmq_con = lookup(miz->zmq_connections, fd); + void *zmq_socket; - zmq_getsockopt(miz->zmq_socket, ZMQ_EVENTS, &zmq_events, &events_len); + if (!zmq_con) { + META_PRINT_SYSLOG(miz->parent, LOG_ERR, "Could not lookup ZMQ connection\n"); + return; + } + + zmq_socket = zmq_con->zmq_socket; + if (!zmq_socket) { + META_PRINT_SYSLOG(miz->parent, LOG_ERR, "Not a valid ZMQ socket\n"); + return; + } + + zmq_getsockopt(zmq_socket, ZMQ_EVENTS, &zmq_events, &events_len); while (zmq_events & ZMQ_POLLIN) { char buf[2048] = {0}; - zmq_recv(miz->zmq_socket, buf, 2048, 0); + zmq_recv(zmq_socket, buf, 2048, 0); json_msg = strchr(buf, '{'); // Sanity checks // Do we even have a json object if (json_msg == NULL) { - zmq_getsockopt(miz->zmq_socket, ZMQ_EVENTS, &zmq_events, &events_len); + zmq_getsockopt(zmq_socket, ZMQ_EVENTS, &zmq_events, &events_len); continue; } @@ -70,7 +130,7 @@ static void md_input_zeromq_relay_handle_event(void *ptr, int32_t fd, uint32_t e zmqh_obj = json_tokener_parse(json_msg); if (!zmqh_obj) { META_PRINT_SYSLOG(miz->parent, LOG_ERR, "Received invalid JSON object on ZMQ socket\n"); - zmq_getsockopt(miz->zmq_socket, ZMQ_EVENTS, &zmq_events, &events_len); + zmq_getsockopt(zmq_socket, ZMQ_EVENTS, &zmq_events, &events_len); continue; } //TODO: Check so we also have a topic @@ -81,12 +141,13 @@ static void md_input_zeromq_relay_handle_event(void *ptr, int32_t fd, uint32_t e //Yay we have a valid object lets store that and publish it. //Create a zeromq event + memset(miz->mse, 0, sizeof(struct md_zeromq_event)); miz->mse->md_type = META_TYPE_ZEROMQ; - miz->mse->msg = buf; + miz->mse->msg = buf; //This works as next line will call the consuming function sync, could mde_publish_event_obj ever be asynch? mde_publish_event_obj(miz->parent, (struct md_event*) miz->mse); - zmq_getsockopt(miz->zmq_socket, ZMQ_EVENTS, &zmq_events, &events_len); + zmq_getsockopt(zmq_socket, ZMQ_EVENTS, &zmq_events, &events_len); } } @@ -94,63 +155,81 @@ static uint8_t md_input_zeromq_relay_config(struct md_input_zeromq_relay *miz) { int zmq_fd = -1; size_t len = 0; + struct zmq_connection *zmq_con; + miz->zmq_connections = createTable(miz->nr_of_connections); //len of miz->urls // Connect to ZMQ publisher(s) - // first version connects to one publisher, next version should accept multiple publishers - miz->zmq_ctx = zmq_ctx_new(); - if (miz->zmq_ctx == NULL) { - META_PRINT_SYSLOG(miz->parent, LOG_ERR, "Can't create ZMQ context\n"); - return RETVAL_FAILURE; - } + for (int i = 0; i < miz->nr_of_connections; i++) { + zmq_con = calloc(1, sizeof(struct zmq_connection)); + zmq_con->zmq_ctx = zmq_ctx_new(); + if (zmq_con->zmq_ctx == NULL) { + META_PRINT_SYSLOG(miz->parent, LOG_ERR, "Can't create ZMQ context\n"); + return RETVAL_FAILURE; + } - miz->zmq_socket = zmq_socket(miz->zmq_ctx, ZMQ_SUB); - if (miz->zmq_socket == NULL) { - META_PRINT_SYSLOG(miz->parent, LOG_ERR, "Can't create ZMQ socket\n"); - return RETVAL_FAILURE; - } + zmq_con->zmq_socket = zmq_socket(zmq_con->zmq_ctx, ZMQ_SUB); + if (zmq_con->zmq_socket == NULL) { + META_PRINT_SYSLOG(miz->parent, LOG_ERR, "Can't create ZMQ socket\n"); + return RETVAL_FAILURE; + } - //Connect to user defined publiser $URL - if (zmq_connect(miz->zmq_socket, miz->zmq_pub_url) == -1) - { - META_PRINT_SYSLOG(miz->parent, LOG_ERR, "Can't connect to %s ZMQ publisher\n",miz->zmq_pub_url ); - return RETVAL_FAILURE; - } + //Connect to user defined publiser $URL + if (zmq_connect(zmq_con->zmq_socket, miz->urls[i]) == -1) + { + META_PRINT_SYSLOG(miz->parent, LOG_ERR, "Can't connect to %s ZMQ publisher\n", miz->urls[i]); + return RETVAL_FAILURE; + } - // subscribe to all topics (of this publisher) - const char *topic = ""; - zmq_setsockopt(miz->zmq_socket, ZMQ_SUBSCRIBE, topic, strlen(topic)); - - len = sizeof(zmq_fd); - if (zmq_getsockopt(miz->zmq_socket, ZMQ_FD, &zmq_fd, &len) == -1) { - META_PRINT_SYSLOG(miz->parent, LOG_ERR, "Can't get ZMQ file descriptor\n"); - return RETVAL_FAILURE; - } + // subscribe to all topics (of this publisher) + const char *topic = ""; + zmq_setsockopt(zmq_con->zmq_socket, ZMQ_SUBSCRIBE, topic, strlen(topic)); + + len = sizeof(zmq_fd); + if (zmq_getsockopt(zmq_con->zmq_socket, ZMQ_FD, &zmq_fd, &len) == -1) { + META_PRINT_SYSLOG(miz->parent, LOG_ERR, "Can't get ZMQ file descriptor\n"); + return RETVAL_FAILURE; + } - if(!(miz->event_handle = backend_create_epoll_handle(miz, - zmq_fd, md_input_zeromq_relay_handle_event))) - return RETVAL_FAILURE; + if(!(miz->event_handle = backend_create_epoll_handle(miz, + zmq_fd, md_input_zeromq_relay_handle_event))) + return RETVAL_FAILURE; - backend_event_loop_update(miz->parent->event_loop, EPOLLIN, EPOLL_CTL_ADD, - zmq_fd, miz->event_handle); + miz->mse = calloc(1, sizeof(struct md_zeromq_event)); + if (miz->mse == NULL) + return RETVAL_FAILURE; + backend_event_loop_update(miz->parent->event_loop, EPOLLIN, EPOLL_CTL_ADD, + zmq_fd, miz->event_handle); + insert(miz->zmq_connections,zmq_fd,zmq_con); + } + return RETVAL_SUCCESS; } static uint8_t md_input_zeromq_relay_init(void *ptr, json_object* config) { struct md_input_zeromq_relay *miz = ptr; - char url[256] = {0}; + miz->nr_of_connections = 0; json_object* subconfig; if (json_object_object_get_ex(config, "zmq_input_relay", &subconfig)) { json_object_object_foreach(subconfig, key, val) { - if (!strcmp(key, "url")) - miz->zmq_pub_url = strncpy(url,json_object_get_string(val), sizeof(url)); + if (!strcmp(key, "urls")) { + miz->nr_of_connections = json_object_array_length(val); + miz->urls= calloc(miz->nr_of_connections, sizeof(char*)); + for (int i=0; i< miz->nr_of_connections; i++) { + struct json_object* json_url = json_object_array_get_idx(val,i); + int url_len = json_object_get_string_len(json_url) + 1; + const char * url = json_object_get_string(json_url); + miz->urls[i] = calloc(url_len, sizeof(char)); + snprintf((char *)miz->urls[i],url_len, "%s", url); + } + } } } - if (!miz->zmq_pub_url) { + if (miz->nr_of_connections <= 0) { META_PRINT_SYSLOG(miz->parent, LOG_ERR, "At least one publisher must be present\n"); return RETVAL_FAILURE; } @@ -160,8 +239,9 @@ static uint8_t md_input_zeromq_relay_init(void *ptr, json_object* config) void md_zeromq_relay_usage() { - fprintf(stderr, "\"zmq_input\": {\t\tZeroMQ input (at least one url must be present)\n"); - fprintf(stderr, " \"url\":\t\tListen to ZeroMQ events on this URL\n"); + fprintf(stderr, "\"zmq_input_relay\": {\tZeroMQ input (at least one url must be present)\n"); + fprintf(stderr, " \"urls\":\t\tArray of ZeroMQ URLs to listen to, \ +eg. [\"tcp://127.0.0.1:10001\", \"tcp://127.0.0.1:10002\"] \n"); fprintf(stderr, "},\n"); } diff --git a/metadata_input_zeromq_relay.h b/metadata_input_zeromq_relay.h index 5a99b2d..f63b39b 100644 --- a/metadata_input_zeromq_relay.h +++ b/metadata_input_zeromq_relay.h @@ -26,16 +26,33 @@ #pragma once #include "metadata_exporter.h" +#include +#include + +struct node{ + int key; //zmq_fd + struct zmq_connection *val; + struct node *next; +}; + +struct table{ + int size; + struct node **list; +}; struct backend_epoll_handle; +struct zmq_connection { + void* zmq_ctx; + void* zmq_socket; +}; + struct md_input_zeromq_relay { MD_INPUT; struct backend_epoll_handle *event_handle; - const char *zmq_pub_url; - void* zmq_ctx; - void* zmq_socket; - int zmq_fd; + struct table *zmq_connections; + const char **urls; + int nr_of_connections; struct md_zeromq_event *mse; }; From 4593f03778ef989c8b7c1fddd7c05a939655c45b Mon Sep 17 00:00:00 2001 From: Thomas Hirsch Date: Mon, 18 Jun 2018 14:43:03 +0200 Subject: [PATCH 6/8] test: merging zeromq-relay-input by jonakarl, adjusting Jenkinsfile accordingly --- Jenkinsfile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Jenkinsfile b/Jenkinsfile index 0bb7194..779702d 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -1,6 +1,6 @@ import java.text.SimpleDateFormat jobName = "metadata-exporter" -version = "0.1.50" +version = "0.1.51" build_dir = "build" buildPackageName = "meta-exporter" @@ -35,7 +35,7 @@ node ('dockerslave') { stage ('Build') { dir(build_dir) { - sh "cmake ../metadata-exporter-alt -DNNE=1 -DSQLITE3=1 -DZEROMQ_WRITER=1 -DGPS_NSB=1 -DMUNIN=1 -DSYSEVENT=1 && make && make package" + sh "cmake ../metadata-exporter-alt -DNNE=1 -DSQLITE3=1 -DZEROMQ_WRITER=1 -DZEROMQ_RELAY=1 -DGPS_NSB=1 -DMUNIN=1 -DSYSEVENT=1 && make && make package" } sh "chmod +x versionize/versionize.sh; cp versionize/versionize.sh build/" dir(build_dir) { From 7ef201efc40211f1d800c67957d17161a7b2e9ae Mon Sep 17 00:00:00 2001 From: Thomas Hirsch Date: Mon, 18 Jun 2018 14:58:29 +0200 Subject: [PATCH 7/8] test: ...adjusting Jenkinsfile accordingly --- Jenkinsfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Jenkinsfile b/Jenkinsfile index 779702d..a4f2f75 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -15,7 +15,7 @@ node ('dockerslave') { doGenerateSubmoduleConfigurations: false, extensions: [[$class: 'RelativeTargetDirectory', relativeTargetDir: 'metadata-exporter-alt']], submoduleCfg: [], - userRemoteConfigs: [[url: 'git@github.com:kristrev/data-exporter.git']]]) + userRemoteConfigs: [[url: 'git@github.com:MONROE-PROJECT/data-exporter.git']]]) gitCommit = sh(returnStdout: true, script: 'git rev-parse HEAD').trim() shortCommit = gitCommit.take(6) commitChangeset = sh(returnStdout: true, script: 'git diff-tree --no-commit-id --name-status -r HEAD').trim() From 506be09edd6e869d793e010c13779c225c8b17fa Mon Sep 17 00:00:00 2001 From: Jonas Karlsson Date: Fri, 19 Oct 2018 14:01:19 +0200 Subject: [PATCH 8/8] Fixed one potential memory leak --- metadata_input_zeromq_relay.c | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/metadata_input_zeromq_relay.c b/metadata_input_zeromq_relay.c index 1eb3158..543da9e 100644 --- a/metadata_input_zeromq_relay.c +++ b/metadata_input_zeromq_relay.c @@ -139,12 +139,12 @@ static void md_input_zeromq_relay_handle_event(void *ptr, int32_t fd, uint32_t e json_object_put(zmqh_obj); //Yay we have a valid object lets store that and publish it. - - //Create a zeromq event + //Create a zeromq event + //This works all work as the consuming function(s) are called sync, could mde_publish_event_obj ever be asynch? memset(miz->mse, 0, sizeof(struct md_zeromq_event)); miz->mse->md_type = META_TYPE_ZEROMQ; - miz->mse->msg = buf; //This works as next line will call the consuming function sync, could mde_publish_event_obj ever be asynch? + miz->mse->msg = buf; //this will fail if next line is async mde_publish_event_obj(miz->parent, (struct md_event*) miz->mse); zmq_getsockopt(zmq_socket, ZMQ_EVENTS, &zmq_events, &events_len); @@ -158,6 +158,11 @@ static uint8_t md_input_zeromq_relay_config(struct md_input_zeromq_relay *miz) struct zmq_connection *zmq_con; miz->zmq_connections = createTable(miz->nr_of_connections); //len of miz->urls + //We only handle one message at a time even if we listen to multiple publishers + miz->mse = calloc(1, sizeof(struct md_zeromq_event)); + if (miz->mse == NULL) + return RETVAL_FAILURE; + // Connect to ZMQ publisher(s) for (int i = 0; i < miz->nr_of_connections; i++) { zmq_con = calloc(1, sizeof(struct zmq_connection)); @@ -194,10 +199,6 @@ static uint8_t md_input_zeromq_relay_config(struct md_input_zeromq_relay *miz) zmq_fd, md_input_zeromq_relay_handle_event))) return RETVAL_FAILURE; - miz->mse = calloc(1, sizeof(struct md_zeromq_event)); - if (miz->mse == NULL) - return RETVAL_FAILURE; - backend_event_loop_update(miz->parent->event_loop, EPOLLIN, EPOLL_CTL_ADD, zmq_fd, miz->event_handle); insert(miz->zmq_connections,zmq_fd,zmq_con);