Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

out_azure_kusto: azure managed identity support added #10036

Merged
merged 4 commits into from
Mar 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions plugins/out_azure_kusto/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ set(src
azure_kusto.c
azure_kusto_conf.c
azure_kusto_ingest.c
azure_msiauth.c
)

FLB_PLUGIN(out_azure_kusto "${src}" "")
29 changes: 27 additions & 2 deletions plugins/out_azure_kusto/azure_kusto.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,21 @@
#include "azure_kusto.h"
#include "azure_kusto_conf.h"
#include "azure_kusto_ingest.h"
#include "azure_msiauth.h"

static int azure_kusto_get_msi_token(struct flb_azure_kusto *ctx)
{
char *token;

/* Retrieve access token */
token = flb_azure_msiauth_token_get(ctx->o);
if (!token) {
flb_plg_error(ctx->ins, "error retrieving oauth2 access token");
return -1;
}

return 0;
}

/* Create a new oauth2 context and get a oauth2 token */
static int azure_kusto_get_oauth2_token(struct flb_azure_kusto *ctx)
Expand Down Expand Up @@ -84,9 +99,14 @@ flb_sds_t get_azure_kusto_token(struct flb_azure_kusto *ctx)
}

if (flb_oauth2_token_expired(ctx->o) == FLB_TRUE) {
ret = azure_kusto_get_oauth2_token(ctx);
if (ctx->managed_identity_client_id != NULL) {
ret = azure_kusto_get_msi_token(ctx);
}
else {
ret = azure_kusto_get_oauth2_token(ctx);
}
}

