Skip to content

Commit c4e5e8b

Browse files
committed
pipeline: outputs: es: custom types to track ownership of plugin configuration data
Signed-off-by: Marat Abrarov <[email protected]>
1 parent 5161df1 commit c4e5e8b

File tree

9 files changed

+904
-331
lines changed

9 files changed

+904
-331
lines changed

plugins/out_es/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
set(src
22
es_bulk.c
3+
es_type.c
34
es_conf_parse.c
45
es_conf.c
56
es.c

plugins/out_es/es.c

Lines changed: 57 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,9 @@ static flb_sds_t add_aws_auth(struct flb_http_client *c,
6969

7070
signature = flb_signv4_do(c, FLB_TRUE, FLB_TRUE, time(NULL),
7171
ec->aws_region, ec->aws_service_name,
72-
S3_MODE_SIGNED_PAYLOAD, ec->aws_unsigned_headers,
73-
ec->aws_provider);
72+
S3_MODE_SIGNED_PAYLOAD,
73+
ec->aws_unsigned_headers.value,
74+
ec->aws_provider.value);
7475
if (!signature) {
7576
flb_plg_error(ctx->ins, "could not sign request with sigv4");
7677
return NULL;
@@ -214,15 +215,15 @@ static flb_sds_t es_get_id_value(struct flb_elasticsearch *ctx,
214215
{
215216
struct flb_ra_value *rval = NULL;
216217
flb_sds_t tmp_str;
217-
rval = flb_ra_get_value_object(ec->ra_id_key, *map);
218+
rval = flb_ra_get_value_object(ec->ra_id_key.value, *map);
218219
if (rval == NULL) {
219220
flb_plg_warn(ctx->ins, "the value of %s is missing",
220-
ec->id_key);
221+
ec->id_key.value);
221222
return NULL;
222223
}
223224
else if(rval->o.type != MSGPACK_OBJECT_STR) {
224225
flb_plg_warn(ctx->ins, "the value of %s is not string",
225-
ec->id_key);
226+
ec->id_key.value);
226227
flb_ra_key_value_destroy(rval);
227228
return NULL;
228229
}
@@ -253,7 +254,7 @@ static int compose_index_header(struct flb_elasticsearch_config *ec,
253254
if (es_index_custom_len > 0) {
254255
p = logstash_index + es_index_custom_len;
255256
} else {
256-
p = logstash_index + flb_sds_len(ec->logstash_prefix);
257+
p = logstash_index + flb_sds_len(ec->logstash_prefix.value);
257258
}
258259
len = p - logstash_index;
259260
ret = snprintf(p, logstash_index_size - len, "%s",
@@ -266,7 +267,7 @@ static int compose_index_header(struct flb_elasticsearch_config *ec,
266267
len += strlen(separator_str);
267268

268269
s = strftime(p, logstash_index_size - len,
269-
ec->logstash_dateformat, tm);
270+
ec->logstash_dateformat.value, tm);
270271
if (s==0) {
271272
/* exceed limit */
272273
return -1;
@@ -351,7 +352,7 @@ static int elasticsearch_format(struct flb_config *config,
351352

352353
/* Copy logstash prefix if logstash format is enabled */
353354
if (ec->logstash_format == FLB_TRUE) {
354-
strncpy(logstash_index, ec->logstash_prefix, sizeof(logstash_index));
355+
strncpy(logstash_index, ec->logstash_prefix.value, sizeof(logstash_index));
355356
logstash_index[sizeof(logstash_index) - 1] = '\0';
356357
}
357358

@@ -366,7 +367,7 @@ static int elasticsearch_format(struct flb_config *config,
366367
flb_time_get(&tms);
367368
gmtime_r(&tms.tm.tv_sec, &tm);
368369
strftime(index_formatted, sizeof(index_formatted) - 1,
369-
ec->index, &tm);
370+
ec->index.value, &tm);
370371
es_index = index_formatted;
371372
if (ec->suppress_type_name) {
372373
index_len = flb_sds_snprintf(&j_index,
@@ -380,7 +381,7 @@ static int elasticsearch_format(struct flb_config *config,
380381
flb_sds_alloc(j_index),
381382
ES_BULK_INDEX_FMT,
382383
ec->es_action,
383-
es_index, ec->type);
384+
es_index, ec->type.value);
384385
}
385386
}
386387

@@ -408,8 +409,8 @@ static int elasticsearch_format(struct flb_config *config,
408409
map_size = map.via.map.size;
409410

410411
es_index_custom_len = 0;
411-
if (ec->logstash_prefix_key) {
412-
flb_sds_t v = flb_ra_translate(ec->ra_prefix_key,
412+
if (ec->logstash_prefix_key.value) {
413+
flb_sds_t v = flb_ra_translate(ec->ra_prefix_key.value,
413414
(char *) tag, tag_len,
414415
map, NULL);
415416
if (v) {
@@ -438,13 +439,14 @@ static int elasticsearch_format(struct flb_config *config,
438439
msgpack_pack_map(&tmp_pck, map_size + 1);
439440

440441
/* Append the time key */
441-
msgpack_pack_str(&tmp_pck, flb_sds_len(ec->time_key));
442-
msgpack_pack_str_body(&tmp_pck, ec->time_key, flb_sds_len(ec->time_key));
442+
msgpack_pack_str(&tmp_pck, flb_sds_len(ec->time_key.value));
443+
msgpack_pack_str_body(&tmp_pck, ec->time_key.value,
444+
flb_sds_len(ec->time_key.value));
443445

444446
/* Format the time */
445447
gmtime_r(&tms.tm.tv_sec, &tm);
446448
s = strftime(time_formatted, sizeof(time_formatted) - 1,
447-
ec->time_key_format, &tm);
449+
ec->time_key_format.value, &tm);
448450
if (ec->time_key_nanos) {
449451
len = snprintf(time_formatted + s, sizeof(time_formatted) - 1 - s,
450452
".%09" PRIu64 "Z", (uint64_t) tms.tm.tv_nsec);
@@ -458,11 +460,11 @@ static int elasticsearch_format(struct flb_config *config,
458460
msgpack_pack_str(&tmp_pck, s);
459461
msgpack_pack_str_body(&tmp_pck, time_formatted, s);
460462

461-
es_index = ec->index;
463+
es_index = ec->index.value;
462464
if (ec->logstash_format == FLB_TRUE) {
463465
ret = compose_index_header(ec, es_index_custom_len,
464466
&logstash_index[0], sizeof(logstash_index),
465-
ec->logstash_prefix_separator, &tm);
467+
ec->logstash_prefix_separator.value, &tm);
466468
if (ret < 0) {
467469
/* retry with default separator */
468470
compose_index_header(ec, es_index_custom_len,
@@ -484,21 +486,22 @@ static int elasticsearch_format(struct flb_config *config,
484486
flb_sds_alloc(j_index),
485487
ES_BULK_INDEX_FMT,
486488
ec->es_action,
487-
es_index, ec->type);
489+
es_index, ec->type.value);
488490
}
489491
}
490492
}
491493
else if (ec->current_time_index == FLB_TRUE) {
492494
/* Make sure we handle index time format for index */
493495
strftime(index_formatted, sizeof(index_formatted) - 1,
494-
ec->index, &tm);
496+
ec->index.value, &tm);
495497
es_index = index_formatted;
496498
}
497499

498500
/* Tag Key */
499501
if (ec->include_tag_key == FLB_TRUE) {
500-
msgpack_pack_str(&tmp_pck, flb_sds_len(ec->tag_key));
501-
msgpack_pack_str_body(&tmp_pck, ec->tag_key, flb_sds_len(ec->tag_key));
502+
msgpack_pack_str(&tmp_pck, flb_sds_len(ec->tag_key.value));
503+
msgpack_pack_str_body(&tmp_pck, ec->tag_key.value,
504+
flb_sds_len(ec->tag_key.value));
502505
msgpack_pack_str(&tmp_pck, tag_len);
503506
msgpack_pack_str_body(&tmp_pck, tag, tag_len);
504507
}
@@ -537,10 +540,10 @@ static int elasticsearch_format(struct flb_config *config,
537540
flb_sds_alloc(j_index),
538541
ES_BULK_INDEX_FMT_ID,
539542
ec->es_action,
540-
es_index, ec->type, es_uuid);
543+
es_index, ec->type.value, es_uuid);
541544
}
542545
}
543-
if (ec->ra_id_key) {
546+
if (ec->ra_id_key.value) {
544547
id_key_str = es_get_id_value(ctx, ec, &map);
545548
if (id_key_str) {
546549
if (ec->suppress_type_name) {
@@ -555,7 +558,8 @@ static int elasticsearch_format(struct flb_config *config,
555558
flb_sds_alloc(j_index),
556559
ES_BULK_INDEX_FMT_ID,
557560
ec->es_action,
558-
es_index, ec->type, id_key_str);
561+
es_index, ec->type.value,
562+
id_key_str);
559563
}
560564
flb_sds_destroy(id_key_str);
561565
id_key_str = NULL;
@@ -574,13 +578,13 @@ static int elasticsearch_format(struct flb_config *config,
574578
}
575579

576580
out_buf_len = flb_sds_len(out_buf);
577-
if (strcasecmp(ec->write_operation, FLB_ES_WRITE_OP_UPDATE) == 0) {
581+
if (strcasecmp(ec->write_operation.value, FLB_ES_WRITE_OP_UPDATE) == 0) {
578582
tmp_buf = out_buf;
579583
out_buf = flb_sds_create_len(NULL, out_buf_len = out_buf_len + sizeof(ES_BULK_UPDATE_OP_BODY) - 2);
580584
out_buf_len = snprintf(out_buf, out_buf_len, ES_BULK_UPDATE_OP_BODY, tmp_buf);
581585
flb_sds_destroy(tmp_buf);
582586
}
583-
else if (strcasecmp(ec->write_operation, FLB_ES_WRITE_OP_UPSERT) == 0) {
587+
else if (strcasecmp(ec->write_operation.value, FLB_ES_WRITE_OP_UPSERT) == 0) {
584588
tmp_buf = out_buf;
585589
out_buf = flb_sds_create_len(NULL, out_buf_len = out_buf_len + sizeof(ES_BULK_UPSERT_OP_BODY) - 2);
586590
out_buf_len = snprintf(out_buf, out_buf_len, ES_BULK_UPSERT_OP_BODY, tmp_buf);
@@ -929,8 +933,8 @@ static void cb_es_flush(struct flb_event_chunk *event_chunk,
929933
if (ec->http_user && ec->http_passwd) {
930934
flb_http_basic_auth(c, ec->http_user, ec->http_passwd);
931935
}
932-
else if (ec->cloud_user && ec->cloud_passwd) {
933-
flb_http_basic_auth(c, ec->cloud_user, ec->cloud_passwd);
936+
else if (ec->cloud_user.value && ec->cloud_passwd.value) {
937+
flb_http_basic_auth(c, ec->cloud_user.value, ec->cloud_passwd.value);
934938
}
935939
else if (ec->http_api_key) {
936940
/* 7 for ApiKey + space */
@@ -1146,12 +1150,14 @@ static int cb_es_exit(void *data, struct flb_config *config)
11461150
static struct flb_config_map config_map[] = {
11471151
{
11481152
FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_INDEX, FLB_ES_DEFAULT_INDEX,
1149-
0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, index),
1153+
0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, index) +
1154+
offsetof(struct flb_es_str, value),
11501155
"Set an index name"
11511156
},
11521157
{
11531158
FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_TYPE, FLB_ES_DEFAULT_TYPE,
1154-
0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, type),
1159+
0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, type) +
1160+
offsetof(struct flb_es_str, value),
11551161
"Set the document type property"
11561162
},
11571163
{
@@ -1244,41 +1250,50 @@ static struct flb_config_map config_map[] = {
12441250
},
12451251
{
12461252
FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_LOGSTASH_PREFIX, FLB_ES_DEFAULT_PREFIX,
1247-
0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, logstash_prefix),
1253+
0, FLB_TRUE,
1254+
offsetof(struct flb_elasticsearch_config, logstash_prefix) +
1255+
offsetof(struct flb_es_sds_t, value),
12481256
"When Logstash_Format is enabled, the Index name is composed using a prefix "
12491257
"and the date, e.g: If Logstash_Prefix is equals to 'mydata' your index will "
12501258
"become 'mydata-YYYY.MM.DD'. The last string appended belongs to the date "
12511259
"when the data is being generated"
12521260
},
12531261
{
12541262
FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_LOGSTASH_PREFIX_SEPARATOR, "-",
1255-
0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, logstash_prefix_separator),
1263+
0, FLB_TRUE,
1264+
offsetof(struct flb_elasticsearch_config, logstash_prefix_separator) +
1265+
offsetof(struct flb_es_sds_t, value),
12561266
"Set a separator between logstash_prefix and date."
12571267
},
12581268
{
12591269
FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_LOGSTASH_PREFIX_KEY, NULL,
1260-
0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, logstash_prefix_key),
1270+
0, FLB_TRUE,
1271+
offsetof(struct flb_elasticsearch_config, logstash_prefix_key) +
1272+
offsetof(struct flb_es_sds_t, value),
12611273
"When included: the value in the record that belongs to the key will be looked "
12621274
"up and over-write the Logstash_Prefix for index generation. If the key/value "
12631275
"is not found in the record then the Logstash_Prefix option will act as a "
12641276
"fallback. Nested keys are supported through record accessor pattern"
12651277
},
12661278
{
12671279
FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_LOGSTASH_DATEFORMAT, FLB_ES_DEFAULT_TIME_FMT,
1268-
0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, logstash_dateformat),
1280+
0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, logstash_dateformat) +
1281+
offsetof(struct flb_es_sds_t, value),
12691282
"Time format (based on strftime) to generate the second part of the Index name"
12701283
},
12711284

12721285
/* Custom Time and Tag keys */
12731286
{
12741287
FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_TIME_KEY, FLB_ES_DEFAULT_TIME_KEY,
1275-
0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, time_key),
1288+
0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, time_key) +
1289+
offsetof(struct flb_es_sds_t, value),
12761290
"When Logstash_Format is enabled, each record will get a new timestamp field. "
12771291
"The Time_Key property defines the name of that field"
12781292
},
12791293
{
12801294
FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_TIME_KEY_FORMAT, FLB_ES_DEFAULT_TIME_KEYF,
1281-
0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, time_key_format),
1295+
0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, time_key_format) +
1296+
offsetof(struct flb_es_sds_t, value),
12821297
"When Logstash_Format is enabled, this property defines the format of the "
12831298
"timestamp"
12841299
},
@@ -1295,7 +1310,8 @@ static struct flb_config_map config_map[] = {
12951310
},
12961311
{
12971312
FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_TAG_KEY, FLB_ES_DEFAULT_TAG_KEY,
1298-
0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, tag_key),
1313+
0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, tag_key) +
1314+
offsetof(struct flb_es_sds_t, value),
12991315
"When Include_Tag_Key is enabled, this property defines the key name for the tag"
13001316
},
13011317
{
@@ -1333,12 +1349,14 @@ static struct flb_config_map config_map[] = {
13331349
},
13341350
{
13351351
FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_WRITE_OPERATION, "create",
1336-
0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, write_operation),
1352+
0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, write_operation) +
1353+
offsetof(struct flb_es_sds_t, value),
13371354
"Operation to use to write in bulk requests"
13381355
},
13391356
{
13401357
FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_ID_KEY, NULL,
1341-
0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, id_key),
1358+
0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, id_key) +
1359+
offsetof(struct flb_es_sds_t, value),
13421360
"If set, _id will be the value of the key from incoming record."
13431361
},
13441362
{

0 commit comments

Comments
 (0)