1
- from typing import Any , Dict , List
1
+ from typing import Any , Dict , List , Optional
2
2
3
3
from confluent_kafka import Consumer , Message , TopicPartition # type: ignore
4
4
from confluent_kafka .admin import TopicMetadata # type: ignore
8
8
from dlt .common .configuration import configspec
9
9
from dlt .common .configuration .specs import CredentialsConfiguration
10
10
from dlt .common .time import ensure_pendulum_datetime
11
- from dlt .common .typing import DictStrAny , TSecretValue , TAnyDateTime
11
+ from dlt .common .typing import DictStrAny , TSecretValue
12
12
from dlt .common .utils import digest128
13
13
14
14
@@ -231,9 +231,11 @@ class KafkaCredentials(CredentialsConfiguration):
231
231
bootstrap_servers : str = config .value
232
232
group_id : str = config .value
233
233
security_protocol : str = config .value
234
- sasl_mechanisms : str = config .value
235
- sasl_username : str = config .value
236
- sasl_password : TSecretValue = secrets .value
234
+
235
+ # Optional SASL credentials
236
+ sasl_mechanisms : Optional [str ] = config .value
237
+ sasl_username : Optional [str ] = config .value
238
+ sasl_password : Optional [TSecretValue ] = secrets .value
237
239
238
240
def init_consumer (self ) -> Consumer :
239
241
"""Init a Kafka consumer from this credentials.
@@ -245,9 +247,16 @@ def init_consumer(self) -> Consumer:
245
247
"bootstrap.servers" : self .bootstrap_servers ,
246
248
"group.id" : self .group_id ,
247
249
"security.protocol" : self .security_protocol ,
248
- "sasl.mechanisms" : self .sasl_mechanisms ,
249
- "sasl.username" : self .sasl_username ,
250
- "sasl.password" : self .sasl_password ,
251
250
"auto.offset.reset" : "earliest" ,
252
251
}
252
+
253
+ if self .sasl_mechanisms and self .sasl_username and self .sasl_password :
254
+ config .update (
255
+ {
256
+ "sasl.mechanisms" : self .sasl_mechanisms ,
257
+ "sasl.username" : self .sasl_username ,
258
+ "sasl.password" : self .sasl_password ,
259
+ }
260
+ )
261
+
253
262
return Consumer (config )
0 commit comments