Skip to content

debug Native writer job #14247

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 11 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
188 changes: 94 additions & 94 deletions .gitlab/benchmarks/bp-runner.microbenchmarks.fail-on-breach.yml

Large diffs are not rendered by default.

21 changes: 21 additions & 0 deletions .riot/requirements/1550212.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#
# This file is autogenerated by pip-compile with Python 3.13
# by the following command:
#
# pip-compile --allow-unsafe --no-annotate .riot/requirements/1550212.in
#
attrs==25.3.0
coverage[toml]==7.10.1
hypothesis==6.45.0
iniconfig==2.1.0
mock==5.2.0
msgpack==1.1.1
opentracing==2.4.0
packaging==25.0
pluggy==1.6.0
pygments==2.19.2
pytest==8.4.1
pytest-cov==6.2.1
pytest-mock==3.14.1
pytest-randomly==3.16.0
sortedcontainers==2.4.0
4 changes: 2 additions & 2 deletions ddtrace/_trace/processor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,8 @@ def process_trace(self, trace: List[Span]) -> Optional[List[Span]]:
if root_ctx and root_ctx.sampling_priority is None:
self.sampler.sample(trace[0]._local_root)
# When stats computation is enabled in the tracer then we can
# safely drop the traces.
if self._compute_stats_enabled and not self.apm_opt_out:
# safely drop the traces. When using the NativeWriter this is handled by native code.
if not config._trace_writer_native and self._compute_stats_enabled and not self.apm_opt_out:
priority = root_ctx.sampling_priority if root_ctx is not None else None
if priority is not None and priority <= 0:
# When any span is marked as keep by a single span sampling
Expand Down
5 changes: 4 additions & 1 deletion ddtrace/_trace/tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,8 @@ def _default_span_processors_factory(

span_processors.append(AppSecIastSpanProcessor())

if config._trace_compute_stats:
# When using the NativeWriter stats are computed by the native code.
if config._trace_compute_stats and not config._trace_writer_native:
# Inline the import to avoid pulling in ddsketch or protobuf
# when importing ddtrace.
from ddtrace.internal.processor.stats import SpanStatsProcessorV06
Expand Down Expand Up @@ -264,6 +265,8 @@ def sample(self, span):
self._sampler.sample(span)

def _sample_before_fork(self) -> None:
if isinstance(self._span_aggregator.writer, AgentWriterInterface):
self._span_aggregator.writer.before_fork()
span = self.current_root_span()
if span is not None and span.context.sampling_priority is None:
self.sample(span)
Expand Down
9 changes: 9 additions & 0 deletions ddtrace/internal/native/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,18 @@
from typing import Optional
from typing import Tuple

from ._native import AgentError # noqa: F401
from ._native import BuilderError # noqa: F401
from ._native import DDSketch # noqa: F401
from ._native import DeserializationError # noqa: F401
from ._native import IoError # noqa: F401
from ._native import NetworkError # noqa: F401
from ._native import PyConfigurator
from ._native import PyTracerMetadata # noqa: F401
from ._native import RequestError # noqa: F401
from ._native import SerializationError # noqa: F401
from ._native import TraceExporter # noqa: F401
from ._native import TraceExporterBuilder # noqa: F401
from ._native import store_metadata # noqa: F401


Expand Down
241 changes: 241 additions & 0 deletions ddtrace/internal/native/_native.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -139,3 +139,244 @@ def store_metadata(data: PyTracerMetadata) -> PyAnonymousFileHandle:
:param data: The tracer configuration to store.
"""
...

class TraceExporter:
"""
TraceExporter is a class responsible for exporting traces to the Agent.
"""

def __init__(self):
"""
Initialize a TraceExporter.
"""
...
def send(self, data: bytes, trace_count: int) -> str:
"""
Send a trace payload to the Agent.
:param data: The msgpack encoded trace payload to send.
:param trace_count: The number of traces in the data payload.
"""
...
def shutdown(self, timeout_ns: int) -> None:
"""
Shutdown the TraceExporter, releasing any resources and ensuring all pending stats are sent.
This method should be called before the application exits to ensure proper cleanup.
:param timeout_ns: The maximum time to wait for shutdown in nanoseconds.
"""
...
def drop(self) -> None:
"""
Drop the TraceExporter, releasing any resources without sending pending stats.
"""
...
def run_worker(self) -> None:
"""
Start the rust worker threads.
This starts the runtime required to process rust async tasks including stats and telemetry sending.
The runtime will also be created when calling `send`,
this method can be used to start the runtime before sending any traces.
"""
...
def stop_worker(self) -> None:
"""
Stop the rust worker threads.
This stops the async runtime and must be called before forking to avoid deadlocks after forking.
This should be called even if `run_worker` hasn't been called as the runtime will be started
when calling `send`.
"""
...
def debug(self) -> str:
"""
Returns a string representation of the exporter.
Should only be used for debugging.
"""
...

class TraceExporterBuilder:
"""
TraceExporterBuilder is a class responsible for building a TraceExporter.
"""

def __init__(self):
"""
Initialize a TraceExporterBuilder.
"""
...
def set_hostname(self, hostname: str) -> TraceExporterBuilder:
"""
Set the hostname of the TraceExporter.
:param hostname: The hostname to set for the TraceExporter.
"""
...
def set_url(self, url: str) -> TraceExporterBuilder:
"""
Set the agent url of the TraceExporter.
:param url: The URL of the agent to send traces to.
"""
...
def set_dogstatsd_url(self, url: str) -> TraceExporterBuilder:
"""
Set the DogStatsD URL of the TraceExporter.
:param url: The URL of the DogStatsD endpoint.
"""
...
def set_env(self, env: str) -> TraceExporterBuilder:
"""
Set the env of the TraceExporter.
:param env: The environment name (e.g., 'prod', 'staging', 'dev').
"""
...
def set_app_version(self, version: str) -> TraceExporterBuilder:
"""
Set the app version of the TraceExporter.
:param version: The version string of the application.
"""
...
def set_git_commit_sha(self, git_commit_sha: str) -> TraceExporterBuilder:
"""
Set the git commit sha of the TraceExporter.
:param git_commit_sha: The git commit SHA of the current code version.
"""
...
def set_tracer_version(self, version: str) -> TraceExporterBuilder:
"""
Set the tracer version of the TraceExporter.
:param version: The version string of the tracer.
"""
...
def set_language(self, language: str) -> TraceExporterBuilder:
"""
Set the language of the TraceExporter.
:param language: The programming language being traced (e.g., 'python').
"""
...
def set_language_version(self, version: str) -> TraceExporterBuilder:
"""
Set the language version of the TraceExporter.
:param version: The version string of the programming language.
"""
...
def set_language_interpreter(self, interpreter: str) -> TraceExporterBuilder:
"""
Set the language interpreter of the TraceExporter.
:param vendor: The language interpreter.
"""
...
def set_language_interpreter_vendor(self, vendor: str) -> TraceExporterBuilder:
"""
Set the language interpreter vendor of the TraceExporter.
:param vendor: The vendor of the language interpreter.
"""
...
def set_test_session_token(self, token: str) -> TraceExporterBuilder:
"""
Set the test session token for the TraceExporter.
:param token: The test session token to use for authentication.
"""
...
def set_input_format(self, input_format: str) -> TraceExporterBuilder:
"""
Set the input format for the trace data.
:param input_format: The format to use for input traces (supported values are "v0.4" and "v0.5").
:raises ValueError: If input_format is not a supported value.
"""
...
def set_output_format(self, output_format: str) -> TraceExporterBuilder:
"""
Set the output format for the trace data.
:param output_format: The format to use for output traces (supported values are "v0.4" and "v0.5").
:raises ValueError: If output_format is not a supported value.
"""
...
def set_client_computed_top_level(self) -> TraceExporterBuilder:
"""
Set the header indicating the tracer has computed the top-level tag
"""
...
def set_client_computed_stats(self) -> TraceExporterBuilder:
"""
Set the header indicating the tracer has already computed stats.
This should not be used along with `enable_stats`.
The main use is to opt-out trace metrics.
"""
...
def enable_stats(self, bucket_size_ns: int) -> TraceExporterBuilder:
"""
Enable stats computation in the TraceExporter
:param bucket_size_ns: The size of stats bucket in nanoseconds.
"""
...
def enable_telemetry(
self,
heartbeat_ms: int,
runtime_id: str,
) -> TraceExporterBuilder:
"""
Emit telemetry in the TraceExporter
:param heartbeat: The flush interval for telemetry metrics in milliseconds.
:param runtime_id: The runtime id to use for telemetry.
"""
...
def build(self) -> TraceExporter:
"""
Build and return a TraceExporter instance with the configured settings.
This method consumes the builder, so it cannot be used again after calling build.
:return: A configured TraceExporter instance.
:raises ValueError: If the builder has already been consumed or if required settings are missing.
"""
...
def debug(self) -> str:
"""
Returns a string representation of the exporter.
Should only be used for debugging.
"""
...

class AgentError(Exception):
"""
Raised when there is an error in agent response processing.
"""

...

class BuilderError(Exception):
"""
Raised when there is an error in the TraceExporterBuilder configuration.
"""

...

class DeserializationError(Exception):
"""
Raised when there is an error deserializing trace payload.
"""

...

class IoError(Exception):
"""
Raised when there is an I/O error during trace processing.
"""

...

class NetworkError(Exception):
"""
Raised when there is a network-related error during trace processing.
"""

...

class RequestError(Exception):
"""
Raised when the agent responds with an error code.
"""

...

class SerializationError(Exception):
"""
Raised when there is an error serializing trace payload.
"""

...
2 changes: 2 additions & 0 deletions ddtrace/internal/writer/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from .writer import AgentWriterInterface
from .writer import HTTPWriter
from .writer import LogWriter
from .writer import NativeWriter
from .writer import Response
from .writer import TraceWriter
from .writer import _human_size
Expand All @@ -18,6 +19,7 @@
"DEFAULT_SMA_WINDOW",
"HTTPWriter",
"LogWriter",
"NativeWriter",
"Response",
"TraceWriter",
"WriterClientBase",
Expand Down
Loading
Loading