diff --git a/src/sentry/eventstream/kafka/backend.py b/src/sentry/eventstream/kafka/backend.py
index d6f163d14b9a9d..1aae834360fb21 100644
--- a/src/sentry/eventstream/kafka/backend.py
+++ b/src/sentry/eventstream/kafka/backend.py
@@ -3,13 +3,14 @@
 import logging
 import time
 from collections.abc import Mapping, MutableMapping, Sequence
+from concurrent.futures import Future
 from datetime import datetime
-from typing import TYPE_CHECKING, Any
+from typing import TYPE_CHECKING, Any, cast
 
-from arroyo.backends.kafka import build_kafka_producer_configuration
+from arroyo.backends.kafka import KafkaPayload, KafkaProducer
+from arroyo.types import BrokerValue
+from arroyo.types import Topic as ArroyoTopic
 from confluent_kafka import KafkaError
-from confluent_kafka import Message as KafkaMessage
-from confluent_kafka import Producer
 from sentry_kafka_schemas.codecs import Codec
 from sentry_protos.snuba.v1.trace_item_pb2 import TraceItem
 
@@ -20,8 +21,8 @@
 from sentry.eventstream.types import EventStreamEventType
 from sentry.killswitches import killswitch_matches_context
 from sentry.utils import json
-from sentry.utils.confluent_producer import get_confluent_producer
-from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition
+from sentry.utils.arroyo_producer import get_arroyo_producer
+from sentry.utils.kafka_config import get_topic_definition
 
 EAP_ITEMS_CODEC: Codec[TraceItem] = get_topic_codec(Topic.SNUBA_ITEMS)
 
@@ -37,30 +38,31 @@
     def __init__(self, **options: Any) -> None:
         self.topic = Topic.EVENTS
         self.transactions_topic = Topic.TRANSACTIONS
         self.issue_platform_topic = Topic.EVENTSTREAM_GENERIC
-        self.__producers: MutableMapping[Topic, Producer] = {}
+        self.__producers: MutableMapping[Topic, KafkaProducer] = {}
         self.error_last_logged_time: int | None = None
 
     def get_transactions_topic(self, project_id: int) -> Topic:
         return self.transactions_topic
 
-    def get_producer(self, topic: Topic) -> Producer:
+    def get_producer(self, topic: Topic) -> KafkaProducer:
         if topic not in self.__producers:
-            cluster_name = get_topic_definition(topic)["cluster"]
-            cluster_options = get_kafka_producer_cluster_options(cluster_name)
-            cluster_options["client.id"] = "sentry.eventstream.kafka"
-            # XXX(markus): We should use `sentry.utils.arroyo_producer.get_arroyo_producer`.
-            self.__producers[topic] = get_confluent_producer(
-                build_kafka_producer_configuration(default_config=cluster_options)
+            self.__producers[topic] = get_arroyo_producer(
+                name="sentry.eventstream.kafka",
+                topic=topic,
+                use_simple_futures=False,
             )
         return self.__producers[topic]
 
-    def delivery_callback(self, error: KafkaError | None, message: KafkaMessage) -> None:
+    def delivery_callback(self, error: KafkaError | None) -> None:
         now = int(time.time())
         if error is not None:
             if self.error_last_logged_time is None or now > self.error_last_logged_time + 60:
                 self.error_last_logged_time = now
-                logger.error("Could not publish message (error: %s): %r", error, message)
+                logger.error(
+                    "Could not publish message (error: %s)",
+                    error,
+                )
 
     def _get_headers_for_insert(
         self,
@@ -189,38 +191,29 @@ def _send(
         producer = self.get_producer(topic)
 
-        # Polling the producer is required to ensure callbacks are fired. This
-        # means that the latency between a message being delivered (or failing
-        # to be delivered) and the corresponding callback being fired is
-        # roughly the same as the duration of time that passes between publish
-        # calls. If this ends up being too high, the publisher should be moved
-        # into a background thread that can poll more frequently without
-        # interfering with request handling. (This does `poll` does not act as
-        # a heartbeat for the purposes of any sort of session expiration.)
-        # Note that this call to poll() is *only* dealing with earlier
-        # asynchronous produce() calls from the same process.
-        producer.poll(0.0)
-
         assert isinstance(extra_data, tuple)
 
         real_topic = get_topic_definition(topic)["real_topic_name"]
 
         try:
-            producer.produce(
-                topic=real_topic,
-                key=str(project_id).encode("utf-8") if not skip_semantic_partitioning else None,
-                value=json.dumps((self.EVENT_PROTOCOL_VERSION, _type) + extra_data),
-                on_delivery=self.delivery_callback,
-                headers=[(k, v.encode("utf-8")) for k, v in headers.items()],
+            produce_future = producer.produce(
+                destination=ArroyoTopic(real_topic),
+                payload=KafkaPayload(
+                    key=str(project_id).encode("utf-8") if not skip_semantic_partitioning else None,
+                    value=json.dumps((self.EVENT_PROTOCOL_VERSION, _type) + extra_data).encode(
+                        "utf-8"
+                    ),
+                    headers=[(k, v.encode("utf-8")) for k, v in headers.items()],
+                ),
+            )
+            # Since use_simple_futures=False, we know this is a Future
+            cast(Future[BrokerValue[KafkaPayload]], produce_future).add_done_callback(
+                lambda future: self.delivery_callback(future.exception())
             )
         except Exception as error:
             logger.exception("Could not publish message: %s", error)
             return
 
-        if not asynchronous:
-            # flush() is a convenience method that calls poll() until len() is zero
-            producer.flush()
-
     def requires_post_process_forwarder(self) -> bool:
         return True
@@ -228,9 +221,11 @@ def _send_item(self, trace_item: TraceItem) -> None:
         producer = self.get_producer(Topic.SNUBA_ITEMS)
         real_topic = get_topic_definition(Topic.SNUBA_ITEMS)["real_topic_name"]
         try:
-            producer.produce(
-                topic=real_topic,
-                value=EAP_ITEMS_CODEC.encode(trace_item),
+            _ = producer.produce(
+                destination=ArroyoTopic(real_topic),
+                payload=KafkaPayload(
+                    key=None, value=EAP_ITEMS_CODEC.encode(trace_item), headers=[]
+                ),
             )
         except Exception as error:
             logger.exception("Could not publish trace items: %s", error)
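Note on the producer interface this change relies on (not part of the patch): arroyo's KafkaProducer.produce() takes an arroyo.types.Topic and a KafkaPayload and returns a concurrent.futures.Future[BrokerValue[KafkaPayload]], so delivery errors surface through the future rather than a confluent-style on_delivery callback. A minimal sketch of that pattern, assuming a local broker; the bootstrap address, topic name, and on_delivery helper below are illustrative only, while build_kafka_producer_configuration(default_config=...) is the same arroyo helper the removed code used:

    from concurrent.futures import Future

    from arroyo.backends.kafka import (
        KafkaPayload,
        KafkaProducer,
        build_kafka_producer_configuration,
    )
    from arroyo.types import BrokerValue, Topic

    # Illustrative config; the patch itself builds the producer through
    # sentry.utils.arroyo_producer.get_arroyo_producer.
    producer = KafkaProducer(
        build_kafka_producer_configuration(default_config={"bootstrap.servers": "127.0.0.1:9092"})
    )

    def on_delivery(future: Future[BrokerValue[KafkaPayload]]) -> None:
        # future.exception() is None on success, otherwise the delivery error;
        # this mirrors what the lambda in _send forwards to delivery_callback().
        if future.exception() is not None:
            print("could not publish:", future.exception())

    future = producer.produce(
        Topic("example-topic"),
        KafkaPayload(key=b"1", value=b"payload", headers=[("version", b"2")]),
    )
    future.add_done_callback(on_delivery)
    producer.close()

The cast() in _send only narrows the static type; per the inline comment, with use_simple_futures=False the returned object is already the concurrent.futures.Future shown above.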
"insert" @@ -97,10 +97,10 @@ def __produce_payload( produce_args, produce_kwargs = list(producer.produce.call_args) assert not produce_args - version, type_, payload1, payload2 = json.loads(produce_kwargs["value"]) + version, type_, payload1, payload2 = json.loads(produce_kwargs["payload"].value) # only return headers and body payload - return produce_kwargs["headers"], payload2 + return produce_kwargs["payload"].headers, payload2 def test_init_options(self) -> None: # options in the constructor shouldn't cause errors @@ -250,9 +250,9 @@ def test_groupevent_occurrence_passed(self, mock_eventstream_insert: MagicMock) self.__produce_event(*insert_args, **insert_kwargs) producer = self.producer_mock produce_args, produce_kwargs = list(producer.produce.call_args) - version, type_, payload1, payload2 = json.loads(produce_kwargs["value"]) - assert produce_kwargs["topic"] == "generic-events" - assert produce_kwargs["key"] is None + version, type_, payload1, payload2 = json.loads(produce_kwargs["payload"].value) + assert produce_kwargs["destination"].name == "generic-events" + assert produce_kwargs["payload"].key is None assert version == 2 assert type_ == "insert" occurrence_data = group_event.occurrence.to_dict()