diff --git a/.semaphore/semaphore.yml b/.semaphore/semaphore.yml
index b8a6eb801bea9..62c176e964af9 100644
--- a/.semaphore/semaphore.yml
+++ b/.semaphore/semaphore.yml
@@ -17,7 +17,7 @@ version: v1.0
name: build-test-release
agent:
machine:
- type: s1-prod-ubuntu20-04-arm64-1
+ type: s1-prod-ubuntu24-04-arm64-2
fail_fast:
cancel:
when: "true"
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerJaasOptions.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerJaasOptions.java
new file mode 100644
index 0000000000000..33009db1d9f49
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerJaasOptions.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.oauthbearer;
+
+import org.apache.kafka.common.config.SaslConfigs;
+
+/**
+ * {@code OAuthBearerJaasOptions} holds the different options that can be configured by the user in the
+ * {@link SaslConfigs#SASL_JAAS_CONFIG} configuration.
+ */
+public class OAuthBearerJaasOptions {
+
+ /**
+ * This provides a "namespace" for the jwt-bearer grant type-related JAAS options. The existing implementation
+ * only supported the client_credentials grant type, so those do not have a prefix.
+ */
+ private static final String JWT_BEARER_PREFIX = "jwt-bearer.";
+
+ /**
+ * The {@code grantType} JAAS option is how the user specifies which OAuth grant type to use.
+ */
+ public static final String GRANT_TYPE = "grantType";
+
+ /**
+ * The scope value can be used by multiple grant types, so it has no prefix.
+ */
+ public static final String SCOPE = "scope";
+
+ public static final String CLIENT_CREDENTIALS_CLIENT_ID = "clientId";
+
+ public static final String CLIENT_CREDENTIALS_CLIENT_SECRET = "clientSecret";
+
+ public static final String JWT_BEARER_PRIVATE_KEY_ID = JWT_BEARER_PREFIX + "privateKeyId";
+
+ public static final String JWT_BEARER_PRIVATE_KEY_FILE_NAME = JWT_BEARER_PREFIX + "privateKeyFileName";
+
+ public static final String JWT_BEARER_PRIVATE_KEY_ALGORITHM = JWT_BEARER_PREFIX + "privateKeyAlgorithm";
+
+ /**
+ * The jwt-bearer grant type requires a JWT be created on the client side, signed, and sent in the token
+ * request. The JWT is built up from claims statically set in the JAAS options:
+ *
+ *
+ * - jwt-bearer.claim.sub=some-service-account
+ * - jwt-bearer.claim.aud=my_audience
+ * - jwt-bearer.claim.iss=https://example.com
+ *
+ *
+ * Those claims are used to create a JWT that looks like this:
+ *
+ *
+ * {
+ * "iat": 1741121401,
+ * "exp": 1741125001,
+ * "sub": "some-service-account",
+ * "aud": "my_audience",
+ * "iss": "https://example.com",
+ * "...": "...",
+ * }
+ *
+ *
+ * Note: the JAAS format for specifying claims will likely change in the future to facilitate support for
+ * arbitrarily complicated JWT JSON that includes integers, longs, booleans, lists, and maps.
+ */
+ public static final String JWT_BEARER_CLAIM_PREFIX = JWT_BEARER_PREFIX + "claim.";
+
+ private OAuthBearerJaasOptions() {
+ // Intentionally empty to prevent instantiation.
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginCallbackHandler.java
index fc9e689611520..e38ce08601d2e 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginCallbackHandler.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginCallbackHandler.java
@@ -30,6 +30,7 @@
import org.apache.kafka.common.security.oauthbearer.internals.secured.AccessTokenValidatorFactory;
import org.apache.kafka.common.security.oauthbearer.internals.secured.JaasOptionsUtils;
import org.apache.kafka.common.security.oauthbearer.internals.secured.ValidateException;
+import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,8 +45,6 @@
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.sasl.SaslException;
-import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL;
-
/**
*
* OAuthBearerLoginCallbackHandler
is an {@link AuthenticateCallbackHandler} that
@@ -153,30 +152,10 @@ public class OAuthBearerLoginCallbackHandler implements AuthenticateCallbackHand
private static final Logger log = LoggerFactory.getLogger(OAuthBearerLoginCallbackHandler.class);
- public static final String CLIENT_ID_CONFIG = "clientId";
- public static final String CLIENT_SECRET_CONFIG = "clientSecret";
- public static final String SCOPE_CONFIG = "scope";
-
- public static final String CLIENT_ID_DOC = "The OAuth/OIDC identity provider-issued " +
- "client ID to uniquely identify the service account to use for authentication for " +
- "this client. The value must be paired with a corresponding " + CLIENT_SECRET_CONFIG + " " +
- "value and is provided to the OAuth provider using the OAuth " +
- "clientcredentials grant type.";
-
- public static final String CLIENT_SECRET_DOC = "The OAuth/OIDC identity provider-issued " +
- "client secret serves a similar function as a password to the " + CLIENT_ID_CONFIG + " " +
- "account and identifies the service account to use for authentication for " +
- "this client. The value must be paired with a corresponding " + CLIENT_ID_CONFIG + " " +
- "value and is provided to the OAuth provider using the OAuth " +
- "clientcredentials grant type.";
-
- public static final String SCOPE_DOC = "The (optional) HTTP/HTTPS login request to the " +
- "token endpoint (" + SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL + ") may need to specify an " +
- "OAuth \"scope\". If so, the " + SCOPE_CONFIG + " is used to provide the value to " +
- "include with the login request.";
-
private static final String EXTENSION_PREFIX = "extension_";
+ private final Time time;
+
private Map moduleOptions;
private AccessTokenRetriever accessTokenRetriever;
@@ -185,10 +164,18 @@ public class OAuthBearerLoginCallbackHandler implements AuthenticateCallbackHand
private boolean isInitialized = false;
+ public OAuthBearerLoginCallbackHandler() {
+ this(Time.SYSTEM);
+ }
+
+ public OAuthBearerLoginCallbackHandler(Time time) {
+ this.time = time;
+ }
+
@Override
public void configure(Map configs, String saslMechanism, List jaasConfigEntries) {
moduleOptions = JaasOptionsUtils.getOptions(saslMechanism, jaasConfigEntries);
- AccessTokenRetriever accessTokenRetriever = AccessTokenRetrieverFactory.create(configs, saslMechanism, moduleOptions);
+ AccessTokenRetriever accessTokenRetriever = AccessTokenRetrieverFactory.create(time, configs, saslMechanism, moduleOptions);
AccessTokenValidator accessTokenValidator = AccessTokenValidatorFactory.create(configs, saslMechanism);
init(accessTokenRetriever, accessTokenValidator);
}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/AccessTokenRetrieverFactory.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/AccessTokenRetrieverFactory.java
index 0ed4a1a230349..2f28c791d6e09 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/AccessTokenRetrieverFactory.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/AccessTokenRetrieverFactory.java
@@ -17,11 +17,19 @@
package org.apache.kafka.common.security.oauthbearer.internals.secured;
+import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import java.io.IOException;
import java.net.URL;
+import java.nio.file.Path;
+import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
+import java.util.Optional;
+import java.util.function.Supplier;
import javax.net.ssl.SSLSocketFactory;
@@ -32,9 +40,14 @@
import static org.apache.kafka.common.config.SaslConfigs.SASL_LOGIN_RETRY_BACKOFF_MS;
import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_HEADER_URLENCODE;
import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL;
-import static org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler.CLIENT_ID_CONFIG;
-import static org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler.CLIENT_SECRET_CONFIG;
-import static org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler.SCOPE_CONFIG;
+import static org.apache.kafka.common.security.oauthbearer.OAuthBearerJaasOptions.CLIENT_CREDENTIALS_CLIENT_ID;
+import static org.apache.kafka.common.security.oauthbearer.OAuthBearerJaasOptions.CLIENT_CREDENTIALS_CLIENT_SECRET;
+import static org.apache.kafka.common.security.oauthbearer.OAuthBearerJaasOptions.GRANT_TYPE;
+import static org.apache.kafka.common.security.oauthbearer.OAuthBearerJaasOptions.JWT_BEARER_CLAIM_PREFIX;
+import static org.apache.kafka.common.security.oauthbearer.OAuthBearerJaasOptions.JWT_BEARER_PRIVATE_KEY_ALGORITHM;
+import static org.apache.kafka.common.security.oauthbearer.OAuthBearerJaasOptions.JWT_BEARER_PRIVATE_KEY_FILE_NAME;
+import static org.apache.kafka.common.security.oauthbearer.OAuthBearerJaasOptions.JWT_BEARER_PRIVATE_KEY_ID;
+import static org.apache.kafka.common.security.oauthbearer.OAuthBearerJaasOptions.SCOPE;
public class AccessTokenRetrieverFactory {
@@ -44,17 +57,19 @@ public class AccessTokenRetrieverFactory {
* Note: the returned AccessTokenRetriever
is not initialized
* here and must be done by the caller prior to use.
*
+ * @param time Time
* @param configs SASL configuration
* @param jaasConfig JAAS configuration
*
* @return Non-null
{@link AccessTokenRetriever}
*/
- public static AccessTokenRetriever create(Map configs, Map jaasConfig) {
- return create(configs, null, jaasConfig);
+ public static AccessTokenRetriever create(Time time, Map configs, Map jaasConfig) {
+ return create(time, configs, null, jaasConfig);
}
- public static AccessTokenRetriever create(Map configs,
+ public static AccessTokenRetriever create(Time time,
+ Map configs,
String saslMechanism,
Map jaasConfig) {
ConfigurationUtils cu = new ConfigurationUtils(configs, saslMechanism);
@@ -64,27 +79,67 @@ public static AccessTokenRetriever create(Map configs,
return new FileTokenRetriever(cu.validateFile(SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL));
} else {
JaasOptionsUtils jou = new JaasOptionsUtils(jaasConfig);
- String clientId = jou.validateString(CLIENT_ID_CONFIG);
- String clientSecret = jou.validateString(CLIENT_SECRET_CONFIG);
- String scope = jou.validateString(SCOPE_CONFIG, false);
-
SSLSocketFactory sslSocketFactory = null;
if (jou.shouldCreateSSLSocketFactory(tokenEndpointUrl))
sslSocketFactory = jou.createSSLSocketFactory();
- boolean urlencodeHeader = validateUrlencodeHeader(cu);
-
- return new HttpAccessTokenRetriever(clientId,
- clientSecret,
- scope,
- sslSocketFactory,
- tokenEndpointUrl.toString(),
- cu.validateLong(SASL_LOGIN_RETRY_BACKOFF_MS),
- cu.validateLong(SASL_LOGIN_RETRY_BACKOFF_MAX_MS),
- cu.validateInteger(SASL_LOGIN_CONNECT_TIMEOUT_MS, false),
- cu.validateInteger(SASL_LOGIN_READ_TIMEOUT_MS, false),
- urlencodeHeader);
+ String grantType = Optional
+ .ofNullable(jou.validateString(GRANT_TYPE, false))
+ .orElse(ClientCredentialsAccessTokenRetriever.GRANT_TYPE);
+
+ if (grantType.equalsIgnoreCase(ClientCredentialsAccessTokenRetriever.GRANT_TYPE)) {
+ String clientId = jou.validateString(CLIENT_CREDENTIALS_CLIENT_ID);
+ String clientSecret = jou.validateString(CLIENT_CREDENTIALS_CLIENT_SECRET);
+ String scope = jou.validateString(SCOPE, false);
+ boolean urlencodeHeader = validateUrlencodeHeader(cu);
+
+ return new ClientCredentialsAccessTokenRetriever(clientId,
+ clientSecret,
+ scope,
+ sslSocketFactory,
+ tokenEndpointUrl.toString(),
+ cu.validateLong(SASL_LOGIN_RETRY_BACKOFF_MS),
+ cu.validateLong(SASL_LOGIN_RETRY_BACKOFF_MAX_MS),
+ cu.validateInteger(SASL_LOGIN_CONNECT_TIMEOUT_MS, false),
+ cu.validateInteger(SASL_LOGIN_READ_TIMEOUT_MS, false),
+ urlencodeHeader);
+ } else if (grantType.equalsIgnoreCase(JwtBearerAccessTokenRetriever.GRANT_TYPE)) {
+ String privateKeyId = jou.validateString(JWT_BEARER_PRIVATE_KEY_ID);
+ Path privateKeyFileName = jou.validateFile(JWT_BEARER_PRIVATE_KEY_FILE_NAME);
+ String privateKeySigningAlgorithm = jou.validateString(JWT_BEARER_PRIVATE_KEY_ALGORITHM);
+ Map staticClaims = getStaticClaims(jaasConfig);
+
+ Supplier privateKeySupplier = () -> {
+ String fileName = privateKeyFileName.toFile().getAbsolutePath();
+
+ try {
+ return Utils.readFileAsString(fileName);
+ } catch (IOException e) {
+ throw new KafkaException("Could not read the private key from the file " + fileName, e);
+ }
+ };
+
+ AssertionCreator assertionCreator = new DefaultAssertionCreator(
+ time,
+ privateKeySupplier,
+ privateKeyId,
+ privateKeySigningAlgorithm
+ );
+
+ return new JwtBearerAccessTokenRetriever(
+ assertionCreator,
+ staticClaims,
+ sslSocketFactory,
+ tokenEndpointUrl.toString(),
+ cu.validateLong(SASL_LOGIN_RETRY_BACKOFF_MS),
+ cu.validateLong(SASL_LOGIN_RETRY_BACKOFF_MAX_MS),
+ cu.validateInteger(SASL_LOGIN_CONNECT_TIMEOUT_MS, false),
+ cu.validateInteger(SASL_LOGIN_READ_TIMEOUT_MS, false)
+ );
+ } else {
+ throw new KafkaException("Unsupported grant type provided: " + grantType);
+ }
}
}
@@ -107,4 +162,20 @@ static boolean validateUrlencodeHeader(ConfigurationUtils configurationUtils) {
return DEFAULT_SASL_OAUTHBEARER_HEADER_URLENCODE;
}
+ /**
+ * Support for static claims allows the client to pass arbitrary claims to the identity provider.
+ */
+ static Map getStaticClaims(Map jaasConfig) {
+ Map claims = new HashMap<>();
+
+ jaasConfig.forEach((k, v) -> {
+ if (k.startsWith(JWT_BEARER_CLAIM_PREFIX)) {
+ String claimName = k.substring(JWT_BEARER_CLAIM_PREFIX.length());
+ String claimValue = String.valueOf(v);
+ claims.put(claimName, claimValue);
+ }
+ });
+
+ return claims;
+ }
}
\ No newline at end of file
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/AssertionCreator.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/AssertionCreator.java
new file mode 100644
index 0000000000000..5b9a34f0583c0
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/AssertionCreator.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.internals.secured;
+
+import org.apache.kafka.common.config.SaslConfigs;
+
+import java.util.Map;
+
+/**
+ * {@code AssertionCreator} is used to create a client-signed OAuth assertion that can be used with different
+ * grant types. See RFC 7521 for specifics.
+ *
+ *
+ *
+ * The assertion creator has three main steps:
+ *
+ *
+ * - Create the JWT header
+ * - Create the JWT payload
+ * - Sign
+ *
+ *
+ *
+ *
+ * Step 1 is to dynamically create the JWT header:
+ *
+ *
+ * {
+ * "kid": "9d82418e64e0541066637ca8592d459c",
+ * "alg": RS256,
+ * "typ": "JWT",
+ * }
+ *
+ *
+ *
+ *
+ * Step 2 is to create the JWT payload from the claims provided to {@link #create(Map)}. The {@code iat}
+ * and {@code exp} claims are dynamically generated and added to the JWT. Here's an example:
+ *
+ *
+ * {
+ * "iat": 1741121401,
+ * "exp": 1741125001,
+ * "sub": "some-service-account",
+ * "aud": "my_audience",
+ * "iss": "https://example.com",
+ * "...": "...",
+ * }
+ *
+ *
+ *
+ *
+ * Step 3 is to use the configured private key to sign the header and payload and serialize in the compact
+ * JWT format.
+ */
+public interface AssertionCreator {
+
+ /**
+ * Creates and signs an OAuth assertion by converting the given claims into JWT and then signing it using
+ * the configured algorithm.
+ *
+ * The claims here are statically defined in the {@link SaslConfigs#SASL_JAAS_CONFIG}.
+ */
+ String create(Map claims);
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/ClientCredentialsAccessTokenRetriever.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/ClientCredentialsAccessTokenRetriever.java
new file mode 100644
index 0000000000000..87ea1b4bf24b4
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/ClientCredentialsAccessTokenRetriever.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.internals.secured;
+
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerJaasOptions;
+import org.apache.kafka.common.utils.Utils;
+
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+
+import javax.net.ssl.SSLSocketFactory;
+
+/**
+ * ClientCredentialsAccessTokenRetriever
is an {@link HttpAccessTokenRetriever} that will
+ * post client credentials
+ * ({@link OAuthBearerJaasOptions#CLIENT_CREDENTIALS_CLIENT_ID}/{@link OAuthBearerJaasOptions#CLIENT_CREDENTIALS_CLIENT_SECRET})
+ * to a publicized token endpoint URL
+ * ({@link SaslConfigs#SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL}).
+ *
+ * @see HttpAccessTokenRetriever
+ * @see OAuthBearerJaasOptions#CLIENT_CREDENTIALS_CLIENT_ID
+ * @see OAuthBearerJaasOptions#CLIENT_CREDENTIALS_CLIENT_SECRET
+ * @see OAuthBearerJaasOptions#SCOPE
+ * @see SaslConfigs#SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL
+ */
+
+public class ClientCredentialsAccessTokenRetriever extends HttpAccessTokenRetriever {
+
+ public static final String GRANT_TYPE = "client_credentials";
+
+ private final String clientId;
+
+ private final String clientSecret;
+
+ private final String scope;
+
+ private final boolean urlencodeHeader;
+
+ public ClientCredentialsAccessTokenRetriever(String clientId,
+ String clientSecret,
+ String scope,
+ SSLSocketFactory sslSocketFactory,
+ String tokenEndpointUrl,
+ long loginRetryBackoffMs,
+ long loginRetryBackoffMaxMs,
+ Integer loginConnectTimeoutMs,
+ Integer loginReadTimeoutMs,
+ boolean urlencodeHeader) {
+ super(
+ sslSocketFactory,
+ tokenEndpointUrl,
+ loginRetryBackoffMs,
+ loginRetryBackoffMaxMs,
+ loginConnectTimeoutMs,
+ loginReadTimeoutMs
+ );
+ this.clientId = Objects.requireNonNull(clientId);
+ this.clientSecret = Objects.requireNonNull(clientSecret);
+ this.scope = scope;
+ this.urlencodeHeader = urlencodeHeader;
+ }
+
+ @Override
+ protected String formatRequestBody() {
+ return formatRequestBody(scope);
+ }
+
+ @Override
+ protected Map formatRequestHeaders() {
+ return Collections.singletonMap(AUTHORIZATION_HEADER, formatAuthorizationHeader(clientId, clientSecret, urlencodeHeader));
+ }
+
+ static String formatAuthorizationHeader(String clientId, String clientSecret, boolean urlencode) {
+ clientId = sanitizeString("the token endpoint request client ID parameter", clientId);
+ clientSecret = sanitizeString("the token endpoint request client secret parameter", clientSecret);
+
+ // according to RFC-6749 clientId & clientSecret must be urlencoded, see https://tools.ietf.org/html/rfc6749#section-2.3.1
+ if (urlencode) {
+ clientId = URLEncoder.encode(clientId, StandardCharsets.UTF_8);
+ clientSecret = URLEncoder.encode(clientSecret, StandardCharsets.UTF_8);
+ }
+
+ String s = String.format("%s:%s", clientId, clientSecret);
+ // Per RFC-7617, we need to use the *non-URL safe* base64 encoder. See KAFKA-14496.
+ String encoded = Base64.getEncoder().encodeToString(Utils.utf8(s));
+ return String.format("Basic %s", encoded);
+ }
+
+ static String formatRequestBody(String scope) {
+ StringBuilder requestParameters = new StringBuilder();
+ requestParameters.append("grant_type=" + GRANT_TYPE);
+
+ if (!Utils.isBlank(scope)) {
+ String encodedScope = URLEncoder.encode(scope.trim(), StandardCharsets.UTF_8);
+ requestParameters.append("&scope=").append(encodedScope);
+ }
+
+ return requestParameters.toString();
+ }
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/DefaultAssertionCreator.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/DefaultAssertionCreator.java
new file mode 100644
index 0000000000000..ff8d0c4c8ab4e
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/DefaultAssertionCreator.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.internals.secured;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.utils.Time;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.jose4j.jws.JsonWebSignature;
+
+import java.nio.charset.StandardCharsets;
+import java.security.KeyFactory;
+import java.security.PrivateKey;
+import java.security.spec.PKCS8EncodedKeySpec;
+import java.time.Duration;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Supplier;
+
+/**
+ * An {@link AssertionCreator} that creates the assertion from a private key in the configuration.
+ */
+public class DefaultAssertionCreator implements AssertionCreator {
+
+ private final Time time;
+ private final Supplier privateKeySupplier;
+ private final String privateKeyId;
+ private final String privateKeySigningAlgorithm;
+
+ public DefaultAssertionCreator(Time time,
+ Supplier privateKeySupplier,
+ String privateKeyId,
+ String privateKeySigningAlgorithm) {
+ this.time = time;
+ this.privateKeySupplier = privateKeySupplier;
+ this.privateKeyId = privateKeyId;
+ this.privateKeySigningAlgorithm = privateKeySigningAlgorithm;
+ }
+
+ @Override
+ public String create(Map claims) {
+ try {
+ String privateKeyContents = privateKeySupplier.get()
+ .replace("-----BEGIN PRIVATE KEY-----", "")
+ .replace("-----END PRIVATE KEY-----", "")
+ .replaceAll("\\s", "");
+
+ byte[] pkcs8EncodedBytes = Base64.getDecoder().decode(privateKeyContents.getBytes(StandardCharsets.UTF_8));
+ PKCS8EncodedKeySpec keySpec = new PKCS8EncodedKeySpec(pkcs8EncodedBytes);
+ KeyFactory keyFactory = KeyFactory.getInstance("RSA");
+ PrivateKey privateKey = keyFactory.generatePrivate(keySpec);
+
+ ObjectMapper mapper = new ObjectMapper();
+ String payload = mapper.writeValueAsString(augmentedClaims(claims));
+
+ JsonWebSignature jws = new JsonWebSignature();
+ jws.setKey(privateKey);
+ jws.setKeyIdHeaderValue(privateKeyId);
+ jws.setAlgorithmHeaderValue(privateKeySigningAlgorithm);
+ jws.setPayload(payload);
+ return jws.getCompactSerialization();
+ } catch (Exception e) {
+ throw new KafkaException("An error was thrown when creating the OAuth assertion", e);
+ }
+ }
+
+ Map augmentedClaims(Map claims) {
+ long currentTimeSecs = time.milliseconds() / 1000;
+ long expirationSecs = currentTimeSecs + Duration.ofMinutes(60).toSeconds();
+
+ Map augmentedClaims = new HashMap<>(claims);
+
+ if (!augmentedClaims.containsKey("iat"))
+ augmentedClaims.put("iat", currentTimeSecs);
+
+ if (!augmentedClaims.containsKey("exp"))
+ augmentedClaims.put("exp", expirationSecs);
+
+ return augmentedClaims;
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetriever.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetriever.java
index fdc5707278a60..7ee2450bce8dc 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetriever.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetriever.java
@@ -19,7 +19,6 @@
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.SaslConfigs;
-import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler;
import org.apache.kafka.common.utils.Utils;
import com.fasterxml.jackson.databind.JsonNode;
@@ -35,11 +34,10 @@
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
-import java.net.URLEncoder;
+import java.net.URLConnection;
import java.nio.charset.StandardCharsets;
-import java.util.Base64;
-import java.util.Collections;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
@@ -50,19 +48,14 @@
/**
* HttpAccessTokenRetriever
is an {@link AccessTokenRetriever} that will
- * communicate with an OAuth/OIDC provider directly via HTTP to post client credentials
- * ({@link OAuthBearerLoginCallbackHandler#CLIENT_ID_CONFIG}/{@link OAuthBearerLoginCallbackHandler#CLIENT_SECRET_CONFIG})
- * to a publicized token endpoint URL
+ * communicate with an OAuth/OIDC provider directly via HTTP to a publicized token endpoint URL
* ({@link SaslConfigs#SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL}).
*
* @see AccessTokenRetriever
- * @see OAuthBearerLoginCallbackHandler#CLIENT_ID_CONFIG
- * @see OAuthBearerLoginCallbackHandler#CLIENT_SECRET_CONFIG
- * @see OAuthBearerLoginCallbackHandler#SCOPE_CONFIG
* @see SaslConfigs#SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL
*/
-public class HttpAccessTokenRetriever implements AccessTokenRetriever {
+public abstract class HttpAccessTokenRetriever implements AccessTokenRetriever {
private static final Logger log = LoggerFactory.getLogger(HttpAccessTokenRetriever.class);
@@ -97,12 +90,6 @@ public class HttpAccessTokenRetriever implements AccessTokenRetriever {
UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_VERSION);
}
- private final String clientId;
-
- private final String clientSecret;
-
- private final String scope;
-
private final SSLSocketFactory sslSocketFactory;
private final String tokenEndpointUrl;
@@ -115,30 +102,36 @@ public class HttpAccessTokenRetriever implements AccessTokenRetriever {
private final Integer loginReadTimeoutMs;
- private final boolean urlencodeHeader;
-
- public HttpAccessTokenRetriever(String clientId,
- String clientSecret,
- String scope,
- SSLSocketFactory sslSocketFactory,
- String tokenEndpointUrl,
- long loginRetryBackoffMs,
- long loginRetryBackoffMaxMs,
- Integer loginConnectTimeoutMs,
- Integer loginReadTimeoutMs,
- boolean urlencodeHeader) {
- this.clientId = Objects.requireNonNull(clientId);
- this.clientSecret = Objects.requireNonNull(clientSecret);
- this.scope = scope;
+ protected HttpAccessTokenRetriever(SSLSocketFactory sslSocketFactory,
+ String tokenEndpointUrl,
+ long loginRetryBackoffMs,
+ long loginRetryBackoffMaxMs,
+ Integer loginConnectTimeoutMs,
+ Integer loginReadTimeoutMs) {
this.sslSocketFactory = sslSocketFactory;
this.tokenEndpointUrl = Objects.requireNonNull(tokenEndpointUrl);
this.loginRetryBackoffMs = loginRetryBackoffMs;
this.loginRetryBackoffMaxMs = loginRetryBackoffMaxMs;
this.loginConnectTimeoutMs = loginConnectTimeoutMs;
this.loginReadTimeoutMs = loginReadTimeoutMs;
- this.urlencodeHeader = urlencodeHeader;
}
+ /**
+ * Format the data that will be sent in the payload of the POST request. The returned
+ * String will be converted to UTF-8.
+ *
+ * @return Data to send in body of request
+ */
+ protected abstract String formatRequestBody();
+
+ /**
+ * Format and return request headers that will be sent along in the outgoing HTTP
+ * request via {@link URLConnection#setRequestProperty(String, String)}.
+ *
+ * @return Map of headers to include, or {@code null} if no extra headers are needed
+ */
+ protected abstract Map formatRequestHeaders();
+
/**
* Retrieves a JWT access token in its serialized three-part form. The implementation
* is free to determine how it should be retrieved but should not perform validation
@@ -156,10 +149,9 @@ public HttpAccessTokenRetriever(String clientId,
@Override
public String retrieve() throws IOException {
- String authorizationHeader = formatAuthorizationHeader(clientId, clientSecret, urlencodeHeader);
- String requestBody = formatRequestBody(scope);
+ String requestBody = formatRequestBody();
Retry retry = new Retry<>(loginRetryBackoffMs, loginRetryBackoffMaxMs);
- Map headers = Collections.singletonMap(AUTHORIZATION_HEADER, authorizationHeader);
+ Map headers = formatRequestHeaders();
String responseBody;
@@ -334,55 +326,30 @@ static String formatErrorMessage(String errorResponseBody) {
static String parseAccessToken(String responseBody) throws IOException {
ObjectMapper mapper = new ObjectMapper();
JsonNode rootNode = mapper.readTree(responseBody);
- JsonNode accessTokenNode = rootNode.at("/access_token");
- if (accessTokenNode == null) {
- // Only grab the first N characters so that if the response body is huge, we don't
- // blow up.
- String snippet = responseBody;
+ for (String jsonExpression : List.of("/id_token", "/access_token")) {
+ JsonNode tokenNode = rootNode.at(jsonExpression);
- if (snippet.length() > MAX_RESPONSE_BODY_LENGTH) {
- int actualLength = responseBody.length();
- String s = responseBody.substring(0, MAX_RESPONSE_BODY_LENGTH);
- snippet = String.format("%s (trimmed to first %d characters out of %d total)", s, MAX_RESPONSE_BODY_LENGTH, actualLength);
- }
+ if (tokenNode == null || Utils.isBlank(tokenNode.textValue()))
+ continue;
- throw new IOException(String.format("The token endpoint response did not contain an access_token value. Response: (%s)", snippet));
+ return tokenNode.textValue().trim();
}
- return sanitizeString("the token endpoint response's access_token JSON attribute", accessTokenNode.textValue());
- }
-
- static String formatAuthorizationHeader(String clientId, String clientSecret, boolean urlencode) {
- clientId = sanitizeString("the token endpoint request client ID parameter", clientId);
- clientSecret = sanitizeString("the token endpoint request client secret parameter", clientSecret);
-
- // according to RFC-6749 clientId & clientSecret must be urlencoded, see https://tools.ietf.org/html/rfc6749#section-2.3.1
- if (urlencode) {
- clientId = URLEncoder.encode(clientId, StandardCharsets.UTF_8);
- clientSecret = URLEncoder.encode(clientSecret, StandardCharsets.UTF_8);
- }
-
- String s = String.format("%s:%s", clientId, clientSecret);
- // Per RFC-7617, we need to use the *non-URL safe* base64 encoder. See KAFKA-14496.
- String encoded = Base64.getEncoder().encodeToString(Utils.utf8(s));
- return String.format("Basic %s", encoded);
- }
-
- static String formatRequestBody(String scope) {
- StringBuilder requestParameters = new StringBuilder();
- requestParameters.append("grant_type=client_credentials");
+ // Only grab the first N characters so that if the response body is huge, we don't
+ // blow up.
+ String snippet = responseBody;
- if (scope != null && !scope.trim().isEmpty()) {
- scope = scope.trim();
- String encodedScope = URLEncoder.encode(scope, StandardCharsets.UTF_8);
- requestParameters.append("&scope=").append(encodedScope);
+ if (snippet.length() > MAX_RESPONSE_BODY_LENGTH) {
+ int actualLength = responseBody.length();
+ String s = responseBody.substring(0, MAX_RESPONSE_BODY_LENGTH);
+ snippet = String.format("%s (trimmed to first %d characters out of %d total)", s, MAX_RESPONSE_BODY_LENGTH, actualLength);
}
- return requestParameters.toString();
+ throw new IllegalArgumentException(String.format("The token endpoint response did not contain an acceptable token value. Response: (%s)", snippet));
}
- private static String sanitizeString(String name, String value) {
+ static String sanitizeString(String name, String value) {
if (value == null)
throw new IllegalArgumentException(String.format("The value for %s must be non-null", name));
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/JaasOptionsUtils.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/JaasOptionsUtils.java
index aa7725b45d182..71f9c7ddcf162 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/JaasOptionsUtils.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/JaasOptionsUtils.java
@@ -28,7 +28,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
import java.net.URL;
+import java.nio.file.Path;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -121,4 +123,33 @@ public String validateString(String name, boolean isRequired) throws ValidateExc
return value;
}
+ /**
+ * Validates that, if a value is supplied, is a file that:
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ * If the value is null or an empty string, it is assumed to be an "empty" value and thus.
+ * ignored. Any whitespace is trimmed off of the beginning and end.
+ */
+
+ public Path validateFile(String name) {
+ String fileName = validateString(name);
+ File file = new File(fileName).getAbsoluteFile();
+
+ if (!file.exists())
+ throw new ConfigException(String.format("The OAuth configuration option %s contains a file (%s) that doesn't exist", name, file));
+
+ if (!file.canRead())
+ throw new ConfigException(String.format("The OAuth configuration option %s contains a file (%s) that doesn't have read permission", name, file));
+
+ if (file.isDirectory())
+ throw new ConfigException(String.format("The OAuth configuration option %s references a directory (%s), not a file", name, file));
+
+ return file.toPath();
+ }
+
}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/JwtBearerAccessTokenRetriever.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/JwtBearerAccessTokenRetriever.java
new file mode 100644
index 0000000000000..f2778dbcf7bb0
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/JwtBearerAccessTokenRetriever.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.internals.secured;
+
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerJaasOptions;
+
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.Map;
+
+import javax.net.ssl.SSLSocketFactory;
+
+/**
+ * JwtBearerAccessTokenRetriever
is an {@link HttpAccessTokenRetriever} that will
+ * post an assertion using the {@code urn:ietf:params:oauth:grant-type:jwt-bearer} grant type to
+ * a publicized token endpoint URL
+ * ({@link SaslConfigs#SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL}).
+ *
+ *
+ *
+ * Here's an example of the sasl.jaas.config
:
+ *
+ *
+ * sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
+ * grantType="urn:ietf:params:oauth:grant-type:jwt-bearer" \
+ * jwt-bearer.privateKeyId="54e64f2cf26070fe1fd67e59bf34679d" \
+ * jwt-bearer.privateKeyFileName="/etc/rsa.key" \
+ * jwt-bearer.privateKeyAlgorithm="RS256" \
+ * jwt-bearer.claim.iss="foo" \
+ * jwt-bearer.claim.aud="bar" ;
+ *
+ *
+ * @see HttpAccessTokenRetriever
+ * @see OAuthBearerJaasOptions#JWT_BEARER_PRIVATE_KEY_ID
+ * @see OAuthBearerJaasOptions#JWT_BEARER_PRIVATE_KEY_FILE_NAME
+ * @see OAuthBearerJaasOptions#JWT_BEARER_PRIVATE_KEY_ALGORITHM
+ * @see OAuthBearerJaasOptions#JWT_BEARER_CLAIM_PREFIX
+ * @see SaslConfigs#SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL
+ */
+public class JwtBearerAccessTokenRetriever extends HttpAccessTokenRetriever {
+
+ public static final String GRANT_TYPE = "urn:ietf:params:oauth:grant-type:jwt-bearer";
+
+ private final AssertionCreator assertionCreator;
+ private final Map staticClaims;
+
+ public JwtBearerAccessTokenRetriever(AssertionCreator assertionCreator,
+ Map staticClaims,
+ SSLSocketFactory sslSocketFactory,
+ String tokenEndpointUrl,
+ long loginRetryBackoffMs,
+ long loginRetryBackoffMaxMs,
+ Integer loginConnectTimeoutMs,
+ Integer loginReadTimeoutMs) {
+ super(
+ sslSocketFactory,
+ tokenEndpointUrl,
+ loginRetryBackoffMs,
+ loginRetryBackoffMaxMs,
+ loginConnectTimeoutMs,
+ loginReadTimeoutMs
+ );
+
+ this.assertionCreator = assertionCreator;
+ this.staticClaims = staticClaims;
+ }
+
+ @Override
+ protected String formatRequestBody() {
+ String assertion = assertionCreator.create(staticClaims);
+ String encodedGrantType = URLEncoder.encode(GRANT_TYPE, StandardCharsets.UTF_8);
+ String encodedAssertion = URLEncoder.encode(assertion, StandardCharsets.UTF_8);
+ return String.format("grant_type=%s&assertion=%s", encodedGrantType, encodedAssertion);
+ }
+
+ @Override
+ protected Map formatRequestHeaders() {
+ return Collections.singletonMap("Content-Type", "application/x-www-form-urlencoded");
+ }
+
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginCallbackHandlerTest.java b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginCallbackHandlerTest.java
index 5b1b2976662b6..9d3b6837eee00 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginCallbackHandlerTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginCallbackHandlerTest.java
@@ -47,8 +47,8 @@
import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL;
import static org.apache.kafka.common.config.internals.BrokerSecurityConfigs.ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG;
-import static org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler.CLIENT_ID_CONFIG;
-import static org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler.CLIENT_SECRET_CONFIG;
+import static org.apache.kafka.common.security.oauthbearer.OAuthBearerJaasOptions.CLIENT_CREDENTIALS_CLIENT_ID;
+import static org.apache.kafka.common.security.oauthbearer.OAuthBearerJaasOptions.CLIENT_CREDENTIALS_CLIENT_SECRET;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -95,8 +95,8 @@ public void testHandleSaslExtensionsCallback() throws Exception {
Map configs = getSaslConfigs(SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL, "http://www.example.com");
System.setProperty(ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG, "http://www.example.com");
Map jaasConfig = new HashMap<>();
- jaasConfig.put(CLIENT_ID_CONFIG, "an ID");
- jaasConfig.put(CLIENT_SECRET_CONFIG, "a secret");
+ jaasConfig.put(CLIENT_CREDENTIALS_CLIENT_ID, "an ID");
+ jaasConfig.put(CLIENT_CREDENTIALS_CLIENT_SECRET, "a secret");
jaasConfig.put("extension_foo", "1");
jaasConfig.put("extension_bar", 2);
jaasConfig.put("EXTENSION_baz", "3");
@@ -125,8 +125,8 @@ public void testHandleSaslExtensionsCallbackWithInvalidExtension() {
Map configs = getSaslConfigs(SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL, "http://www.example.com");
System.setProperty(ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG, "http://www.example.com");
Map jaasConfig = new HashMap<>();
- jaasConfig.put(CLIENT_ID_CONFIG, "an ID");
- jaasConfig.put(CLIENT_SECRET_CONFIG, "a secret");
+ jaasConfig.put(CLIENT_CREDENTIALS_CLIENT_ID, "an ID");
+ jaasConfig.put(CLIENT_CREDENTIALS_CLIENT_SECRET, "a secret");
jaasConfig.put(illegalKey, "this key isn't allowed per OAuthBearerClientInitialResponse.validateExtensions");
configureHandler(handler, configs, jaasConfig);
@@ -235,8 +235,8 @@ public void testConfigureWithAccessClientCredentials() {
Map configs = getSaslConfigs(SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL, "http://www.example.com");
System.setProperty(ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG, "http://www.example.com");
Map jaasConfigs = new HashMap<>();
- jaasConfigs.put(CLIENT_ID_CONFIG, "an ID");
- jaasConfigs.put(CLIENT_SECRET_CONFIG, "a secret");
+ jaasConfigs.put(CLIENT_CREDENTIALS_CLIENT_ID, "an ID");
+ jaasConfigs.put(CLIENT_CREDENTIALS_CLIENT_SECRET, "a secret");
configureHandler(handler, configs, jaasConfigs);
assertInstanceOf(HttpAccessTokenRetriever.class, handler.getAccessTokenRetriever());
}
diff --git a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/AccessTokenRetrieverFactoryTest.java b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/AccessTokenRetrieverFactoryTest.java
index 3e85f7b0ce4fa..80001ae5e8e88 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/AccessTokenRetrieverFactoryTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/AccessTokenRetrieverFactoryTest.java
@@ -18,6 +18,8 @@
package org.apache.kafka.common.security.oauthbearer.internals.secured;
import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
@@ -53,8 +55,9 @@ public void testConfigureRefreshingFileAccessTokenRetriever() throws Exception {
System.setProperty(ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG, accessTokenFile.toURI().toString());
Map configs = Collections.singletonMap(SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL, accessTokenFile.toURI().toString());
Map jaasConfig = Collections.emptyMap();
+ Time time = new MockTime();
- try (AccessTokenRetriever accessTokenRetriever = AccessTokenRetrieverFactory.create(configs, jaasConfig)) {
+ try (AccessTokenRetriever accessTokenRetriever = AccessTokenRetrieverFactory.create(time, configs, jaasConfig)) {
accessTokenRetriever.init();
assertEquals(expected, accessTokenRetriever.retrieve());
}
@@ -67,7 +70,8 @@ public void testConfigureRefreshingFileAccessTokenRetrieverWithInvalidDirectory(
System.setProperty(ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG, file);
Map configs = getSaslConfigs(SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL, file);
Map jaasConfig = Collections.emptyMap();
- assertThrowsWithMessage(ConfigException.class, () -> AccessTokenRetrieverFactory.create(configs, jaasConfig), "that doesn't exist");
+ Time time = new MockTime();
+ assertThrowsWithMessage(ConfigException.class, () -> AccessTokenRetrieverFactory.create(time, configs, jaasConfig), "that doesn't exist");
}
@Test
@@ -78,7 +82,8 @@ public void testConfigureRefreshingFileAccessTokenRetrieverWithInvalidFile() thr
System.setProperty(ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG, accessTokenFile.toURI().toString());
Map configs = getSaslConfigs(SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL, accessTokenFile.toURI().toString());
Map jaasConfig = Collections.emptyMap();
- assertThrowsWithMessage(ConfigException.class, () -> AccessTokenRetrieverFactory.create(configs, jaasConfig), "that doesn't exist");
+ Time time = new MockTime();
+ assertThrowsWithMessage(ConfigException.class, () -> AccessTokenRetrieverFactory.create(time, configs, jaasConfig), "that doesn't exist");
}
@Test
@@ -87,7 +92,8 @@ public void testSaslOauthbearerTokenEndpointUrlIsNotAllowed() throws Exception {
File tmpDir = createTempDir("not_allowed");
File accessTokenFile = new File(tmpDir, "not_allowed.json");
Map configs = getSaslConfigs(SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL, accessTokenFile.toURI().toString());
- assertThrowsWithMessage(ConfigException.class, () -> AccessTokenRetrieverFactory.create(configs, Collections.emptyMap()),
+ Time time = new MockTime();
+ assertThrowsWithMessage(ConfigException.class, () -> AccessTokenRetrieverFactory.create(time, configs, Collections.emptyMap()),
ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG);
}
diff --git a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/ClientCredentialsAccessTokenRetrieverTest.java b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/ClientCredentialsAccessTokenRetrieverTest.java
new file mode 100644
index 0000000000000..be0894d0fe5eb
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/ClientCredentialsAccessTokenRetrieverTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.internals.secured;
+
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class ClientCredentialsAccessTokenRetrieverTest extends OAuthBearerTest {
+
+ @Test
+ public void testFormatAuthorizationHeader() {
+ assertAuthorizationHeader("id", "secret", false, "Basic aWQ6c2VjcmV0");
+ }
+
+ @Test
+ public void testFormatAuthorizationHeaderEncoding() {
+ // according to RFC-7617, we need to use the *non-URL safe* base64 encoder. See KAFKA-14496.
+ assertAuthorizationHeader("SOME_RANDOM_LONG_USER_01234", "9Q|0`8i~ute-n9ksjLWb\\50\"AX@UUED5E", false, "Basic U09NRV9SQU5ET01fTE9OR19VU0VSXzAxMjM0OjlRfDBgOGl+dXRlLW45a3NqTFdiXDUwIkFYQFVVRUQ1RQ==");
+ // according to RFC-6749 clientId & clientSecret must be urlencoded, see https://tools.ietf.org/html/rfc6749#section-2.3.1
+ assertAuthorizationHeader("user!@~'", "secret-(*)!", true, "Basic dXNlciUyMSU0MCU3RSUyNzpzZWNyZXQtJTI4KiUyOSUyMQ==");
+ }
+
+ private void assertAuthorizationHeader(String clientId, String clientSecret, boolean urlencode, String expected) {
+ String actual = ClientCredentialsAccessTokenRetriever.formatAuthorizationHeader(clientId, clientSecret, urlencode);
+ assertEquals(expected, actual, String.format("Expected the HTTP Authorization header generated for client ID \"%s\" and client secret \"%s\" to match", clientId, clientSecret));
+ }
+
+ @Test
+ public void testFormatAuthorizationHeaderMissingValues() {
+ assertThrows(IllegalArgumentException.class, () -> ClientCredentialsAccessTokenRetriever.formatAuthorizationHeader(null, "secret", false));
+ assertThrows(IllegalArgumentException.class, () -> ClientCredentialsAccessTokenRetriever.formatAuthorizationHeader("id", null, false));
+ assertThrows(IllegalArgumentException.class, () -> ClientCredentialsAccessTokenRetriever.formatAuthorizationHeader(null, null, false));
+ assertThrows(IllegalArgumentException.class, () -> ClientCredentialsAccessTokenRetriever.formatAuthorizationHeader("", "secret", false));
+ assertThrows(IllegalArgumentException.class, () -> ClientCredentialsAccessTokenRetriever.formatAuthorizationHeader("id", "", false));
+ assertThrows(IllegalArgumentException.class, () -> ClientCredentialsAccessTokenRetriever.formatAuthorizationHeader("", "", false));
+ assertThrows(IllegalArgumentException.class, () -> ClientCredentialsAccessTokenRetriever.formatAuthorizationHeader(" ", "secret", false));
+ assertThrows(IllegalArgumentException.class, () -> ClientCredentialsAccessTokenRetriever.formatAuthorizationHeader("id", " ", false));
+ assertThrows(IllegalArgumentException.class, () -> ClientCredentialsAccessTokenRetriever.formatAuthorizationHeader(" ", " ", false));
+ }
+
+ @Test
+ public void testFormatRequestBody() {
+ String expected = "grant_type=client_credentials&scope=scope";
+ String actual = ClientCredentialsAccessTokenRetriever.formatRequestBody("scope");
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testFormatRequestBodyWithEscaped() {
+ String questionMark = "%3F";
+ String exclamationMark = "%21";
+
+ String expected = String.format("grant_type=client_credentials&scope=earth+is+great%s", exclamationMark);
+ String actual = ClientCredentialsAccessTokenRetriever.formatRequestBody("earth is great!");
+ assertEquals(expected, actual);
+
+ expected = String.format("grant_type=client_credentials&scope=what+on+earth%s%s%s%s%s", questionMark, exclamationMark, questionMark, exclamationMark, questionMark);
+ actual = ClientCredentialsAccessTokenRetriever.formatRequestBody("what on earth?!?!?");
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testFormatRequestBodyMissingValues() {
+ String expected = "grant_type=client_credentials";
+ String actual = ClientCredentialsAccessTokenRetriever.formatRequestBody(null);
+ assertEquals(expected, actual);
+
+ actual = ClientCredentialsAccessTokenRetriever.formatRequestBody("");
+ assertEquals(expected, actual);
+
+ actual = ClientCredentialsAccessTokenRetriever.formatRequestBody(" ");
+ assertEquals(expected, actual);
+ }
+
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/DefaultAssertionCreatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/DefaultAssertionCreatorTest.java
new file mode 100644
index 0000000000000..bb33903809c00
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/DefaultAssertionCreatorTest.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.oauthbearer.internals.secured;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+
+import org.jose4j.jwt.consumer.JwtConsumer;
+import org.jose4j.jwt.consumer.JwtConsumerBuilder;
+import org.jose4j.jwt.consumer.JwtContext;
+import org.jose4j.jwx.JsonWebStructure;
+import org.jose4j.lang.InvalidAlgorithmException;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+
+import java.security.KeyPair;
+import java.security.KeyPairGenerator;
+import java.security.NoSuchAlgorithmException;
+import java.security.PrivateKey;
+import java.security.PublicKey;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Supplier;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class DefaultAssertionCreatorTest extends OAuthBearerTest {
+
+ @Test
+ public void testPrivateKeyId() throws Exception {
+ KeyPair keyPair = generateKeyPair();
+ Builder builder = new Builder(keyPair.getPrivate())
+ .setPrivateKeyId("test-id");
+
+ Map claims = generateClaims();
+ AssertionCreator assertionCreator = builder.build();
+ String assertion = assertionCreator.create(claims);
+ JwtConsumer jwtConsumer = jwtConsumer(keyPair.getPublic(), claims);
+ JwtContext context = jwtConsumer.process(assertion);
+ List joseObjects = context.getJoseObjects();
+ assertNotNull(joseObjects);
+ assertEquals(1, joseObjects.size());
+ JsonWebStructure jsonWebStructure = joseObjects.get(0);
+ assertEquals("test-id", jsonWebStructure.getKeyIdHeaderValue());
+ }
+
+ @Test
+ public void testPrivateKeySecret() throws Exception {
+ KeyPair keyPair = generateKeyPair();
+ Builder builder = new Builder(keyPair.getPrivate());
+ AssertionCreator assertionCreator = builder.build();
+ Map claims = generateClaims();
+ String assertion = assertionCreator.create(claims);
+ JwtConsumer jwtConsumer = jwtConsumer(keyPair.getPublic(), claims);
+ jwtConsumer.process(assertion);
+ }
+
+ @Test
+ public void testInvalidPrivateKeySecret() {
+ // Intentionally "mangle" the private key secret by stripping off the first character.
+ Supplier privateKeySupplier = () -> generatePrivateKeySecret().substring(1);
+
+ AssertionCreator assertionCreator = new DefaultAssertionCreator(
+ new MockTime(),
+ privateKeySupplier,
+ "foo",
+ "RS256"
+ );
+
+ assertThrows(KafkaException.class, () -> assertionCreator.create(generateClaims()));
+ }
+
+ @ParameterizedTest
+ @CsvSource("RS256,ES256")
+ public void testTokenSigningAlgo(String tokenSigningAlgo) throws Exception {
+ KeyPair keyPair = generateKeyPair();
+ Builder builder = new Builder(keyPair.getPrivate())
+ .setPrivateKeySigningAlgorithm(tokenSigningAlgo);
+
+ AssertionCreator assertionCreator = builder.build();
+ Map claims = generateClaims();
+ String assertion = assertionCreator.create(claims);
+ JwtConsumer jwtConsumer = jwtConsumer(keyPair.getPublic(), claims);
+ JwtContext context = jwtConsumer.process(assertion);
+ List joseObjects = context.getJoseObjects();
+ assertNotNull(joseObjects);
+ assertEquals(1, joseObjects.size());
+ JsonWebStructure jsonWebStructure = joseObjects.get(0);
+ assertEquals(tokenSigningAlgo, jsonWebStructure.getAlgorithmHeaderValue());
+ }
+
+ @Test
+ public void testInvalidTokenSigningAlgo() {
+ PrivateKey privateKey = generateKeyPair().getPrivate();
+ Builder builder = new Builder(privateKey)
+ .setPrivateKeySigningAlgorithm("thisisnotvalid");
+ AssertionCreator assertionCreator = builder.build();
+ Map claims = generateClaims();
+ Exception e = assertThrows(KafkaException.class, () -> assertionCreator.create(claims));
+ assertNotNull(e);
+ Throwable cause = e.getCause();
+ assertNotNull(cause);
+ assertInstanceOf(InvalidAlgorithmException.class, cause);
+ }
+
+ private JwtConsumer jwtConsumer(PublicKey publicKey, Map expectedValues) {
+ JwtConsumerBuilder builder = new JwtConsumerBuilder()
+ .setVerificationKey(publicKey)
+ .setRequireExpirationTime()
+ .setAllowedClockSkewInSeconds(30);
+
+ if (expectedValues.containsKey("sub"))
+ builder = builder.setExpectedSubject(expectedValues.get("sub").toString());
+
+ if (expectedValues.containsKey("iss"))
+ builder = builder.setExpectedIssuer(expectedValues.get("iss").toString());
+
+ if (expectedValues.containsKey("aud"))
+ builder = builder.setExpectedAudience(expectedValues.get("aud").toString());
+
+ return builder.build();
+ }
+
+ private String generatePrivateKeySecret(PrivateKey privateKey) {
+ return Base64.getEncoder().encodeToString(privateKey.getEncoded());
+ }
+
+ private String generatePrivateKeySecret() {
+ return generatePrivateKeySecret(generateKeyPair().getPrivate());
+ }
+
+ private KeyPair generateKeyPair() {
+ try {
+ KeyPairGenerator keyGen = KeyPairGenerator.getInstance("RSA");
+ keyGen.initialize(2048);
+ return keyGen.generateKeyPair();
+ } catch (NoSuchAlgorithmException e) {
+ throw new IllegalStateException("Received unexpected error during private key generation", e);
+ }
+ }
+
+ private Map generateClaims() {
+ Map claims = new HashMap<>();
+ claims.put("sub", "testTokenSubject");
+ claims.put("iss", "testTokenIssuer");
+ claims.put("aud", "testTokenAudience");
+ return claims;
+ }
+
+ private static class Builder {
+
+ private final Time time = new MockTime();
+ private final PrivateKey privateKey;
+ private String privateKeyId = "testPrivateKeyId";
+ private String privateKeySigningAlgorithm = "RS256";
+
+ public Builder(PrivateKey privateKey) {
+ this.privateKey = privateKey;
+ }
+
+ public Builder setPrivateKeyId(String privateKeyId) {
+ this.privateKeyId = privateKeyId;
+ return this;
+ }
+
+ public Builder setPrivateKeySigningAlgorithm(String privateKeySigningAlgorithm) {
+ this.privateKeySigningAlgorithm = privateKeySigningAlgorithm;
+ return this;
+ }
+
+
+ private AssertionCreator build() {
+ Supplier privateKeySupplier = () -> Base64.getEncoder().encodeToString(privateKey.getEncoded());
+
+ return new DefaultAssertionCreator(
+ time,
+ privateKeySupplier,
+ privateKeyId,
+ privateKeySigningAlgorithm
+ );
+ }
+ }
+}
\ No newline at end of file
diff --git a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetrieverTest.java b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetrieverTest.java
index 8b1c5a370652e..08fff12476f1f 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetrieverTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetrieverTest.java
@@ -170,69 +170,4 @@ public void testParseAccessTokenInvalidJson() {
assertThrows(IOException.class, () -> HttpAccessTokenRetriever.parseAccessToken("not valid JSON"));
}
- @Test
- public void testFormatAuthorizationHeader() {
- assertAuthorizationHeader("id", "secret", false, "Basic aWQ6c2VjcmV0");
- }
-
- @Test
- public void testFormatAuthorizationHeaderEncoding() {
- // according to RFC-7617, we need to use the *non-URL safe* base64 encoder. See KAFKA-14496.
- assertAuthorizationHeader("SOME_RANDOM_LONG_USER_01234", "9Q|0`8i~ute-n9ksjLWb\\50\"AX@UUED5E", false, "Basic U09NRV9SQU5ET01fTE9OR19VU0VSXzAxMjM0OjlRfDBgOGl+dXRlLW45a3NqTFdiXDUwIkFYQFVVRUQ1RQ==");
- // according to RFC-6749 clientId & clientSecret must be urlencoded, see https://tools.ietf.org/html/rfc6749#section-2.3.1
- assertAuthorizationHeader("user!@~'", "secret-(*)!", true, "Basic dXNlciUyMSU0MCU3RSUyNzpzZWNyZXQtJTI4KiUyOSUyMQ==");
- }
-
- private void assertAuthorizationHeader(String clientId, String clientSecret, boolean urlencode, String expected) {
- String actual = HttpAccessTokenRetriever.formatAuthorizationHeader(clientId, clientSecret, urlencode);
- assertEquals(expected, actual, String.format("Expected the HTTP Authorization header generated for client ID \"%s\" and client secret \"%s\" to match", clientId, clientSecret));
- }
-
- @Test
- public void testFormatAuthorizationHeaderMissingValues() {
- assertThrows(IllegalArgumentException.class, () -> HttpAccessTokenRetriever.formatAuthorizationHeader(null, "secret", false));
- assertThrows(IllegalArgumentException.class, () -> HttpAccessTokenRetriever.formatAuthorizationHeader("id", null, false));
- assertThrows(IllegalArgumentException.class, () -> HttpAccessTokenRetriever.formatAuthorizationHeader(null, null, false));
- assertThrows(IllegalArgumentException.class, () -> HttpAccessTokenRetriever.formatAuthorizationHeader("", "secret", false));
- assertThrows(IllegalArgumentException.class, () -> HttpAccessTokenRetriever.formatAuthorizationHeader("id", "", false));
- assertThrows(IllegalArgumentException.class, () -> HttpAccessTokenRetriever.formatAuthorizationHeader("", "", false));
- assertThrows(IllegalArgumentException.class, () -> HttpAccessTokenRetriever.formatAuthorizationHeader(" ", "secret", false));
- assertThrows(IllegalArgumentException.class, () -> HttpAccessTokenRetriever.formatAuthorizationHeader("id", " ", false));
- assertThrows(IllegalArgumentException.class, () -> HttpAccessTokenRetriever.formatAuthorizationHeader(" ", " ", false));
- }
-
- @Test
- public void testFormatRequestBody() {
- String expected = "grant_type=client_credentials&scope=scope";
- String actual = HttpAccessTokenRetriever.formatRequestBody("scope");
- assertEquals(expected, actual);
- }
-
- @Test
- public void testFormatRequestBodyWithEscaped() {
- String questionMark = "%3F";
- String exclamationMark = "%21";
-
- String expected = String.format("grant_type=client_credentials&scope=earth+is+great%s", exclamationMark);
- String actual = HttpAccessTokenRetriever.formatRequestBody("earth is great!");
- assertEquals(expected, actual);
-
- expected = String.format("grant_type=client_credentials&scope=what+on+earth%s%s%s%s%s", questionMark, exclamationMark, questionMark, exclamationMark, questionMark);
- actual = HttpAccessTokenRetriever.formatRequestBody("what on earth?!?!?");
- assertEquals(expected, actual);
- }
-
- @Test
- public void testFormatRequestBodyMissingValues() {
- String expected = "grant_type=client_credentials";
- String actual = HttpAccessTokenRetriever.formatRequestBody(null);
- assertEquals(expected, actual);
-
- actual = HttpAccessTokenRetriever.formatRequestBody("");
- assertEquals(expected, actual);
-
- actual = HttpAccessTokenRetriever.formatRequestBody(" ");
- assertEquals(expected, actual);
- }
-
}
diff --git a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/JwtBearerAccessTokenRetrieverTest.java b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/JwtBearerAccessTokenRetrieverTest.java
new file mode 100644
index 0000000000000..99e8d5f251a48
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/secured/JwtBearerAccessTokenRetrieverTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.oauthbearer.internals.secured;
+
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+
+import org.junit.jupiter.api.Test;
+
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.security.KeyPair;
+import java.security.KeyPairGenerator;
+import java.security.NoSuchAlgorithmException;
+import java.security.PrivateKey;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class JwtBearerAccessTokenRetrieverTest extends OAuthBearerTest {
+
+ private final Time time = new MockTime();
+
+ @Test
+ public void testFormatRequestBody() throws Exception {
+ AssertionCreator assertionCreator = generateAssertionCreator(generateKeyPair().getPrivate());
+ Map staticClaims = generateClaims();
+
+ try (JwtBearerAccessTokenRetriever requestFormatter = generateRetriever(assertionCreator, staticClaims)) {
+ String assertion = assertionCreator.create(staticClaims);
+ String requestBody = requestFormatter.formatRequestBody();
+ String expected = "grant_type=" + URLEncoder.encode(JwtBearerAccessTokenRetriever.GRANT_TYPE, StandardCharsets.UTF_8) + "&assertion=" + assertion;
+ assertEquals(expected, requestBody);
+ }
+ }
+
+ @Test
+ public void testFormatRequestHeaders() throws Exception {
+ AssertionCreator assertionCreator = generateAssertionCreator(generateKeyPair().getPrivate());
+ Map staticClaims = generateClaims();
+
+ try (JwtBearerAccessTokenRetriever requestFormatter = generateRetriever(assertionCreator, staticClaims)) {
+ assertEquals(Collections.singletonMap("Content-Type", "application/x-www-form-urlencoded"), requestFormatter.formatRequestHeaders());
+ }
+ }
+
+ private KeyPair generateKeyPair() {
+ try {
+ KeyPairGenerator keyGen = KeyPairGenerator.getInstance("RSA");
+ keyGen.initialize(2048);
+ return keyGen.generateKeyPair();
+ } catch (NoSuchAlgorithmException e) {
+ throw new IllegalStateException("Received unexpected error during private key generation", e);
+ }
+ }
+
+ private Map generateClaims() {
+ Map claims = new HashMap<>();
+ claims.put("sub", "testTokenSubject");
+ claims.put("iss", "testTokenIssuer");
+ claims.put("aud", "testTokenAudience");
+ return claims;
+ }
+
+ private AssertionCreator generateAssertionCreator(PrivateKey privateKey) {
+ return new DefaultAssertionCreator(
+ time,
+ () -> Base64.getEncoder().encodeToString(privateKey.getEncoded()),
+ "dummyPrivateKeyId",
+ "RS256"
+ );
+ }
+
+ private JwtBearerAccessTokenRetriever generateRetriever(AssertionCreator assertionCreator,
+ Map staticClaims) {
+ return new JwtBearerAccessTokenRetriever(
+ assertionCreator,
+ staticClaims,
+ null,
+ "https://www.example.com",
+ 100,
+ 10000,
+ null,
+ null
+ );
+ }
+
+}
\ No newline at end of file
diff --git a/tools/src/main/java/org/apache/kafka/tools/OAuthCompatibilityTool.java b/tools/src/main/java/org/apache/kafka/tools/OAuthCompatibilityTool.java
index 485146aea7ecb..0dc85942368ba 100644
--- a/tools/src/main/java/org/apache/kafka/tools/OAuthCompatibilityTool.java
+++ b/tools/src/main/java/org/apache/kafka/tools/OAuthCompatibilityTool.java
@@ -23,123 +23,76 @@
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
-import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.common.security.JaasContext;
import org.apache.kafka.common.security.oauthbearer.internals.secured.AccessTokenRetriever;
import org.apache.kafka.common.security.oauthbearer.internals.secured.AccessTokenRetrieverFactory;
import org.apache.kafka.common.security.oauthbearer.internals.secured.AccessTokenValidator;
import org.apache.kafka.common.security.oauthbearer.internals.secured.AccessTokenValidatorFactory;
import org.apache.kafka.common.security.oauthbearer.internals.secured.CloseableVerificationKeyResolver;
+import org.apache.kafka.common.security.oauthbearer.internals.secured.JaasOptionsUtils;
import org.apache.kafka.common.security.oauthbearer.internals.secured.VerificationKeyResolverFactory;
import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
import net.sourceforge.argparse4j.ArgumentParsers;
-import net.sourceforge.argparse4j.impl.Arguments;
-import net.sourceforge.argparse4j.inf.Argument;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.ArgumentParserException;
import net.sourceforge.argparse4j.inf.Namespace;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
-import static org.apache.kafka.common.config.SaslConfigs.SASL_LOGIN_CONNECT_TIMEOUT_MS;
-import static org.apache.kafka.common.config.SaslConfigs.SASL_LOGIN_CONNECT_TIMEOUT_MS_DOC;
-import static org.apache.kafka.common.config.SaslConfigs.SASL_LOGIN_READ_TIMEOUT_MS;
-import static org.apache.kafka.common.config.SaslConfigs.SASL_LOGIN_READ_TIMEOUT_MS_DOC;
-import static org.apache.kafka.common.config.SaslConfigs.SASL_LOGIN_RETRY_BACKOFF_MAX_MS;
-import static org.apache.kafka.common.config.SaslConfigs.SASL_LOGIN_RETRY_BACKOFF_MAX_MS_DOC;
-import static org.apache.kafka.common.config.SaslConfigs.SASL_LOGIN_RETRY_BACKOFF_MS;
-import static org.apache.kafka.common.config.SaslConfigs.SASL_LOGIN_RETRY_BACKOFF_MS_DOC;
-import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS;
-import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS_DOC;
-import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_EXPECTED_AUDIENCE;
-import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_EXPECTED_AUDIENCE_DOC;
-import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_EXPECTED_ISSUER;
-import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_EXPECTED_ISSUER_DOC;
-import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_REFRESH_MS;
-import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_REFRESH_MS_DOC;
-import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MAX_MS;
-import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MAX_MS_DOC;
-import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MS;
-import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MS_DOC;
-import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_URL;
-import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_URL_DOC;
-import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_SCOPE_CLAIM_NAME;
-import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_SCOPE_CLAIM_NAME_DOC;
-import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_SUB_CLAIM_NAME;
-import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_SUB_CLAIM_NAME_DOC;
-import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL;
-import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL_DOC;
-import static org.apache.kafka.common.config.SslConfigs.SSL_CIPHER_SUITES_CONFIG;
-import static org.apache.kafka.common.config.SslConfigs.SSL_CIPHER_SUITES_DOC;
-import static org.apache.kafka.common.config.SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG;
-import static org.apache.kafka.common.config.SslConfigs.SSL_ENABLED_PROTOCOLS_DOC;
-import static org.apache.kafka.common.config.SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG;
-import static org.apache.kafka.common.config.SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC;
-import static org.apache.kafka.common.config.SslConfigs.SSL_ENGINE_FACTORY_CLASS_CONFIG;
-import static org.apache.kafka.common.config.SslConfigs.SSL_ENGINE_FACTORY_CLASS_DOC;
-import static org.apache.kafka.common.config.SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG;
-import static org.apache.kafka.common.config.SslConfigs.SSL_KEYMANAGER_ALGORITHM_DOC;
-import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG;
-import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_DOC;
-import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_KEY_CONFIG;
-import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_KEY_DOC;
-import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG;
-import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_LOCATION_DOC;
-import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG;
-import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_PASSWORD_DOC;
-import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_TYPE_CONFIG;
-import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_TYPE_DOC;
-import static org.apache.kafka.common.config.SslConfigs.SSL_KEY_PASSWORD_CONFIG;
-import static org.apache.kafka.common.config.SslConfigs.SSL_KEY_PASSWORD_DOC;
-import static org.apache.kafka.common.config.SslConfigs.SSL_PROTOCOL_CONFIG;
-import static org.apache.kafka.common.config.SslConfigs.SSL_PROTOCOL_DOC;
-import static org.apache.kafka.common.config.SslConfigs.SSL_PROVIDER_CONFIG;
-import static org.apache.kafka.common.config.SslConfigs.SSL_PROVIDER_DOC;
-import static org.apache.kafka.common.config.SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG;
-import static org.apache.kafka.common.config.SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_DOC;
-import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG;
-import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC;
-import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG;
-import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_DOC;
-import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG;
-import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_LOCATION_DOC;
-import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG;
-import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_PASSWORD_DOC;
-import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG;
-import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_TYPE_DOC;
-import static org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler.CLIENT_ID_CONFIG;
-import static org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler.CLIENT_ID_DOC;
-import static org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler.CLIENT_SECRET_CONFIG;
-import static org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler.CLIENT_SECRET_DOC;
-import static org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler.SCOPE_CONFIG;
-import static org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler.SCOPE_DOC;
+import static org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.OAUTHBEARER_MECHANISM;
public class OAuthCompatibilityTool {
public static void main(String[] args) {
- ArgsHandler argsHandler = new ArgsHandler();
+ String description = String.format(
+ "This tool is used to verify OAuth/OIDC provider compatibility.%n%n" +
+ "Run the following script to determine the configuration options:%n%n" +
+ " ./bin/kafka-run-class.sh %s --help",
+ OAuthCompatibilityTool.class.getName());
+
+ ArgumentParser parser = ArgumentParsers
+ .newArgumentParser("oauth-compatibility-tool")
+ .defaultHelp(true)
+ .description(description);
+ parser.addArgument("-b", "--broker-configuration")
+ .type(String.class)
+ .metavar("broker-configuration")
+ .dest("brokerConfigFileName")
+ .required(true)
+ .help("Local file name that contains the broker configuration");
+ parser.addArgument("-c", "--client-configuration")
+ .type(String.class)
+ .metavar("client-configuration")
+ .dest("clientConfigFileName")
+ .required(true)
+ .help("Local file name that contains the client configuration");
+
Namespace namespace;
try {
- namespace = argsHandler.parseArgs(args);
+ namespace = parser.parseArgs(args);
} catch (ArgumentParserException e) {
+ parser.handleError(e);
Exit.exit(1);
return;
}
- ConfigHandler configHandler = new ConfigHandler(namespace);
-
- Map configs = configHandler.getConfigs();
- Map jaasConfigs = configHandler.getJaasOptions();
+ Time time = Time.SYSTEM;
try {
String accessToken;
{
// Client side...
- try (AccessTokenRetriever atr = AccessTokenRetrieverFactory.create(configs, jaasConfigs)) {
+ Map configs = getConfigs(namespace.getString("clientConfigFileName"));
+ JaasContext context = JaasContext.loadClientContext(configs);
+ Map jaasConfigs = JaasOptionsUtils.getOptions(OAUTHBEARER_MECHANISM, context.configurationEntries());
+
+ try (AccessTokenRetriever atr = AccessTokenRetrieverFactory.create(time, configs, jaasConfigs)) {
atr.init();
AccessTokenValidator atv = AccessTokenValidatorFactory.create(configs);
System.out.println("PASSED 1/5: client configuration");
@@ -154,6 +107,10 @@ public static void main(String[] args) {
{
// Broker side...
+ Map configs = getConfigs(namespace.getString("brokerConfigFileName"));
+ JaasContext context = JaasContext.loadClientContext(configs);
+ Map jaasConfigs = JaasOptionsUtils.getOptions(OAUTHBEARER_MECHANISM, context.configurationEntries());
+
try (CloseableVerificationKeyResolver vkr = VerificationKeyResolverFactory.create(configs, jaasConfigs)) {
vkr.init();
AccessTokenValidator atv = AccessTokenValidatorFactory.create(configs, vkr);
@@ -168,221 +125,27 @@ public static void main(String[] args) {
Exit.exit(0);
} catch (Throwable t) {
System.out.println("FAILED:");
- t.printStackTrace();
+ t.printStackTrace(System.err);
if (t instanceof ConfigException) {
System.out.printf("%n");
- argsHandler.parser.printHelp();
}
Exit.exit(1);
}
}
-
- private static class ArgsHandler {
-
- private static final String DESCRIPTION = String.format(
- "This tool is used to verify OAuth/OIDC provider compatibility.%n%n" +
- "Run the following script to determine the configuration options:%n%n" +
- " ./bin/kafka-run-class.sh %s --help",
- OAuthCompatibilityTool.class.getName());
-
- private final ArgumentParser parser;
-
- private ArgsHandler() {
- this.parser = ArgumentParsers
- .newArgumentParser("oauth-compatibility-tool")
- .defaultHelp(true)
- .description(DESCRIPTION);
- }
-
- private Namespace parseArgs(String[] args) throws ArgumentParserException {
- // SASL/OAuth
- addArgument(SASL_LOGIN_CONNECT_TIMEOUT_MS, SASL_LOGIN_CONNECT_TIMEOUT_MS_DOC, Integer.class);
- addArgument(SASL_LOGIN_READ_TIMEOUT_MS, SASL_LOGIN_READ_TIMEOUT_MS_DOC, Integer.class);
- addArgument(SASL_LOGIN_RETRY_BACKOFF_MAX_MS, SASL_LOGIN_RETRY_BACKOFF_MAX_MS_DOC, Long.class);
- addArgument(SASL_LOGIN_RETRY_BACKOFF_MS, SASL_LOGIN_RETRY_BACKOFF_MS_DOC, Long.class);
- addArgument(SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS, SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS_DOC, Integer.class);
- addArgument(SASL_OAUTHBEARER_EXPECTED_AUDIENCE, SASL_OAUTHBEARER_EXPECTED_AUDIENCE_DOC)
- .action(Arguments.append());
- addArgument(SASL_OAUTHBEARER_EXPECTED_ISSUER, SASL_OAUTHBEARER_EXPECTED_ISSUER_DOC);
- addArgument(SASL_OAUTHBEARER_JWKS_ENDPOINT_REFRESH_MS, SASL_OAUTHBEARER_JWKS_ENDPOINT_REFRESH_MS_DOC, Long.class);
- addArgument(SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MAX_MS, SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MAX_MS_DOC, Long.class);
- addArgument(SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MS, SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MS_DOC, Long.class);
- addArgument(SASL_OAUTHBEARER_JWKS_ENDPOINT_URL, SASL_OAUTHBEARER_JWKS_ENDPOINT_URL_DOC);
- addArgument(SASL_OAUTHBEARER_SCOPE_CLAIM_NAME, SASL_OAUTHBEARER_SCOPE_CLAIM_NAME_DOC);
- addArgument(SASL_OAUTHBEARER_SUB_CLAIM_NAME, SASL_OAUTHBEARER_SUB_CLAIM_NAME_DOC);
- addArgument(SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL, SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL_DOC);
-
- // SSL
- addArgument(SSL_CIPHER_SUITES_CONFIG, SSL_CIPHER_SUITES_DOC)
- .action(Arguments.append());
- addArgument(SSL_ENABLED_PROTOCOLS_CONFIG, SSL_ENABLED_PROTOCOLS_DOC)
- .action(Arguments.append());
- addArgument(SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC);
- addArgument(SSL_ENGINE_FACTORY_CLASS_CONFIG, SSL_ENGINE_FACTORY_CLASS_DOC);
- addArgument(SSL_KEYMANAGER_ALGORITHM_CONFIG, SSL_KEYMANAGER_ALGORITHM_DOC);
- addArgument(SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG, SSL_KEYSTORE_CERTIFICATE_CHAIN_DOC);
- addArgument(SSL_KEYSTORE_KEY_CONFIG, SSL_KEYSTORE_KEY_DOC);
- addArgument(SSL_KEYSTORE_LOCATION_CONFIG, SSL_KEYSTORE_LOCATION_DOC);
- addArgument(SSL_KEYSTORE_PASSWORD_CONFIG, SSL_KEYSTORE_PASSWORD_DOC);
- addArgument(SSL_KEYSTORE_TYPE_CONFIG, SSL_KEYSTORE_TYPE_DOC);
- addArgument(SSL_KEY_PASSWORD_CONFIG, SSL_KEY_PASSWORD_DOC);
- addArgument(SSL_PROTOCOL_CONFIG, SSL_PROTOCOL_DOC);
- addArgument(SSL_PROVIDER_CONFIG, SSL_PROVIDER_DOC);
- addArgument(SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG, SSL_SECURE_RANDOM_IMPLEMENTATION_DOC);
- addArgument(SSL_TRUSTMANAGER_ALGORITHM_CONFIG, SSL_TRUSTMANAGER_ALGORITHM_DOC);
- addArgument(SSL_TRUSTSTORE_CERTIFICATES_CONFIG, SSL_TRUSTSTORE_CERTIFICATES_DOC);
- addArgument(SSL_TRUSTSTORE_LOCATION_CONFIG, SSL_TRUSTSTORE_LOCATION_DOC);
- addArgument(SSL_TRUSTSTORE_PASSWORD_CONFIG, SSL_TRUSTSTORE_PASSWORD_DOC);
- addArgument(SSL_TRUSTSTORE_TYPE_CONFIG, SSL_TRUSTSTORE_TYPE_DOC);
-
- // JAAS options...
- addArgument(CLIENT_ID_CONFIG, CLIENT_ID_DOC);
- addArgument(CLIENT_SECRET_CONFIG, CLIENT_SECRET_DOC);
- addArgument(SCOPE_CONFIG, SCOPE_DOC);
-
- try {
- return parser.parseArgs(args);
- } catch (ArgumentParserException e) {
- parser.handleError(e);
- throw e;
- }
- }
-
- private Argument addArgument(String option, String help) {
- return addArgument(option, help, String.class);
- }
-
- private Argument addArgument(String option, String help, Class> clazz) {
- // Change foo.bar into --foo.bar.
- String name = "--" + option;
-
- return parser.addArgument(name)
- .type(clazz)
- .metavar(option)
- .dest(option)
- .help(help);
- }
-
- }
-
- private static class ConfigHandler {
-
- private final Namespace namespace;
-
-
- private ConfigHandler(Namespace namespace) {
- this.namespace = namespace;
- }
-
- private Map getConfigs() {
- Map m = new HashMap<>();
-
- // SASL/OAuth
- maybeAddInt(m, SASL_LOGIN_CONNECT_TIMEOUT_MS);
- maybeAddInt(m, SASL_LOGIN_READ_TIMEOUT_MS);
- maybeAddLong(m, SASL_LOGIN_RETRY_BACKOFF_MS);
- maybeAddLong(m, SASL_LOGIN_RETRY_BACKOFF_MAX_MS);
- maybeAddString(m, SASL_OAUTHBEARER_SCOPE_CLAIM_NAME);
- maybeAddString(m, SASL_OAUTHBEARER_SUB_CLAIM_NAME);
- maybeAddString(m, SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL);
- maybeAddString(m, SASL_OAUTHBEARER_JWKS_ENDPOINT_URL);
- maybeAddLong(m, SASL_OAUTHBEARER_JWKS_ENDPOINT_REFRESH_MS);
- maybeAddLong(m, SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MAX_MS);
- maybeAddLong(m, SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MS);
- maybeAddInt(m, SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS);
- maybeAddStringList(m, SASL_OAUTHBEARER_EXPECTED_AUDIENCE);
- maybeAddString(m, SASL_OAUTHBEARER_EXPECTED_ISSUER);
+ private static Map getConfigs(String fileName) {
+ try {
+ Map config = new HashMap<>(Utils.propsToMap(Utils.loadProps(fileName)));
// This here is going to fill in all the defaults for the values we don't specify...
ConfigDef cd = new ConfigDef();
SaslConfigs.addClientSaslSupport(cd);
SslConfigs.addClientSslSupport(cd);
- AbstractConfig config = new AbstractConfig(cd, m);
- return config.values();
- }
-
- private Map getJaasOptions() {
- Map m = new HashMap<>();
-
- // SASL/OAuth
- maybeAddString(m, CLIENT_ID_CONFIG);
- maybeAddString(m, CLIENT_SECRET_CONFIG);
- maybeAddString(m, SCOPE_CONFIG);
-
- // SSL
- maybeAddStringList(m, SSL_CIPHER_SUITES_CONFIG);
- maybeAddStringList(m, SSL_ENABLED_PROTOCOLS_CONFIG);
- maybeAddString(m, SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG);
- maybeAddClass(m, SSL_ENGINE_FACTORY_CLASS_CONFIG);
- maybeAddString(m, SSL_KEYMANAGER_ALGORITHM_CONFIG);
- maybeAddPassword(m, SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG);
- maybeAddPassword(m, SSL_KEYSTORE_KEY_CONFIG);
- maybeAddString(m, SSL_KEYSTORE_LOCATION_CONFIG);
- maybeAddPassword(m, SSL_KEYSTORE_PASSWORD_CONFIG);
- maybeAddString(m, SSL_KEYSTORE_TYPE_CONFIG);
- maybeAddPassword(m, SSL_KEY_PASSWORD_CONFIG);
- maybeAddString(m, SSL_PROTOCOL_CONFIG);
- maybeAddString(m, SSL_PROVIDER_CONFIG);
- maybeAddString(m, SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG);
- maybeAddString(m, SSL_TRUSTMANAGER_ALGORITHM_CONFIG);
- maybeAddPassword(m, SSL_TRUSTSTORE_CERTIFICATES_CONFIG);
- maybeAddString(m, SSL_TRUSTSTORE_LOCATION_CONFIG);
- maybeAddPassword(m, SSL_TRUSTSTORE_PASSWORD_CONFIG);
- maybeAddString(m, SSL_TRUSTSTORE_TYPE_CONFIG);
-
- return m;
- }
-
- private void maybeAddInt(Map m, String option) {
- Integer value = namespace.getInt(option);
-
- if (value != null)
- m.put(option, value);
- }
-
- private void maybeAddLong(Map m, String option) {
- Long value = namespace.getLong(option);
-
- if (value != null)
- m.put(option, value);
+ return new AbstractConfig(cd, config).values();
+ } catch (Exception e) {
+ throw new KafkaException("Could not load configuration from " + fileName);
}
-
- private void maybeAddString(Map m, String option) {
- String value = namespace.getString(option);
-
- if (value != null)
- m.put(option, value);
- }
-
- private void maybeAddPassword(Map m, String option) {
- String value = namespace.getString(option);
-
- if (value != null)
- m.put(option, new Password(value));
- }
-
- private void maybeAddClass(Map m, String option) {
- String value = namespace.getString(option);
-
- if (value != null) {
- try {
- m.put(option, Class.forName(value));
- } catch (ClassNotFoundException e) {
- throw new KafkaException("Could not find class for " + option, e);
- }
- }
- }
-
- private void maybeAddStringList(Map m, String option) {
- List value = namespace.getList(option);
-
- if (value != null)
- m.put(option, value);
- }
-
}
-
}