diff --git a/plugins/out_azure_kusto/CMakeLists.txt b/plugins/out_azure_kusto/CMakeLists.txt index 6803bee09c2..a19d505009c 100644 --- a/plugins/out_azure_kusto/CMakeLists.txt +++ b/plugins/out_azure_kusto/CMakeLists.txt @@ -2,6 +2,7 @@ set(src azure_kusto.c azure_kusto_conf.c azure_kusto_ingest.c + azure_msiauth.c ) FLB_PLUGIN(out_azure_kusto "${src}" "") diff --git a/plugins/out_azure_kusto/azure_kusto.c b/plugins/out_azure_kusto/azure_kusto.c index 4b79d80c4bb..ed31dd8d5bd 100644 --- a/plugins/out_azure_kusto/azure_kusto.c +++ b/plugins/out_azure_kusto/azure_kusto.c @@ -29,6 +29,21 @@ #include "azure_kusto.h" #include "azure_kusto_conf.h" #include "azure_kusto_ingest.h" +#include "azure_msiauth.h" + +static int azure_kusto_get_msi_token(struct flb_azure_kusto *ctx) +{ + char *token; + + /* Retrieve access token */ + token = flb_azure_msiauth_token_get(ctx->o); + if (!token) { + flb_plg_error(ctx->ins, "error retrieving oauth2 access token"); + return -1; + } + + return 0; +} /* Create a new oauth2 context and get a oauth2 token */ static int azure_kusto_get_oauth2_token(struct flb_azure_kusto *ctx) @@ -84,9 +99,14 @@ flb_sds_t get_azure_kusto_token(struct flb_azure_kusto *ctx) } if (flb_oauth2_token_expired(ctx->o) == FLB_TRUE) { - ret = azure_kusto_get_oauth2_token(ctx); + if (ctx->managed_identity_client_id != NULL) { + ret = azure_kusto_get_msi_token(ctx); + } + else { + ret = azure_kusto_get_oauth2_token(ctx); + } } - + /* Copy string to prevent race conditions (get_oauth2 can free the string) */ if (ret == 0) { output = flb_sds_create_size(flb_sds_len(ctx->o->token_type) + @@ -483,6 +503,11 @@ static struct flb_config_map config_map[] = { offsetof(struct flb_azure_kusto, client_secret), "Set the client secret (Application Password) of the AAD application used for " "authentication"}, + {FLB_CONFIG_MAP_STR, "managed_identity_client_id", (char *)NULL, 0, FLB_TRUE, + offsetof(struct flb_azure_kusto, managed_identity_client_id), + "A managed identity client id to authenticate with. " + "Set to 'system' for system-assigned managed identity. " + "Set the MI client ID (GUID) for user-assigned managed identity."}, {FLB_CONFIG_MAP_STR, "ingestion_endpoint", (char *)NULL, 0, FLB_TRUE, offsetof(struct flb_azure_kusto, ingestion_endpoint), "Set the Kusto cluster's ingestion endpoint URL (e.g. " diff --git a/plugins/out_azure_kusto/azure_kusto.h b/plugins/out_azure_kusto/azure_kusto.h index 9e3eb7b3182..7b59df893b0 100644 --- a/plugins/out_azure_kusto/azure_kusto.h +++ b/plugins/out_azure_kusto/azure_kusto.h @@ -68,6 +68,7 @@ struct flb_azure_kusto { flb_sds_t tenant_id; flb_sds_t client_id; flb_sds_t client_secret; + flb_sds_t managed_identity_client_id; flb_sds_t ingestion_endpoint; flb_sds_t database_name; flb_sds_t table_name; diff --git a/plugins/out_azure_kusto/azure_kusto_conf.c b/plugins/out_azure_kusto/azure_kusto_conf.c index d25f11b15c3..8fda0cce981 100644 --- a/plugins/out_azure_kusto/azure_kusto_conf.c +++ b/plugins/out_azure_kusto/azure_kusto_conf.c @@ -30,6 +30,7 @@ #include "azure_kusto.h" #include "azure_kusto_conf.h" +#include "azure_msiauth.h" static struct flb_upstream_node *flb_upstream_node_create_url(struct flb_azure_kusto *ctx, struct flb_config *config, @@ -601,23 +602,8 @@ struct flb_azure_kusto *flb_azure_kusto_conf_create(struct flb_output_instance * return NULL; } - /* config: 'tenant_id' */ - if (ctx->tenant_id == NULL) { - flb_plg_error(ctx->ins, "property 'tenant_id' is not defined."); - flb_azure_kusto_conf_destroy(ctx); - return NULL; - } - - /* config: 'client_id' */ - if (ctx->client_id == NULL) { - flb_plg_error(ctx->ins, "property 'client_id' is not defined"); - flb_azure_kusto_conf_destroy(ctx); - return NULL; - } - - /* config: 'client_secret' */ - if (ctx->client_secret == NULL) { - flb_plg_error(ctx->ins, "property 'client_secret' is not defined"); + if (ctx->tenant_id == NULL && ctx->client_id == NULL && ctx->client_secret == NULL && ctx->managed_identity_client_id == NULL) { + flb_plg_error(ctx->ins, "Service Principal or Managed Identity is not defined"); flb_azure_kusto_conf_destroy(ctx); return NULL; } @@ -643,17 +629,70 @@ struct flb_azure_kusto *flb_azure_kusto_conf_create(struct flb_output_instance * return NULL; } - /* Create the auth URL */ - ctx->oauth_url = flb_sds_create_size(sizeof(FLB_MSAL_AUTH_URL_TEMPLATE) - 1 + - flb_sds_len(ctx->tenant_id)); - if (!ctx->oauth_url) { - flb_errno(); - flb_azure_kusto_conf_destroy(ctx); - return NULL; + if (ctx->managed_identity_client_id != NULL) { + /* system assigned managed identity */ + if (strcasecmp(ctx->managed_identity_client_id, "system") == 0) { + ctx->oauth_url = flb_sds_create_size(sizeof(FLB_AZURE_MSIAUTH_URL_TEMPLATE) - 1); + + if (!ctx->oauth_url) { + flb_errno(); + flb_azure_kusto_conf_destroy(ctx); + return NULL; + } + + flb_sds_snprintf(&ctx->oauth_url, flb_sds_alloc(ctx->oauth_url), + FLB_AZURE_MSIAUTH_URL_TEMPLATE, "", ""); + + } else { + /* user assigned managed identity */ + ctx->oauth_url = flb_sds_create_size(sizeof(FLB_AZURE_MSIAUTH_URL_TEMPLATE) - 1 + + sizeof("&client_id=") - 1 + + flb_sds_len(ctx->managed_identity_client_id)); + + if (!ctx->oauth_url) { + flb_errno(); + flb_azure_kusto_conf_destroy(ctx); + return NULL; + } + + flb_sds_snprintf(&ctx->oauth_url, flb_sds_alloc(ctx->oauth_url), + FLB_AZURE_MSIAUTH_URL_TEMPLATE, "&client_id=", ctx->managed_identity_client_id); + } } - flb_sds_snprintf(&ctx->oauth_url, flb_sds_alloc(ctx->oauth_url), - FLB_MSAL_AUTH_URL_TEMPLATE, ctx->tenant_id); + else { + /* config: 'tenant_id' */ + if (ctx->tenant_id == NULL) { + flb_plg_error(ctx->ins, "property 'tenant_id' is not defined."); + flb_azure_kusto_conf_destroy(ctx); + return NULL; + } + + /* config: 'client_id' */ + if (ctx->client_id == NULL) { + flb_plg_error(ctx->ins, "property 'client_id' is not defined"); + flb_azure_kusto_conf_destroy(ctx); + return NULL; + } + + /* config: 'client_secret' */ + if (ctx->client_secret == NULL) { + flb_plg_error(ctx->ins, "property 'client_secret' is not defined"); + flb_azure_kusto_conf_destroy(ctx); + return NULL; + } + /* Create the auth URL */ + ctx->oauth_url = flb_sds_create_size(sizeof(FLB_MSAL_AUTH_URL_TEMPLATE) - 1 + + flb_sds_len(ctx->tenant_id)); + if (!ctx->oauth_url) { + flb_errno(); + flb_azure_kusto_conf_destroy(ctx); + return NULL; + } + flb_sds_snprintf(&ctx->oauth_url, flb_sds_alloc(ctx->oauth_url), + FLB_MSAL_AUTH_URL_TEMPLATE, ctx->tenant_id); + } + ctx->resources = flb_calloc(1, sizeof(struct flb_azure_kusto_resources)); if (!ctx->resources) { flb_errno(); diff --git a/plugins/out_azure_kusto/azure_msiauth.c b/plugins/out_azure_kusto/azure_msiauth.c new file mode 100644 index 00000000000..934e0d6f686 --- /dev/null +++ b/plugins/out_azure_kusto/azure_msiauth.c @@ -0,0 +1,104 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include +#include + +#include "azure_msiauth.h" + +char *flb_azure_msiauth_token_get(struct flb_oauth2 *ctx) + { + int ret; + size_t b_sent; + time_t now; + struct flb_connection *u_conn; + struct flb_http_client *c; + + now = time(NULL); + if (ctx->access_token) { + /* validate unexpired token */ + if (ctx->expires > now && flb_sds_len(ctx->access_token) > 0) { + return ctx->access_token; + } + } + + /* Get Token and store it in the context */ + u_conn = flb_upstream_conn_get(ctx->u); + if (!u_conn) { + flb_error("[azure msi auth] could not get an upstream connection to %s:%i", + ctx->u->tcp_host, ctx->u->tcp_port); + return NULL; + } + + /* Create HTTP client context */ + c = flb_http_client(u_conn, FLB_HTTP_GET, ctx->uri, + NULL, 0, + ctx->host, atoi(ctx->port), + NULL, 0); + if (!c) { + flb_error("[azure msi auth] error creating HTTP client context"); + flb_upstream_conn_release(u_conn); + return NULL; + } + + /* Append HTTP Header */ + flb_http_add_header(c, "Metadata", 8, "true", 4); + + /* Issue request */ + ret = flb_http_do(c, &b_sent); + if (ret != 0) { + flb_warn("[azure msi auth] cannot issue request, http_do=%i", ret); + } + else { + flb_info("[azure msi auth] HTTP Status=%i", c->resp.status); + if (c->resp.payload_size > 0) { + if (c->resp.status == 200) { + flb_debug("[azure msi auth] payload:\n%s", c->resp.payload); + } + else { + flb_info("[azure msi auth] payload:\n%s", c->resp.payload); + } + } + } + + /* Extract token */ + if (c->resp.payload_size > 0 && c->resp.status == 200) { + ret = flb_oauth2_parse_json_response(c->resp.payload, + c->resp.payload_size, ctx); + if (ret == 0) { + flb_info("[azure msi auth] access token from '%s:%s' retrieved", + ctx->host, ctx->port); + flb_http_client_destroy(c); + flb_upstream_conn_release(u_conn); + ctx->issued = time(NULL); + ctx->expires = ctx->issued + ctx->expires_in; + return ctx->access_token; + } + } + + flb_http_client_destroy(c); + flb_upstream_conn_release(u_conn); + + return NULL; + } diff --git a/plugins/out_azure_kusto/azure_msiauth.h b/plugins/out_azure_kusto/azure_msiauth.h new file mode 100644 index 00000000000..64c4ca8d8b3 --- /dev/null +++ b/plugins/out_azure_kusto/azure_msiauth.h @@ -0,0 +1,27 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +/* MSAL authorization URL */ +#define FLB_AZURE_MSIAUTH_URL_TEMPLATE \ + "http://169.254.169.254/metadata/identity/oauth2/token?api-version=2021-02-01%s%s&resource=https://api.kusto.windows.net" + +char *flb_azure_msiauth_token_get(struct flb_oauth2 *ctx); + diff --git a/src/flb_oauth2.c b/src/flb_oauth2.c index 12090c3257f..866fbe8e132 100644 --- a/src/flb_oauth2.c +++ b/src/flb_oauth2.c @@ -179,7 +179,7 @@ struct flb_oauth2 *flb_oauth2_create(struct flb_config *config, goto error; } - if (!prot || strcmp(prot, "https") != 0) { + if (!prot || (strcmp(prot, "https") != 0 && strcmp(prot, "http") != 0)) { flb_error("[oauth2] invalid endpoint protocol: %s", auth_url); goto error; } @@ -227,8 +227,15 @@ struct flb_oauth2 *flb_oauth2_create(struct flb_config *config, } /* Create Upstream context */ - ctx->u = flb_upstream_create_url(config, auth_url, - FLB_IO_TLS, ctx->tls); + if (strcmp(prot, "https") == 0) { + ctx->u = flb_upstream_create_url(config, auth_url, + FLB_IO_TLS, ctx->tls); + } + else if (strcmp(prot, "http") == 0) { + ctx->u = flb_upstream_create_url(config, auth_url, + FLB_IO_TCP, NULL); + } + if (!ctx->u) { flb_error("[oauth2] error creating upstream context"); goto error; diff --git a/tests/runtime/CMakeLists.txt b/tests/runtime/CMakeLists.txt index 49ed0e0cf5d..ecbe38322ba 100644 --- a/tests/runtime/CMakeLists.txt +++ b/tests/runtime/CMakeLists.txt @@ -199,6 +199,7 @@ if(FLB_IN_LIB) FLB_RT_TEST(FLB_OUT_LIB "core_log.c") FLB_RT_TEST(FLB_OUT_LIB "config_map_opts.c") FLB_RT_TEST(FLB_OUT_COUNTER "out_counter.c") + FLB_RT_TEST(FLB_OUT_AZURE_KUSTO "out_azure_kusto.c") FLB_RT_TEST(FLB_OUT_DATADOG "out_datadog.c") FLB_RT_TEST(FLB_OUT_SKYWALKING "out_skywalking.c") FLB_RT_TEST(FLB_OUT_ES "out_elasticsearch.c") diff --git a/tests/runtime/out_azure_kusto.c b/tests/runtime/out_azure_kusto.c new file mode 100644 index 00000000000..dc4efe41d35 --- /dev/null +++ b/tests/runtime/out_azure_kusto.c @@ -0,0 +1,76 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2019-2022 The Fluent Bit Authors + * Copyright (C) 2015-2018 Treasure Data Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include "flb_tests_runtime.h" + +/* Test data */ +#include "data/common/json_invalid.h" /* JSON_INVALID */ + +/* Test functions */ +void flb_test_azure_kusto_json_invalid(void); + +/* Test list */ +TEST_LIST = { + {"json_invalid", flb_test_azure_kusto_json_invalid }, + {NULL, NULL} +}; + +void flb_test_azure_kusto_json_invalid(void) +{ + int i; + int ret; + int total; + int bytes; + char *p = (char *) JSON_INVALID; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + + ctx = flb_create(); + flb_service_set(ctx, "Flush", "1", "Grace", "1", "Log_Level", "error", NULL); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "azure_kusto", NULL); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, "match", "test", NULL); + flb_output_set(ctx, out_ffd, "managed_identity_client_id", "SYSTEM", NULL); + flb_output_set(ctx, out_ffd, "ingestion_endpoint", "https://ingest-CLUSTER.kusto.windows.net", NULL); + flb_output_set(ctx, out_ffd, "database_name", "telemetrydb", NULL); + flb_output_set(ctx, out_ffd, "table_name", "logs", NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + total = 0; + for (i = 0; i < (int) sizeof(JSON_INVALID) - 1; i++) { + bytes = flb_lib_push(ctx, in_ffd, p + i, 1); + TEST_CHECK(bytes == 1); + total++; + } + + sleep(1); /* waiting flush */ + + flb_stop(ctx); + flb_destroy(ctx); +} \ No newline at end of file