Skip to content

Commit d6904b0

Browse files
🐛 bug(55827):[source-kafka]: added AWS MSK IAM Auth jar to classpath an… (#55828)
Co-authored-by: Marcos Marx <[email protected]>
1 parent 3e3eefb commit d6904b0

File tree

6 files changed

+15
-3
lines changed

6 files changed

+15
-3
lines changed

airbyte-integrations/connectors/source-kafka/build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,11 @@ application {
1515

1616
dependencies {
1717

18+
implementation 'software.amazon.msk:aws-msk-iam-auth:2.3.1'
1819
implementation 'org.apache.kafka:kafka-clients:3.2.1'
1920
implementation 'org.apache.kafka:connect-json:3.2.1'
2021
implementation 'io.confluent:kafka-avro-serializer:7.2.1'
2122

23+
2224
testImplementation 'org.testcontainers:kafka:1.19.4'
2325
}

airbyte-integrations/connectors/source-kafka/metadata.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ data:
1010
- suite: integrationTests
1111
connectorType: source
1212
definitionId: d917a47b-8537-4d0d-8c10-36a9928d4265
13-
dockerImageTag: 0.3.0
13+
dockerImageTag: 0.4.0
1414
dockerRepository: airbyte/source-kafka
1515
documentationUrl: https://docs.airbyte.com/integrations/sources/kafka
1616
githubIssueLabel: source-kafka

airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/AbstractFormat.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler;
2121
import org.slf4j.Logger;
2222
import org.slf4j.LoggerFactory;
23+
import software.amazon.msk.auth.iam.IAMClientCallbackHandler;
2324

2425
public abstract class AbstractFormat implements KafkaFormat {
2526

@@ -85,6 +86,9 @@ private Map<String, Object> propertiesByProtocol(final JsonNode config) {
8586
if (saslMechanism.equals(OAuthBearerLoginModule.OAUTHBEARER_MECHANISM)) {
8687
builder.put(SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL, protocolConfig.get("oauthbearer_token_endpoint_url").asText());
8788
builder.put(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS, OAuthBearerLoginCallbackHandler.class.getName());
89+
} else if (saslMechanism.equals("AWS_MSK_IAM")) {
90+
// IAMClientCallbackHandler
91+
builder.put(SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS, IAMClientCallbackHandler.class.getName());
8892
}
8993
}
9094
default -> throw new RuntimeException("Unexpected Kafka protocol: " + Jsons.serialize(protocol));

airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/JsonFormat.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@
66

77
import com.fasterxml.jackson.databind.JsonNode;
88
import com.google.common.collect.AbstractIterator;
9+
import com.google.common.collect.ImmutableMap;
910
import com.google.common.collect.Lists;
11+
import io.airbyte.commons.json.Jsons;
1012
import io.airbyte.commons.util.AutoCloseableIterator;
1113
import io.airbyte.commons.util.AutoCloseableIterators;
1214
import io.airbyte.protocol.models.Field;
@@ -151,7 +153,9 @@ protected AirbyteMessage computeNext() {
151153
.withRecord(new AirbyteRecordMessage()
152154
.withStream(record.topic())
153155
.withEmittedAt(Instant.now().toEpochMilli())
154-
.withData(record.value()));
156+
.withData(record.value())
157+
.withData(Jsons.jsonNode(ImmutableMap.builder().put("value", record.value()).build())));
158+
155159
}
156160

157161
return endOfData();

airbyte-integrations/connectors/source-kafka/src/main/resources/spec.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,8 @@
188188
"OAUTHBEARER",
189189
"SCRAM-SHA-256",
190190
"SCRAM-SHA-512",
191-
"PLAIN"
191+
"PLAIN",
192+
"AWS_MSK_IAM"
192193
]
193194
},
194195
"sasl_jaas_config": {

docs/integrations/sources/kafka.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ AVRO - deserialize Using confluent API. Please refer (https://docs.confluent.io/
6868

6969
| Version | Date | Pull Request | Subject |
7070
| :------ | :--------- |:---------------------------------------------------------------------------------------------------|:---------------------------------------------------------------------|
71+
| 0.4.0 | 2025-03-18 | [55828](https://github.com/airbytehq/airbyte/pull/55828) | Add configurations for AWS MSK IAM and fix JSON format |
7172
| 0.3.0 | 2025-02-18 | [53231](https://github.com/airbytehq/airbyte/pull/53231) | Add configurations for OAUTHBEARER SASL Mechanism |
7273
| 0.2.8 | 2025-02-07 | [53221](https://github.com/airbytehq/airbyte/pull/53221) | For AVRO MessageFormat, schema_registry_password is a secret |
7374
| 0.2.7 | 2025-01-10 | [51480](https://github.com/airbytehq/airbyte/pull/51480) | Use a non root base image |

0 commit comments

Comments
 (0)