diff --git a/include/cmetrics/cmt_encode_opentelemetry.h b/include/cmetrics/cmt_encode_opentelemetry.h index 46beca4..811ba36 100644 --- a/include/cmetrics/cmt_encode_opentelemetry.h +++ b/include/cmetrics/cmt_encode_opentelemetry.h @@ -30,15 +30,32 @@ #define CMT_ENCODE_OPENTELEMETRY_INVALID_ARGUMENT_ERROR 2 #define CMT_ENCODE_OPENTELEMETRY_UNEXPECTED_METRIC_TYPE 3 #define CMT_ENCODE_OPENTELEMETRY_DATA_POINT_INIT_ERROR 4 +#define CMT_ENCODE_OPENTELEMETRY_CUTOFF_ERROR 5 + +#define CMT_ENCODE_OPENTELEMETRY_CUTOFF_THRESHOLD 300000000000L /* 5 minutes in nanoseconds */ +#define CMT_ENCODE_OPENTELEMETRY_CUTOFF_DISABLED -1L /* disabled */ + struct cmt_opentelemetry_context { size_t resource_index; Opentelemetry__Proto__Metrics__V1__MetricsData *metrics_data; struct cmt *cmt; + int use_cutoff; + int64_t cutoff_threshold; +}; + +struct cmt_opentelemetry_context_opts +{ + int use_cutoff; + int64_t cutoff_threshold; }; cfl_sds_t cmt_encode_opentelemetry_create(struct cmt *cmt); +cfl_sds_t cmt_encode_opentelemetry_create_with_cutoff(struct cmt *cmt, int use_cutoff); +cfl_sds_t cmt_encode_opentelemetry_create_with_cutoff_opts(struct cmt *cmt, + struct cmt_opentelemetry_context_opts *opts); + void cmt_encode_opentelemetry_destroy(cfl_sds_t text); #endif diff --git a/src/cmt_encode_opentelemetry.c b/src/cmt_encode_opentelemetry.c index 8378bc8..0035220 100644 --- a/src/cmt_encode_opentelemetry.c +++ b/src/cmt_encode_opentelemetry.c @@ -257,7 +257,7 @@ static void destroy_opentelemetry_context( struct cmt_opentelemetry_context *context); static struct cmt_opentelemetry_context *initialize_opentelemetry_context( - struct cmt *cmt); + struct cmt *cmt, struct cmt_opentelemetry_context_opts *opts); static inline Opentelemetry__Proto__Common__V1__AnyValue *cfl_variant_to_otlp_any_value(struct cfl_variant *value); static inline Opentelemetry__Proto__Common__V1__KeyValue *cfl_variant_kvpair_to_otlp_kvpair(struct cfl_kvpair *input_pair); @@ -2138,7 +2138,7 @@ static Opentelemetry__Proto__Resource__V1__Resource * } static struct cmt_opentelemetry_context *initialize_opentelemetry_context( - struct cmt *cmt) + struct cmt *cmt, struct cmt_opentelemetry_context_opts *opts) { struct cfl_kvlist *resource_metrics_root; struct cfl_kvlist *scope_metrics_root; @@ -2166,6 +2166,8 @@ static struct cmt_opentelemetry_context *initialize_opentelemetry_context( memset(context, 0, sizeof(struct cmt_opentelemetry_context)); context->cmt = cmt; + context->use_cutoff = opts->use_cutoff; + context->cutoff_threshold = opts->cutoff_threshold; resource = initialize_resource(resource_root, &result); @@ -2369,6 +2371,17 @@ int append_sample_to_metric(struct cmt_opentelemetry_context *context, return result; } +static int check_staled_timestamp(struct cmt_metric *metric, uint64_t now, uint64_t cutoff) +{ + uint64_t ts; + uint64_t diff; + + ts = cmt_metric_get_timestamp(metric); + diff = now - ts; + + return diff > cutoff; +} + int pack_basic_type(struct cmt_opentelemetry_context *context, struct cmt_map *map, size_t *metric_index) @@ -2382,8 +2395,11 @@ int pack_basic_type(struct cmt_opentelemetry_context *context, Opentelemetry__Proto__Metrics__V1__Metric *metric; int result; struct cfl_list *head; + uint64_t now; + int cutoff = CMT_FALSE; sample_count = 0; + now = cfl_time_now(); if (map->metric_static_set) { sample_count++; @@ -2434,6 +2450,15 @@ int pack_basic_type(struct cmt_opentelemetry_context *context, &map->metric, sample_index++); + if (context->use_cutoff == CMT_TRUE && + check_staled_timestamp(&map->metric, now, + context->cutoff_threshold)) { + destroy_metric(metric); + + /* Skip processing metrics which are staled over the threshold */ + return CMT_ENCODE_OPENTELEMETRY_CUTOFF_ERROR; + } + if (result != CMT_ENCODE_OPENTELEMETRY_SUCCESS) { destroy_metric(metric); @@ -2444,6 +2469,15 @@ int pack_basic_type(struct cmt_opentelemetry_context *context, cfl_list_foreach(head, &map->metrics) { sample = cfl_list_entry(head, struct cmt_metric, _head); + if (context->use_cutoff == CMT_TRUE && + check_staled_timestamp(&map->metric, now, + context->cutoff_threshold)) { + + /* Skip processing metrics which are staled over the threshold */ + cutoff = CMT_TRUE; + continue; + } + result = append_sample_to_metric(context, metric, map, @@ -2473,6 +2507,10 @@ int pack_basic_type(struct cmt_opentelemetry_context *context, (*metric_index)++; + if (cutoff == CMT_TRUE) { + return CMT_ENCODE_OPENTELEMETRY_CUTOFF_ERROR; + } + return result; } @@ -2498,7 +2536,8 @@ static cfl_sds_t render_opentelemetry_context_to_sds( return result_buffer; } -cfl_sds_t cmt_encode_opentelemetry_create(struct cmt *cmt) +cfl_sds_t cmt_encode_opentelemetry_create_with_cutoff_opts(struct cmt *cmt, + struct cmt_opentelemetry_context_opts *opts) { size_t metric_index; struct cmt_opentelemetry_context *context; @@ -2514,7 +2553,7 @@ cfl_sds_t cmt_encode_opentelemetry_create(struct cmt *cmt) buf = NULL; result = 0; - context = initialize_opentelemetry_context(cmt); + context = initialize_opentelemetry_context(cmt, opts); if (context == NULL) { return NULL; @@ -2527,6 +2566,10 @@ cfl_sds_t cmt_encode_opentelemetry_create(struct cmt *cmt) counter = cfl_list_entry(head, struct cmt_counter, _head); result = pack_basic_type(context, counter->map, &metric_index); + if (result == CMT_ENCODE_OPENTELEMETRY_CUTOFF_ERROR) { + continue; + } + if (result != CMT_ENCODE_OPENTELEMETRY_SUCCESS) { break; } @@ -2538,6 +2581,10 @@ cfl_sds_t cmt_encode_opentelemetry_create(struct cmt *cmt) gauge = cfl_list_entry(head, struct cmt_gauge, _head); result = pack_basic_type(context, gauge->map, &metric_index); + if (result == CMT_ENCODE_OPENTELEMETRY_CUTOFF_ERROR) { + continue; + } + if (result != CMT_ENCODE_OPENTELEMETRY_SUCCESS) { break; } @@ -2549,6 +2596,10 @@ cfl_sds_t cmt_encode_opentelemetry_create(struct cmt *cmt) untyped = cfl_list_entry(head, struct cmt_untyped, _head); result = pack_basic_type(context, untyped->map, &metric_index); + if (result == CMT_ENCODE_OPENTELEMETRY_CUTOFF_ERROR) { + continue; + } + if (result != CMT_ENCODE_OPENTELEMETRY_SUCCESS) { break; } @@ -2560,6 +2611,10 @@ cfl_sds_t cmt_encode_opentelemetry_create(struct cmt *cmt) summary = cfl_list_entry(head, struct cmt_summary, _head); result = pack_basic_type(context, summary->map, &metric_index); + if (result == CMT_ENCODE_OPENTELEMETRY_CUTOFF_ERROR) { + continue; + } + if (result != CMT_ENCODE_OPENTELEMETRY_SUCCESS) { break; } @@ -2571,14 +2626,20 @@ cfl_sds_t cmt_encode_opentelemetry_create(struct cmt *cmt) histogram = cfl_list_entry(head, struct cmt_histogram, _head); result = pack_basic_type(context, histogram->map, &metric_index); + if (result == CMT_ENCODE_OPENTELEMETRY_CUTOFF_ERROR) { + continue; + } + if (result != CMT_ENCODE_OPENTELEMETRY_SUCCESS) { break; } } } - if (result == CMT_ENCODE_OPENTELEMETRY_SUCCESS) { + if (metric_index > 0) { buf = render_opentelemetry_context_to_sds(context); + } else { + buf = NULL; } destroy_opentelemetry_context(context); @@ -2586,6 +2647,24 @@ cfl_sds_t cmt_encode_opentelemetry_create(struct cmt *cmt) return buf; } +cfl_sds_t cmt_encode_opentelemetry_create_with_cutoff(struct cmt *cmt, int use_cutoff) +{ + struct cmt_opentelemetry_context_opts opts; + opts.use_cutoff = use_cutoff; + opts.cutoff_threshold = CMT_ENCODE_OPENTELEMETRY_CUTOFF_THRESHOLD; + + return cmt_encode_opentelemetry_create_with_cutoff_opts(cmt, &opts); +} + +cfl_sds_t cmt_encode_opentelemetry_create(struct cmt *cmt) +{ + struct cmt_opentelemetry_context_opts opts; + opts.use_cutoff = CMT_FALSE; + opts.cutoff_threshold = CMT_ENCODE_OPENTELEMETRY_CUTOFF_DISABLED; + + return cmt_encode_opentelemetry_create_with_cutoff_opts(cmt, &opts); +} + void cmt_encode_opentelemetry_destroy(cfl_sds_t text) { cfl_sds_destroy(text); diff --git a/tests/decoding.c b/tests/decoding.c index 6c70811..7043797 100644 --- a/tests/decoding.c +++ b/tests/decoding.c @@ -32,19 +32,17 @@ #include "cmt_tests.h" -static struct cmt *generate_encoder_test_data() +static struct cmt *generate_encoder_test_data_with_timestamp(uint64_t ts) { double quantiles[5]; struct cmt_histogram_buckets *buckets; double val; struct cmt *cmt; - uint64_t ts; struct cmt_gauge *g1; struct cmt_counter *c1; struct cmt_summary *s1; struct cmt_histogram *h1; - ts = 0; cmt = cmt_create(); c1 = cmt_counter_create(cmt, "kubernetes", "network", "load_counter", "Network load counter", @@ -124,6 +122,14 @@ static struct cmt *generate_encoder_test_data() return cmt; } +static struct cmt *generate_encoder_test_data_now() +{ + uint64_t ts = 0; + ts = cfl_time_now(); + + return generate_encoder_test_data_with_timestamp(ts); +} + void test_opentelemetry() { cfl_sds_t reference_prometheus_context; @@ -139,7 +145,7 @@ void test_opentelemetry() cmt_initialize(); - cmt = generate_encoder_test_data(); + cmt = generate_encoder_test_data_now(); TEST_CHECK(cmt != NULL); reference_prometheus_context = cmt_encode_prometheus_create(cmt, CMT_TRUE); diff --git a/tests/encoding.c b/tests/encoding.c index b962a46..cdcc747 100644 --- a/tests/encoding.c +++ b/tests/encoding.c @@ -583,10 +583,12 @@ void test_opentelemetry() cfl_sds_t payload; struct cmt *cmt; FILE *sample_file; + uint64_t ts; cmt_initialize(); + ts = cfl_time_now(); - cmt = generate_encoder_test_data(); + cmt = generate_encoder_test_data_with_timestamp(ts); payload = cmt_encode_opentelemetry_create(cmt); TEST_CHECK(NULL != payload); @@ -609,7 +611,49 @@ curl -v 'http://localhost:9090/v1/metrics' -H 'Content-Type: application/x-proto fclose(sample_file); - cmt_encode_prometheus_remote_write_destroy(payload); + cmt_encode_opentelemetry_destroy(payload); + + cmt_destroy(cmt); +} + +void test_opentelemetry_outdated() +{ + cfl_sds_t payload; + struct cmt *cmt; + uint64_t ts; + + cmt_initialize(); + ts = cfl_time_now() - CMT_ENCODE_OPENTELEMETRY_CUTOFF_THRESHOLD * 1.5; + + cmt = generate_encoder_test_data_with_timestamp(ts); + + payload = cmt_encode_opentelemetry_create_with_cutoff(cmt, CMT_TRUE); + TEST_CHECK(NULL == payload); + + cmt_encode_opentelemetry_destroy(payload); + + cmt_destroy(cmt); +} + +void test_opentelemetry_outdated_with_cutoff_opts() +{ + cfl_sds_t payload; + struct cmt *cmt; + uint64_t ts; + struct cmt_opentelemetry_context_opts opts; + + opts.use_cutoff = CMT_TRUE; + opts.cutoff_threshold = CMT_ENCODE_OPENTELEMETRY_CUTOFF_THRESHOLD; + + cmt_initialize(); + ts = cfl_time_now() - CMT_ENCODE_OPENTELEMETRY_CUTOFF_THRESHOLD * 1.5; + + cmt = generate_encoder_test_data_with_timestamp(ts); + + payload = cmt_encode_opentelemetry_create_with_cutoff_opts(cmt, &opts); + TEST_CHECK(NULL == payload); + + cmt_encode_opentelemetry_destroy(payload); cmt_destroy(cmt); } @@ -1173,6 +1217,8 @@ TEST_LIST = { {"cmt_msgpack_labels", test_cmt_to_msgpack_labels}, {"cmt_msgpack", test_cmt_to_msgpack}, {"opentelemetry", test_opentelemetry}, + {"opentelemetry_old_context", test_opentelemetry_outdated}, + {"opentelemetry_cutoff_opts", test_opentelemetry_outdated_with_cutoff_opts}, {"cloudwatch_emf", test_cloudwatch_emf}, {"prometheus", test_prometheus}, {"text", test_text},