77 changes: 36 additions & 41 deletions src/sentry/eventstream/kafka/backend.py
@@ -3,13 +3,14 @@
import logging
import time
from collections.abc import Mapping, MutableMapping, Sequence
from concurrent.futures import Future
from datetime import datetime
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, cast

from arroyo.backends.kafka import build_kafka_producer_configuration
from arroyo.backends.kafka import KafkaPayload, KafkaProducer
from arroyo.types import BrokerValue
from arroyo.types import Topic as ArroyoTopic
from confluent_kafka import KafkaError
from confluent_kafka import Message as KafkaMessage
from confluent_kafka import Producer
from sentry_kafka_schemas.codecs import Codec
from sentry_protos.snuba.v1.trace_item_pb2 import TraceItem

@@ -20,8 +21,8 @@
from sentry.eventstream.types import EventStreamEventType
from sentry.killswitches import killswitch_matches_context
from sentry.utils import json
from sentry.utils.confluent_producer import get_confluent_producer
from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition
from sentry.utils.arroyo_producer import get_arroyo_producer
from sentry.utils.kafka_config import get_topic_definition

EAP_ITEMS_CODEC: Codec[TraceItem] = get_topic_codec(Topic.SNUBA_ITEMS)

@@ -37,30 +38,31 @@ def __init__(self, **options: Any) -> None:
self.topic = Topic.EVENTS
self.transactions_topic = Topic.TRANSACTIONS
self.issue_platform_topic = Topic.EVENTSTREAM_GENERIC
self.__producers: MutableMapping[Topic, Producer] = {}
self.__producers: MutableMapping[Topic, KafkaProducer] = {}
self.error_last_logged_time: int | None = None

def get_transactions_topic(self, project_id: int) -> Topic:
return self.transactions_topic

def get_producer(self, topic: Topic) -> Producer:
def get_producer(self, topic: Topic) -> KafkaProducer:
if topic not in self.__producers:
cluster_name = get_topic_definition(topic)["cluster"]
cluster_options = get_kafka_producer_cluster_options(cluster_name)
cluster_options["client.id"] = "sentry.eventstream.kafka"
# XXX(markus): We should use `sentry.utils.arroyo_producer.get_arroyo_producer`.
self.__producers[topic] = get_confluent_producer(
build_kafka_producer_configuration(default_config=cluster_options)
self.__producers[topic] = get_arroyo_producer(
name="sentry.eventstream.kafka",
topic=topic,
use_simple_futures=False,
)

return self.__producers[topic]

def delivery_callback(self, error: KafkaError | None, message: KafkaMessage) -> None:
def delivery_callback(self, error: KafkaError | None) -> None:
now = int(time.time())
if error is not None:
if self.error_last_logged_time is None or now > self.error_last_logged_time + 60:
self.error_last_logged_time = now
logger.error("Could not publish message (error: %s): %r", error, message)
logger.error(
"Could not publish message (error: %s)",
error,
)

def _get_headers_for_insert(
self,
@@ -189,48 +191,41 @@ def _send(

producer = self.get_producer(topic)

# Polling the producer is required to ensure callbacks are fired. This
# means that the latency between a message being delivered (or failing
# to be delivered) and the corresponding callback being fired is
# roughly the same as the duration of time that passes between publish
# calls. If this ends up being too high, the publisher should be moved
# into a background thread that can poll more frequently without
# interfering with request handling. (This `poll` does not act as a
# heartbeat for the purposes of any sort of session expiration.)
# Note that this call to poll() is *only* dealing with earlier
# asynchronous produce() calls from the same process.
producer.poll(0.0)

assert isinstance(extra_data, tuple)

real_topic = get_topic_definition(topic)["real_topic_name"]

try:
producer.produce(
topic=real_topic,
key=str(project_id).encode("utf-8") if not skip_semantic_partitioning else None,
value=json.dumps((self.EVENT_PROTOCOL_VERSION, _type) + extra_data),
on_delivery=self.delivery_callback,
headers=[(k, v.encode("utf-8")) for k, v in headers.items()],
produce_future = producer.produce(
destination=ArroyoTopic(real_topic),
payload=KafkaPayload(
key=str(project_id).encode("utf-8") if not skip_semantic_partitioning else None,
value=json.dumps((self.EVENT_PROTOCOL_VERSION, _type) + extra_data).encode(
"utf-8"
),
headers=[(k, v.encode("utf-8")) for k, v in headers.items()],
),
)
# Since use_simple_futures=False, we know this is a Future
cast(Future[BrokerValue[KafkaPayload]], produce_future).add_done_callback(
lambda future: self.delivery_callback(future.exception())
)
except Exception as error:
logger.exception("Could not publish message: %s", error)
Review comment (Member):

Wrt the `if not asynchronous` handling, I don't see a flush() method on the KafkaProducer. I think we want to do produce_future.result() to block until delivery completes instead.

return

if not asynchronous:
# flush() is a convenience method that calls poll() until len() is zero
producer.flush()
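Following up on the review comment above, here is a minimal sketch (not part of this PR) of how the non-asynchronous path could block on delivery by waiting on the Future returned by arroyo's producer. The `destination`/`payload` call shape and the `Future[BrokerValue[KafkaPayload]]` type are taken from the diff; the `send_blocking` helper and the 30-second timeout are illustrative assumptions.

```python
# Hypothetical sketch, not part of this PR: stand in for the old
# producer.flush() by blocking on the Future when synchronous delivery
# was requested.
from concurrent.futures import Future

from arroyo.backends.kafka import KafkaPayload, KafkaProducer
from arroyo.types import BrokerValue
from arroyo.types import Topic as ArroyoTopic


def send_blocking(
    producer: KafkaProducer,
    topic_name: str,
    payload: KafkaPayload,
    asynchronous: bool,
) -> None:
    # arroyo's produce() is asynchronous and returns a Future that resolves
    # once the broker acknowledges (or rejects) the message.
    produce_future: Future[BrokerValue[KafkaPayload]] = producer.produce(
        destination=ArroyoTopic(topic_name), payload=payload
    )
    if not asynchronous:
        # result() blocks until delivery completes and re-raises any
        # delivery error, replacing the old confluent producer.flush().
        produce_future.result(timeout=30.0)
```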

def requires_post_process_forwarder(self) -> bool:
return True

def _send_item(self, trace_item: TraceItem) -> None:
producer = self.get_producer(Topic.SNUBA_ITEMS)
real_topic = get_topic_definition(Topic.SNUBA_ITEMS)["real_topic_name"]
try:
producer.produce(
topic=real_topic,
value=EAP_ITEMS_CODEC.encode(trace_item),
_ = producer.produce(
destination=ArroyoTopic(real_topic),
payload=KafkaPayload(
key=None, value=EAP_ITEMS_CODEC.encode(trace_item), headers=[]
),
)
except Exception as error:
logger.exception("Could not publish trace items: %s", error)
24 changes: 12 additions & 12 deletions tests/sentry/eventstream/test_eventstream.py
@@ -63,16 +63,16 @@ def __produce_event(self, *insert_args: Any, **insert_kwargs: Any) -> None:
produce_args, produce_kwargs = list(producer.produce.call_args)
assert not produce_args
if event_type == EventStreamEventType.Transaction:
assert produce_kwargs["topic"] == "transactions"
assert produce_kwargs["key"] is None
assert produce_kwargs["destination"].name == "transactions"
assert produce_kwargs["payload"].key is None
elif event_type == EventStreamEventType.Generic:
assert produce_kwargs["topic"] == "generic-events"
assert produce_kwargs["key"] is None
assert produce_kwargs["destination"].name == "generic-events"
assert produce_kwargs["payload"].key is None
else:
assert produce_kwargs["topic"] == "events"
assert produce_kwargs["key"] == str(self.project.id).encode("utf-8")
assert produce_kwargs["destination"].name == "events"
assert produce_kwargs["payload"].key == str(self.project.id).encode("utf-8")

version, type_, payload1, payload2 = json.loads(produce_kwargs["value"])
version, type_, payload1, payload2 = json.loads(produce_kwargs["payload"].value)
assert version == 2
assert type_ == "insert"

@@ -97,10 +97,10 @@ def __produce_payload(
produce_args, produce_kwargs = list(producer.produce.call_args)
assert not produce_args

version, type_, payload1, payload2 = json.loads(produce_kwargs["value"])
version, type_, payload1, payload2 = json.loads(produce_kwargs["payload"].value)

# only return headers and body payload
return produce_kwargs["headers"], payload2
return produce_kwargs["payload"].headers, payload2

def test_init_options(self) -> None:
# options in the constructor shouldn't cause errors
@@ -250,9 +250,9 @@ def test_groupevent_occurrence_passed(self, mock_eventstream_insert: MagicMock)
self.__produce_event(*insert_args, **insert_kwargs)
producer = self.producer_mock
produce_args, produce_kwargs = list(producer.produce.call_args)
version, type_, payload1, payload2 = json.loads(produce_kwargs["value"])
assert produce_kwargs["topic"] == "generic-events"
assert produce_kwargs["key"] is None
version, type_, payload1, payload2 = json.loads(produce_kwargs["payload"].value)
assert produce_kwargs["destination"].name == "generic-events"
assert produce_kwargs["payload"].key is None
assert version == 2
assert type_ == "insert"
occurrence_data = group_event.occurrence.to_dict()