Skip to content

Commit 87a4484

Browse files
committed
Fix ssl timeout errors for Quix applications (#254)
1 parent 1100753 commit 87a4484

File tree

3 files changed

+21
-1
lines changed

3 files changed

+21
-1
lines changed

quixstreams/app.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,6 @@ def __init__(
164164
self._on_processing_error = on_processing_error or default_on_processing_error
165165
self._on_message_processed = on_message_processed
166166
self._quix_config_builder: Optional[QuixKafkaConfigsBuilder] = None
167-
self._state_manager: Optional[StateStoreManager] = None
168167
self._state_manager = StateStoreManager(
169168
group_id=consumer_group,
170169
state_dir=state_dir,

quixstreams/platforms/quix/config.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@
1919
"TopicCreationConfigs",
2020
)
2121

22+
QUIX_CONNECTIONS_MAX_IDLE_MS = 3 * 60 * 1000
23+
QUIX_METADATA_MAX_AGE_MS = 3 * 60 * 1000
24+
2225

2326
@dataclasses.dataclass
2427
class TopicCreationConfigs:
@@ -410,6 +413,18 @@ def get_confluent_broker_config(self, known_topic: Optional[str] = None) -> dict
410413
cfg_out[c_cfg] = self.QuixApiKafkaAuthConfigMap.values.get(value, value)
411414
cfg_out["ssl.endpoint.identification.algorithm"] = "none"
412415
cfg_out["ssl.ca.location"] = self._set_workspace_cert()
416+
417+
# Set the connection idle timeout and metadata max age to be less than
418+
# Azure's default 4 minutes.
419+
# Azure LB kills the inbound TCP connections after 4 mins and these settings
420+
# help to handle that.
421+
# More about this issue:
422+
# - https://github.com/confluentinc/librdkafka/issues/3109
423+
# - https://learn.microsoft.com/en-us/azure/event-hubs/apache-kafka-configurations#producer-and-consumer-configurations-1
424+
# These values can be overwritten on the Application level by passing
425+
# `extra_consumer_config` or `extra_producer_config` parameters.
426+
cfg_out["connections.max.idle.ms"] = QUIX_CONNECTIONS_MAX_IDLE_MS
427+
cfg_out["metadata.max.age.ms"] = QUIX_METADATA_MAX_AGE_MS
413428
self._confluent_broker_config = cfg_out
414429
return self._confluent_broker_config
415430

tests/test_quixstreams/test_platforms/test_quix/test_config.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@
88
from requests import HTTPError, Response
99

1010
from quixstreams.models.topics import Topic
11+
from quixstreams.platforms.quix.config import (
12+
QUIX_CONNECTIONS_MAX_IDLE_MS,
13+
QUIX_METADATA_MAX_AGE_MS,
14+
)
1115

1216

1317
class TestQuixKafkaConfigsBuilder:
@@ -404,6 +408,8 @@ def test_get_confluent_broker_config(self, quix_kafka_config_factory):
404408
"sasl.password": "my-password",
405409
"ssl.ca.location": "/mock/dir/ca.cert",
406410
"ssl.endpoint.identification.algorithm": "none",
411+
"connections.max.idle.ms": QUIX_CONNECTIONS_MAX_IDLE_MS,
412+
"metadata.max.age.ms": QUIX_METADATA_MAX_AGE_MS,
407413
}
408414

409415
def test_append_workspace_id(self, quix_kafka_config_factory):

0 commit comments

Comments
 (0)