/* Copy string to prevent race conditions (get_oauth2 can free the string) */
if (ret == 0) {
output = flb_sds_create_size(flb_sds_len(ctx->o->token_type) +
Expand Down Expand Up @@ -483,6 +503,11 @@ static struct flb_config_map config_map[] = {
offsetof(struct flb_azure_kusto, client_secret),
"Set the client secret (Application Password) of the AAD application used for "
"authentication"},
{FLB_CONFIG_MAP_STR, "managed_identity_client_id", (char *)NULL, 0, FLB_TRUE,
offsetof(struct flb_azure_kusto, managed_identity_client_id),
"A managed identity client id to authenticate with. "
"Set to 'system' for system-assigned managed identity. "
"Set the MI client ID (GUID) for user-assigned managed identity."},
{FLB_CONFIG_MAP_STR, "ingestion_endpoint", (char *)NULL, 0, FLB_TRUE,
offsetof(struct flb_azure_kusto, ingestion_endpoint),
"Set the Kusto cluster's ingestion endpoint URL (e.g. "
Expand Down
1 change: 1 addition & 0 deletions plugins/out_azure_kusto/azure_kusto.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ struct flb_azure_kusto {
flb_sds_t tenant_id;
flb_sds_t client_id;
flb_sds_t client_secret;
flb_sds_t managed_identity_client_id;
flb_sds_t ingestion_endpoint;
flb_sds_t database_name;
flb_sds_t table_name;
Expand Down
91 changes: 65 additions & 26 deletions plugins/out_azure_kusto/azure_kusto_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

#include "azure_kusto.h"
#include "azure_kusto_conf.h"
#include "azure_msiauth.h"

static struct flb_upstream_node *flb_upstream_node_create_url(struct flb_azure_kusto *ctx,
struct flb_config *config,
Expand Down Expand Up @@ -601,23 +602,8 @@ struct flb_azure_kusto *flb_azure_kusto_conf_create(struct flb_output_instance *
return NULL;
}

/* config: 'tenant_id' */
if (ctx->tenant_id == NULL) {
flb_plg_error(ctx->ins, "property 'tenant_id' is not defined.");
flb_azure_kusto_conf_destroy(ctx);
return NULL;
}

/* config: 'client_id' */
if (ctx->client_id == NULL) {
flb_plg_error(ctx->ins, "property 'client_id' is not defined");
flb_azure_kusto_conf_destroy(ctx);
return NULL;
}

/* config: 'client_secret' */
if (ctx->client_secret == NULL) {
flb_plg_error(ctx->ins, "property 'client_secret' is not defined");
if (ctx->tenant_id == NULL && ctx->client_id == NULL && ctx->client_secret == NULL && ctx->managed_identity_client_id == NULL) {
flb_plg_error(ctx->ins, "Service Principal or Managed Identity is not defined");
flb_azure_kusto_conf_destroy(ctx);
return NULL;
}
Expand All @@ -643,17 +629,70 @@ struct flb_azure_kusto *flb_azure_kusto_conf_create(struct flb_output_instance *
return NULL;
}

/* Create the auth URL */
ctx->oauth_url = flb_sds_create_size(sizeof(FLB_MSAL_AUTH_URL_TEMPLATE) - 1 +
flb_sds_len(ctx->tenant_id));
if (!ctx->oauth_url) {
flb_errno();
flb_azure_kusto_conf_destroy(ctx);
return NULL;
if (ctx->managed_identity_client_id != NULL) {
/* system assigned managed identity */
if (strcasecmp(ctx->managed_identity_client_id, "system") == 0) {
ctx->oauth_url = flb_sds_create_size(sizeof(FLB_AZURE_MSIAUTH_URL_TEMPLATE) - 1);

if (!ctx->oauth_url) {
flb_errno();
flb_azure_kusto_conf_destroy(ctx);
return NULL;
}

flb_sds_snprintf(&ctx->oauth_url, flb_sds_alloc(ctx->oauth_url),
FLB_AZURE_MSIAUTH_URL_TEMPLATE, "", "");

} else {
/* user assigned managed identity */
ctx->oauth_url = flb_sds_create_size(sizeof(FLB_AZURE_MSIAUTH_URL_TEMPLATE) - 1 +
sizeof("&client_id=") - 1 +
flb_sds_len(ctx->managed_identity_client_id));

if (!ctx->oauth_url) {
flb_errno();
flb_azure_kusto_conf_destroy(ctx);
return NULL;
}

flb_sds_snprintf(&ctx->oauth_url, flb_sds_alloc(ctx->oauth_url),
FLB_AZURE_MSIAUTH_URL_TEMPLATE, "&client_id=", ctx->managed_identity_client_id);
}
}
flb_sds_snprintf(&ctx->oauth_url, flb_sds_alloc(ctx->oauth_url),
FLB_MSAL_AUTH_URL_TEMPLATE, ctx->tenant_id);
else {
/* config: 'tenant_id' */
if (ctx->tenant_id == NULL) {
flb_plg_error(ctx->ins, "property 'tenant_id' is not defined.");
flb_azure_kusto_conf_destroy(ctx);
return NULL;
}

/* config: 'client_id' */
if (ctx->client_id == NULL) {
flb_plg_error(ctx->ins, "property 'client_id' is not defined");
flb_azure_kusto_conf_destroy(ctx);
return NULL;
}

/* config: 'client_secret' */
if (ctx->client_secret == NULL) {
flb_plg_error(ctx->ins, "property 'client_secret' is not defined");
flb_azure_kusto_conf_destroy(ctx);
return NULL;
}

/* Create the auth URL */
ctx->oauth_url = flb_sds_create_size(sizeof(FLB_MSAL_AUTH_URL_TEMPLATE) - 1 +
flb_sds_len(ctx->tenant_id));
if (!ctx->oauth_url) {
flb_errno();
flb_azure_kusto_conf_destroy(ctx);
return NULL;
}
flb_sds_snprintf(&ctx->oauth_url, flb_sds_alloc(ctx->oauth_url),
FLB_MSAL_AUTH_URL_TEMPLATE, ctx->tenant_id);
}

ctx->resources = flb_calloc(1, sizeof(struct flb_azure_kusto_resources));
if (!ctx->resources) {
flb_errno();
Expand Down
104 changes: 104 additions & 0 deletions plugins/out_azure_kusto/azure_msiauth.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */

/* Fluent Bit
* ==========
* Copyright (C) 2015-2024 The Fluent Bit Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <fluent-bit/flb_info.h>
#include <fluent-bit/flb_mem.h>
#include <fluent-bit/flb_log.h>
#include <fluent-bit/flb_utils.h>
#include <fluent-bit/flb_oauth2.h>
#include <fluent-bit/flb_upstream.h>
#include <fluent-bit/flb_http_client.h>

#include "azure_msiauth.h"

char *flb_azure_msiauth_token_get(struct flb_oauth2 *ctx)
{
int ret;
size_t b_sent;
time_t now;
struct flb_connection *u_conn;
struct flb_http_client *c;

now = time(NULL);
if (ctx->access_token) {
/* validate unexpired token */
if (ctx->expires > now && flb_sds_len(ctx->access_token) > 0) {
return ctx->access_token;
}
}

/* Get Token and store it in the context */
u_conn = flb_upstream_conn_get(ctx->u);
if (!u_conn) {
flb_error("[azure msi auth] could not get an upstream connection to %s:%i",
ctx->u->tcp_host, ctx->u->tcp_port);
return NULL;
}

/* Create HTTP client context */
c = flb_http_client(u_conn, FLB_HTTP_GET, ctx->uri,
NULL, 0,
ctx->host, atoi(ctx->port),
NULL, 0);
if (!c) {
flb_error("[azure msi auth] error creating HTTP client context");
flb_upstream_conn_release(u_conn);
return NULL;
}

/* Append HTTP Header */
flb_http_add_header(c, "Metadata", 8, "true", 4);

/* Issue request */
ret = flb_http_do(c, &b_sent);
if (ret != 0) {
flb_warn("[azure msi auth] cannot issue request, http_do=%i", ret);
}
else {
flb_info("[azure msi auth] HTTP Status=%i", c->resp.status);
if (c->resp.payload_size > 0) {
if (c->resp.status == 200) {
flb_debug("[azure msi auth] payload:\n%s", c->resp.payload);
}
else {
flb_info("[azure msi auth] payload:\n%s", c->resp.payload);
}
}
}

/* Extract token */
if (c->resp.payload_size > 0 && c->resp.status == 200) {
ret = flb_oauth2_parse_json_response(c->resp.payload,
c->resp.payload_size, ctx);
if (ret == 0) {
flb_info("[azure msi auth] access token from '%s:%s' retrieved",
ctx->host, ctx->port);
flb_http_client_destroy(c);
flb_upstream_conn_release(u_conn);
ctx->issued = time(NULL);
ctx->expires = ctx->issued + ctx->expires_in;
return ctx->access_token;
}
}

flb_http_client_destroy(c);
flb_upstream_conn_release(u_conn);

return NULL;
}
27 changes: 27 additions & 0 deletions plugins/out_azure_kusto/azure_msiauth.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */

/* Fluent Bit
* ==========
* Copyright (C) 2015-2024 The Fluent Bit Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <fluent-bit/flb_info.h>

/* MSAL authorization URL */
#define FLB_AZURE_MSIAUTH_URL_TEMPLATE \
"http://169.254.169.254/metadata/identity/oauth2/token?api-version=2021-02-01%s%s&resource=https://api.kusto.windows.net"

char *flb_azure_msiauth_token_get(struct flb_oauth2 *ctx);

13 changes: 10 additions & 3 deletions src/flb_oauth2.c
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ struct flb_oauth2 *flb_oauth2_create(struct flb_config *config,
goto error;
}

if (!prot || strcmp(prot, "https") != 0) {
if (!prot || (strcmp(prot, "https") != 0 && strcmp(prot, "http") != 0)) {
flb_error("[oauth2] invalid endpoint protocol: %s", auth_url);
goto error;
}
Expand Down Expand Up @@ -227,8 +227,15 @@ struct flb_oauth2 *flb_oauth2_create(struct flb_config *config,
}

/* Create Upstream context */
ctx->u = flb_upstream_create_url(config, auth_url,
FLB_IO_TLS, ctx->tls);
if (strcmp(prot, "https") == 0) {
ctx->u = flb_upstream_create_url(config, auth_url,
FLB_IO_TLS, ctx->tls);
}
else if (strcmp(prot, "http") == 0) {
ctx->u = flb_upstream_create_url(config, auth_url,
FLB_IO_TCP, NULL);
}

if (!ctx->u) {
flb_error("[oauth2] error creating upstream context");
goto error;
Expand Down
1 change: 1 addition & 0 deletions tests/runtime/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ if(FLB_IN_LIB)
FLB_RT_TEST(FLB_OUT_LIB "core_log.c")
FLB_RT_TEST(FLB_OUT_LIB "config_map_opts.c")
FLB_RT_TEST(FLB_OUT_COUNTER "out_counter.c")
FLB_RT_TEST(FLB_OUT_AZURE_KUSTO "out_azure_kusto.c")
FLB_RT_TEST(FLB_OUT_DATADOG "out_datadog.c")
FLB_RT_TEST(FLB_OUT_SKYWALKING "out_skywalking.c")
FLB_RT_TEST(FLB_OUT_ES "out_elasticsearch.c")
Expand Down
Loading
Loading