11 changes: 7 additions & 4 deletions src/sentry/eventstream/kafka/backend.py
@@ -20,7 +20,7 @@
 from sentry.eventstream.types import EventStreamEventType
 from sentry.killswitches import killswitch_matches_context
 from sentry.utils import json
-from sentry.utils.confluent_producer import get_confluent_producer
+from sentry.utils.arroyo_producer import get_arroyo_producer
 from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition
 
 EAP_ITEMS_CODEC: Codec[TraceItem] = get_topic_codec(Topic.SNUBA_ITEMS)
@@ -48,9 +48,12 @@ def get_producer(self, topic: Topic) -> Producer:
             cluster_name = get_topic_definition(topic)["cluster"]
             cluster_options = get_kafka_producer_cluster_options(cluster_name)
             cluster_options["client.id"] = "sentry.eventstream.kafka"
-            # XXX(markus): We should use `sentry.utils.arroyo_producer.get_arroyo_producer`.
-            self.__producers[topic] = get_confluent_producer(
-                build_kafka_producer_configuration(default_config=cluster_options)
+            config = build_kafka_producer_configuration(default_config=cluster_options)
+            config["queue.buffering.max.messages"] = 200000
+            self.__producers[topic] = get_arroyo_producer(
+                name="sentry.eventstream.kafka",
+                topic=topic,
+                additional_config=config,
             )
 
         return self.__producers[topic]
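Context for reviewers: the new code keeps the existing one-producer-per-topic cache but routes construction through the Arroyo helper and caps librdkafka's client-side buffer with `queue.buffering.max.messages`. The snippet below is a minimal sketch of that same pattern written directly against Arroyo's public `KafkaProducer`; it is not Sentry's `get_arroyo_producer` helper, and the broker address, topic name, and `TopicProducers` class are illustrative assumptions.

# Minimal sketch (not Sentry's actual helper): cache one Arroyo producer per
# topic and bound librdkafka's in-memory queue. Only the arroyo imports are
# real APIs; the rest of the names here are illustrative.
from arroyo.backends.kafka import KafkaPayload, KafkaProducer
from arroyo.types import Topic


class TopicProducers:
    def __init__(self) -> None:
        self._producers: dict[str, KafkaProducer] = {}

    def get(self, topic: str, bootstrap_servers: str) -> KafkaProducer:
        if topic not in self._producers:
            config = {
                "bootstrap.servers": bootstrap_servers,
                "client.id": "sentry.eventstream.kafka",
                # Bound the number of messages buffered client-side so a slow
                # or unavailable broker cannot grow memory without limit.
                "queue.buffering.max.messages": 200000,
            }
            self._producers[topic] = KafkaProducer(config)
        return self._producers[topic]


producers = TopicProducers()
producer = producers.get("events", "localhost:9092")
# produce() takes a destination Topic and a KafkaPayload (key, value, headers).
future = producer.produce(Topic("events"), KafkaPayload(None, b'{"event": "..."}', []))
future.result(timeout=10.0)  # block until the broker acknowledges delivery
producer.close()

Calling `produce()` returns a `concurrent.futures.Future`, so call sites can either fire-and-forget or block on `result()` when delivery confirmation matters.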