diff --git a/.gitlab/benchmarks/bp-runner.microbenchmarks.fail-on-breach.yml b/.gitlab/benchmarks/bp-runner.microbenchmarks.fail-on-breach.yml index f6fffb1f4b2..cf0075a4e9e 100644 --- a/.gitlab/benchmarks/bp-runner.microbenchmarks.fail-on-breach.yml +++ b/.gitlab/benchmarks/bp-runner.microbenchmarks.fail-on-breach.yml @@ -10,53 +10,53 @@ experiments: - name: coreapiscenario-context_with_data_listeners thresholds: - execution_time < 0.02 ms - - max_rss_usage < 33.00 MB + - max_rss_usage < 33.50 MB - name: coreapiscenario-context_with_data_no_listeners thresholds: - execution_time < 0.01 ms - - max_rss_usage < 33.00 MB + - max_rss_usage < 33.50 MB - name: coreapiscenario-context_with_data_only_all_listeners thresholds: - execution_time < 0.02 ms - - max_rss_usage < 33.00 MB + - max_rss_usage < 33.50 MB - name: coreapiscenario-get_item_exists thresholds: - execution_time < 0.01 ms - - max_rss_usage < 33.00 MB + - max_rss_usage < 33.50 MB - name: coreapiscenario-get_item_missing thresholds: - execution_time < 0.01 ms - - max_rss_usage < 33.00 MB + - max_rss_usage < 33.50 MB - name: coreapiscenario-set_item thresholds: - execution_time < 0.03 ms - - max_rss_usage < 33.00 MB + - max_rss_usage < 33.50 MB # djangosimple - name: djangosimple-appsec thresholds: - execution_time < 22.30 ms - - max_rss_usage < 65.50 MB + - max_rss_usage < 66.00 MB - name: djangosimple-exception-replay-enabled thresholds: - execution_time < 1.45 ms - - max_rss_usage < 65.50 MB + - max_rss_usage < 66.00 MB - name: djangosimple-iast thresholds: - execution_time < 22.25 ms - - max_rss_usage < 65.50 MB + - max_rss_usage < 66.00 MB - name: djangosimple-profiler thresholds: - execution_time < 16.55 ms - - max_rss_usage < 53.00 MB + - max_rss_usage < 53.50 MB - name: djangosimple-span-code-origin thresholds: - execution_time < 28.20 ms - - max_rss_usage < 68.00 MB + - max_rss_usage < 68.50 MB - name: djangosimple-tracer thresholds: - execution_time < 22.70 ms - - max_rss_usage < 65.50 MB + - max_rss_usage < 66.00 MB - name: djangosimple-tracer-and-profiler thresholds: - execution_time < 24.90 ms @@ -64,19 +64,19 @@ experiments: - name: djangosimple-tracer-no-caches thresholds: - execution_time < 19.65 ms - - max_rss_usage < 65.50 MB + - max_rss_usage < 66.00 MB - name: djangosimple-tracer-no-databases thresholds: - execution_time < 20.10 ms - - max_rss_usage < 65.50 MB + - max_rss_usage < 66.00 MB - name: djangosimple-tracer-no-middleware thresholds: - execution_time < 22.50 ms - - max_rss_usage < 65.50 MB + - max_rss_usage < 66.00 MB - name: djangosimple-tracer-no-templates thresholds: - execution_time < 22.25 ms - - max_rss_usage < 65.50 MB + - max_rss_usage < 66.00 MB # errortrackingdjangosimple - name: errortrackingdjangosimple-errortracking-enabled-all @@ -96,29 +96,29 @@ experiments: - name: errortrackingflasksqli-errortracking-enabled-all thresholds: - execution_time < 2.30 ms - - max_rss_usage < 53.00 MB + - max_rss_usage < 53.50 MB - name: errortrackingflasksqli-errortracking-enabled-user thresholds: - execution_time < 2.25 ms - - max_rss_usage < 53.00 MB + - max_rss_usage < 53.50 MB - name: errortrackingflasksqli-tracer-enabled thresholds: - execution_time < 2.30 ms - - max_rss_usage < 53.00 MB + - max_rss_usage < 53.50 MB # flask_simple - name: flasksimple-tracer thresholds: - execution_time < 3.65 ms - - max_rss_usage < 53.00 MB + - max_rss_usage < 53.50 MB - name: flasksimple-profiler thresholds: - execution_time < 2.10 ms - - max_rss_usage < 44.00 MB + - max_rss_usage < 46.50 MB - name: flasksimple-debugger 
thresholds: - execution_time < 2.00 ms - - max_rss_usage < 45.00 MB + - max_rss_usage < 46.50 MB - name: flasksimple-iast-get thresholds: - execution_time < 2.00 ms @@ -126,15 +126,15 @@ experiments: - name: flasksimple-appsec-get thresholds: - execution_time < 4.75 ms - - max_rss_usage < 64.00 MB + - max_rss_usage < 64.50 MB - name: flasksimple-appsec-post thresholds: - execution_time < 6.75 ms - - max_rss_usage < 64.00 MB + - max_rss_usage < 64.50 MB - name: flasksimple-appsec-telemetry thresholds: - execution_time < 4.75 ms - - max_rss_usage < 64.00 MB + - max_rss_usage < 64.50 MB # flasksqli - name: flasksqli-appsec-enabled @@ -144,249 +144,249 @@ experiments: - name: flasksqli-iast-enabled thresholds: - execution_time < 2.80 ms - - max_rss_usage < 58.00 MB + - max_rss_usage < 59.00 MB - name: flasksqli-tracer-enabled thresholds: - execution_time < 2.25 ms - - max_rss_usage < 53.00 MB + - max_rss_usage < 53.50 MB # httppropagationextract - name: httppropagationextract-all_styles_all_headers thresholds: - execution_time < 0.10 ms - - max_rss_usage < 33.00 MB + - max_rss_usage < 33.50 MB - name: httppropagationextract-b3_headers thresholds: - execution_time < 0.02 ms - - max_rss_usage < 33.00 MB + - max_rss_usage < 33.50 MB - name: httppropagationextract-b3_single_headers thresholds: - execution_time < 0.02 ms - - max_rss_usage < 33.00 MB + - max_rss_usage < 33.50 MB - name: httppropagationextract-datadog_tracecontext_tracestate_not_propagated_on_trace_id_no_match thresholds: - execution_time < 0.08 ms - - max_rss_usage < 33.00 MB + - max_rss_usage < 33.50 MB - name: httppropagationextract-datadog_tracecontext_tracestate_propagated_on_trace_id_match thresholds: - execution_time < 0.08 ms - - max_rss_usage < 33.00 MB + - max_rss_usage < 33.50 MB - name: httppropagationextract-empty_headers thresholds: - execution_time < 0.01 ms - - max_rss_usage < 33.00 MB + - max_rss_usage < 33.50 MB - name: httppropagationextract-full_t_id_datadog_headers thresholds: - execution_time < 0.03 ms - - max_rss_usage < 33.00 MB + - max_rss_usage < 33.50 MB - name: httppropagationextract-invalid_priority_header thresholds: - execution_time < 0.01 ms - - max_rss_usage < 33.00 MB + - max_rss_usage < 33.50 MB - name: httppropagationextract-invalid_span_id_header thresholds: - execution_time < 0.01 ms - - max_rss_usage < 33.00 MB + - max_rss_usage < 33.50 MB - name: httppropagationextract-invalid_tags_header thresholds: - execution_time < 0.01 ms - - max_rss_usage < 33.00 MB + - max_rss_usage < 33.50 MB - name: httppropagationextract-invalid_trace_id_header thresholds: - execution_time < 0.01 ms - - max_rss_usage < 33.00 MB + - max_rss_usage < 33.50 MB - name: httppropagationextract-large_header_no_matches thresholds: - execution_time < 0.03 ms - - max_rss_usage < 33.00 MB + - max_rss_usage < 33.50 MB - name: httppropagationextract-large_valid_headers_all thresholds: - execution_time < 0.04 ms - - max_rss_usage < 33.00 MB + - max_rss_usage < 33.50 MB - name: httppropagationextract-medium_header_no_matches thresholds: - execution_time < 0.02 ms - - max_rss_usage < 33.00 MB + - max_rss_usage < 33.50 MB - name: httppropagationextract-medium_valid_headers_all thresholds: - execution_time < 0.02 ms - - max_rss_usage < 33.00 MB + - max_rss_usage < 33.50 MB - name: httppropagationextract-none_propagation_style thresholds: - execution_time < 0.01 ms - - max_rss_usage < 33.00 MB + - max_rss_usage < 33.50 MB - name: httppropagationextract-tracecontext_headers thresholds: - execution_time < 0.04 ms - - max_rss_usage < 33.00 
MB + - max_rss_usage < 33.50 MB - name: httppropagationextract-valid_headers_all thresholds: - execution_time < 0.01 ms - - max_rss_usage < 33.00 MB + - max_rss_usage < 33.50 MB - name: httppropagationextract-valid_headers_basic thresholds: - execution_time < 0.01 ms - - max_rss_usage < 33.00 MB + - max_rss_usage < 33.50 MB - name: httppropagationextract-wsgi_empty_headers thresholds: - execution_time < 0.01 ms - - max_rss_usage < 33.00 MB + - max_rss_usage < 33.50 MB - name: httppropagationextract-wsgi_invalid_priority_header thresholds: - execution_time < 0.01 ms - - max_rss_usage < 33.00 MB + - max_rss_usage < 33.50 MB - name: httppropagationextract-wsgi_invalid_span_id_header thresholds: - execution_time < 0.01 ms - - max_rss_usage < 33.00 MB + - max_rss_usage < 33.50 MB - name: httppropagationextract-wsgi_invalid_tags_header thresholds: - execution_time < 0.01 ms - - max_rss_usage < 33.00 MB + - max_rss_usage < 33.50 MB - name: httppropagationextract-wsgi_invalid_trace_id_header thresholds: - execution_time < 0.01 ms - - max_rss_usage < 33.00 MB + - max_rss_usage < 33.50 MB - name: httppropagationextract-wsgi_large_header_no_matches thresholds: - execution_time < 0.04 ms - - max_rss_usage < 33.00 MB + - max_rss_usage < 33.50 MB - name: httppropagationextract-wsgi_large_valid_headers_all thresholds: - execution_time < 0.04 ms - - max_rss_usage < 33.00 MB + - max_rss_usage < 33.50 MB - name: httppropagationextract-wsgi_medium_header_no_matches thresholds: - execution_time < 0.02 ms - - max_rss_usage < 33.00 MB + - max_rss_usage < 33.50 MB - name: httppropagationextract-wsgi_medium_valid_headers_all thresholds: - execution_time < 0.02 ms - - max_rss_usage < 33.00 MB + - max_rss_usage < 33.50 MB - name: httppropagationextract-wsgi_valid_headers_all thresholds: - execution_time < 0.01 ms - - max_rss_usage < 33.00 MB + - max_rss_usage < 33.50 MB - name: httppropagationextract-wsgi_valid_headers_basic thresholds: - execution_time < 0.01 ms - - max_rss_usage < 33.00 MB + - max_rss_usage < 33.50 MB # httppropagationinject - name: httppropagationinject-ids_only thresholds: - execution_time < 0.03 ms - - max_rss_usage < 33.00 MB + - max_rss_usage < 33.50 MB - name: httppropagationinject-with_all thresholds: - execution_time < 0.04 ms - - max_rss_usage < 33.00 MB + - max_rss_usage < 33.50 MB - name: httppropagationinject-with_dd_origin thresholds: - execution_time < 0.03 ms - - max_rss_usage < 33.00 MB + - max_rss_usage < 33.50 MB - name: httppropagationinject-with_priority_and_origin thresholds: - execution_time < 0.04 ms - - max_rss_usage < 33.00 MB + - max_rss_usage < 33.50 MB - name: httppropagationinject-with_sampling_priority thresholds: - execution_time < 0.03 ms - - max_rss_usage < 33.00 MB + - max_rss_usage < 33.50 MB - name: httppropagationinject-with_tags thresholds: - execution_time < 0.04 ms - - max_rss_usage < 33.00 MB + - max_rss_usage < 33.50 MB - name: httppropagationinject-with_tags_invalid thresholds: - execution_time < 0.04 ms - - max_rss_usage < 33.00 MB + - max_rss_usage < 33.50 MB - name: httppropagationinject-with_tags_max_size thresholds: - execution_time < 0.04 ms - - max_rss_usage < 33.00 MB + - max_rss_usage < 33.50 MB # iast_aspects - name: iast_aspects-re_expand_aspect thresholds: - execution_time < 0.04 ms - - max_rss_usage < 38.50 MB + - max_rss_usage < 39.00 MB - name: iast_aspects-re_expand_noaspect thresholds: - execution_time < 0.04 ms - - max_rss_usage < 38.50 MB + - max_rss_usage < 39.00 MB - name: iast_aspects-re_findall_aspect thresholds: - execution_time < 
0.01 ms - - max_rss_usage < 38.50 MB + - max_rss_usage < 39.00 MB - name: iast_aspects-re_findall_noaspect thresholds: - execution_time < 0.01 ms - - max_rss_usage < 38.50 MB + - max_rss_usage < 39.00 MB - name: iast_aspects-re_finditer_aspect thresholds: - execution_time < 0.01 ms - - max_rss_usage < 38.50 MB + - max_rss_usage < 39.00 MB - name: iast_aspects-re_finditer_noaspect thresholds: - execution_time < 0.01 ms - - max_rss_usage < 38.50 MB + - max_rss_usage < 39.00 MB - name: iast_aspects-re_fullmatch_aspect thresholds: - execution_time < 0.01 ms - - max_rss_usage < 38.50 MB + - max_rss_usage < 39.00 MB - name: iast_aspects-re_fullmatch_noaspect thresholds: - execution_time < 0.01 ms - - max_rss_usage < 38.50 MB + - max_rss_usage < 39.00 MB - name: iast_aspects-re_group_aspect thresholds: - execution_time < 0.01 ms - - max_rss_usage < 38.50 MB + - max_rss_usage < 39.00 MB - name: iast_aspects-re_group_noaspect thresholds: - execution_time < 0.01 ms - - max_rss_usage < 38.50 MB + - max_rss_usage < 39.00 MB - name: iast_aspects-re_groups_aspect thresholds: - execution_time < 0.01 ms - - max_rss_usage < 38.50 MB + - max_rss_usage < 39.00 MB - name: iast_aspects-re_groups_noaspect thresholds: - execution_time < 0.01 ms - - max_rss_usage < 38.50 MB + - max_rss_usage < 39.00 MB - name: iast_aspects-re_match_aspect thresholds: - execution_time < 0.01 ms - - max_rss_usage < 38.50 MB + - max_rss_usage < 39.00 MB - name: iast_aspects-re_match_noaspect thresholds: - execution_time < 0.01 ms - - max_rss_usage < 38.50 MB + - max_rss_usage < 39.00 MB - name: iast_aspects-re_search_aspect thresholds: - execution_time < 0.01 ms - - max_rss_usage < 38.50 MB + - max_rss_usage < 39.00 MB - name: iast_aspects-re_search_noaspect thresholds: - execution_time < 0.01 ms - - max_rss_usage < 38.50 MB + - max_rss_usage < 39.00 MB - name: iast_aspects-re_sub_aspect thresholds: - execution_time < 0.01 ms - - max_rss_usage < 38.50 MB + - max_rss_usage < 39.00 MB - name: iast_aspects-re_sub_noaspect thresholds: - execution_time < 0.01 ms - - max_rss_usage < 38.50 MB + - max_rss_usage < 39.00 MB - name: iast_aspects-re_subn_aspect thresholds: - execution_time < 0.01 ms - - max_rss_usage < 38.50 MB + - max_rss_usage < 39.00 MB - name: iast_aspects-re_subn_noaspect thresholds: - execution_time < 0.01 ms - - max_rss_usage < 38.50 MB + - max_rss_usage < 39.00 MB # iastaspects - name: iastaspects-add_aspect @@ -820,11 +820,11 @@ experiments: - name: packagespackageforrootmodulemapping-cache_off thresholds: - execution_time < 354.30 ms - - max_rss_usage < 38.00 MB + - max_rss_usage < 40.00 MB - name: packagespackageforrootmodulemapping-cache_on thresholds: - execution_time < 0.01 ms - - max_rss_usage < 38.00 MB + - max_rss_usage < 39.00 MB # packagesupdateimporteddependencies - name: packagesupdateimporteddependencies-import_many @@ -854,7 +854,7 @@ experiments: - name: packagesupdateimporteddependencies-import_one thresholds: - execution_time < 0.03 ms - - max_rss_usage < 38.50 MB + - max_rss_usage < 39.00 MB - name: packagesupdateimporteddependencies-import_one_cache thresholds: - execution_time < 0.01 ms @@ -906,7 +906,7 @@ experiments: - name: recursivecomputation-deep thresholds: - execution_time < 320.95 ms - - max_rss_usage < 34.00 MB + - max_rss_usage < 34.50 MB - name: recursivecomputation-deep-profiled thresholds: - execution_time < 359.15 ms @@ -974,15 +974,15 @@ experiments: - name: sethttpmeta-obfuscation-send-querystring-disabled thresholds: - execution_time < 0.17 ms - - max_rss_usage < 34.00 MB + - 
max_rss_usage < 34.50 MB - name: sethttpmeta-obfuscation-worst-case-explicit-query thresholds: - execution_time < 0.16 ms - - max_rss_usage < 34.00 MB + - max_rss_usage < 34.50 MB - name: sethttpmeta-obfuscation-worst-case-implicit-query thresholds: - execution_time < 0.17 ms - - max_rss_usage < 34.00 MB + - max_rss_usage < 34.50 MB - name: sethttpmeta-useragentvariant_exists_1 thresholds: - execution_time < 0.05 ms @@ -1118,13 +1118,13 @@ experiments: - name: telemetryaddmetric-flush-1000-metrics thresholds: - execution_time < 2.35 ms - - max_rss_usage < 34.00 MB + - max_rss_usage < 34.50 MB # tracer - name: tracer-large thresholds: - execution_time < 32.95 ms - - max_rss_usage < 34.00 MB + - max_rss_usage < 34.50 MB - name: tracer-medium thresholds: - execution_time < 3.20 ms diff --git a/.riot/requirements/1550212.txt b/.riot/requirements/1550212.txt new file mode 100644 index 00000000000..bd0b8d1ae25 --- /dev/null +++ b/.riot/requirements/1550212.txt @@ -0,0 +1,21 @@ +# +# This file is autogenerated by pip-compile with Python 3.13 +# by the following command: +# +# pip-compile --allow-unsafe --no-annotate .riot/requirements/1550212.in +# +attrs==25.3.0 +coverage[toml]==7.10.1 +hypothesis==6.45.0 +iniconfig==2.1.0 +mock==5.2.0 +msgpack==1.1.1 +opentracing==2.4.0 +packaging==25.0 +pluggy==1.6.0 +pygments==2.19.2 +pytest==8.4.1 +pytest-cov==6.2.1 +pytest-mock==3.14.1 +pytest-randomly==3.16.0 +sortedcontainers==2.4.0 diff --git a/ddtrace/_trace/processor/__init__.py b/ddtrace/_trace/processor/__init__.py index a7b40a2f477..783ed79601e 100644 --- a/ddtrace/_trace/processor/__init__.py +++ b/ddtrace/_trace/processor/__init__.py @@ -160,8 +160,8 @@ def process_trace(self, trace: List[Span]) -> Optional[List[Span]]: if root_ctx and root_ctx.sampling_priority is None: self.sampler.sample(trace[0]._local_root) # When stats computation is enabled in the tracer then we can - # safely drop the traces. - if self._compute_stats_enabled and not self.apm_opt_out: + # safely drop the traces. When using the NativeWriter this is handled by native code. + if not config._trace_writer_native and self._compute_stats_enabled and not self.apm_opt_out: priority = root_ctx.sampling_priority if root_ctx is not None else None if priority is not None and priority <= 0: # When any span is marked as keep by a single span sampling diff --git a/ddtrace/_trace/tracer.py b/ddtrace/_trace/tracer.py index f0d186b9c27..826da02ef19 100644 --- a/ddtrace/_trace/tracer.py +++ b/ddtrace/_trace/tracer.py @@ -119,7 +119,8 @@ def _default_span_processors_factory( span_processors.append(AppSecIastSpanProcessor()) - if config._trace_compute_stats: + # When using the NativeWriter stats are computed by the native code. + if config._trace_compute_stats and not config._trace_writer_native: # Inline the import to avoid pulling in ddsketch or protobuf # when importing ddtrace. 
                from ddtrace.internal.processor.stats import SpanStatsProcessorV06
@@ -264,6 +265,8 @@ def sample(self, span):
         self._sampler.sample(span)
 
     def _sample_before_fork(self) -> None:
+        if isinstance(self._span_aggregator.writer, AgentWriterInterface):
+            self._span_aggregator.writer.before_fork()
         span = self.current_root_span()
         if span is not None and span.context.sampling_priority is None:
             self.sample(span)
diff --git a/ddtrace/internal/native/__init__.py b/ddtrace/internal/native/__init__.py
index 06c5b956a19..73c967ce031 100644
--- a/ddtrace/internal/native/__init__.py
+++ b/ddtrace/internal/native/__init__.py
@@ -3,9 +3,18 @@
 from typing import Optional
 from typing import Tuple
 
+from ._native import AgentError  # noqa: F401
+from ._native import BuilderError  # noqa: F401
 from ._native import DDSketch  # noqa: F401
+from ._native import DeserializationError  # noqa: F401
+from ._native import IoError  # noqa: F401
+from ._native import NetworkError  # noqa: F401
 from ._native import PyConfigurator
 from ._native import PyTracerMetadata  # noqa: F401
+from ._native import RequestError  # noqa: F401
+from ._native import SerializationError  # noqa: F401
+from ._native import TraceExporter  # noqa: F401
+from ._native import TraceExporterBuilder  # noqa: F401
 from ._native import store_metadata  # noqa: F401
diff --git a/ddtrace/internal/native/_native.pyi b/ddtrace/internal/native/_native.pyi
index c301ec9dbb6..c388b3efbf5 100644
--- a/ddtrace/internal/native/_native.pyi
+++ b/ddtrace/internal/native/_native.pyi
@@ -139,3 +139,244 @@ def store_metadata(data: PyTracerMetadata) -> PyAnonymousFileHandle:
     :param data: The tracer configuration to store.
     """
     ...
+
+class TraceExporter:
+    """
+    TraceExporter is a class responsible for exporting traces to the Agent.
+    """
+
+    def __init__(self):
+        """
+        Initialize a TraceExporter.
+        """
+        ...
+    def send(self, data: bytes, trace_count: int) -> str:
+        """
+        Send a trace payload to the Agent.
+        :param data: The msgpack encoded trace payload to send.
+        :param trace_count: The number of traces in the data payload.
+        """
+        ...
+    def shutdown(self, timeout_ns: int) -> None:
+        """
+        Shutdown the TraceExporter, releasing any resources and ensuring all pending stats are sent.
+        This method should be called before the application exits to ensure proper cleanup.
+        :param timeout_ns: The maximum time to wait for shutdown in nanoseconds.
+        """
+        ...
+    def drop(self) -> None:
+        """
+        Drop the TraceExporter, releasing any resources without sending pending stats.
+        """
+        ...
+    def run_worker(self) -> None:
+        """
+        Start the Rust worker threads.
+        This starts the runtime required to process Rust async tasks, including stats and telemetry sending.
+        The runtime will also be created when calling `send`; this method can be used to start the
+        runtime before sending any traces.
+        """
+        ...
+    def stop_worker(self) -> None:
+        """
+        Stop the Rust worker threads.
+        This stops the async runtime and must be called before forking to avoid deadlocks after forking.
+        This should be called even if `run_worker` hasn't been called, as the runtime will be started
+        when calling `send`.
+        """
+        ...
+    def debug(self) -> str:
+        """
+        Returns a string representation of the exporter.
+        Should only be used for debugging.
+        """
+        ...
+
+class TraceExporterBuilder:
+    """
+    TraceExporterBuilder is a class responsible for building a TraceExporter.
+    """
+
+    def __init__(self):
+        """
+        Initialize a TraceExporterBuilder.
+        """
+        ...
+    def set_hostname(self, hostname: str) -> TraceExporterBuilder:
+        """
+        Set the hostname of the TraceExporter.
+        :param hostname: The hostname to set for the TraceExporter.
+        """
+        ...
+    def set_url(self, url: str) -> TraceExporterBuilder:
+        """
+        Set the agent URL of the TraceExporter.
+        :param url: The URL of the agent to send traces to.
+        """
+        ...
+    def set_dogstatsd_url(self, url: str) -> TraceExporterBuilder:
+        """
+        Set the DogStatsD URL of the TraceExporter.
+        :param url: The URL of the DogStatsD endpoint.
+        """
+        ...
+    def set_env(self, env: str) -> TraceExporterBuilder:
+        """
+        Set the env of the TraceExporter.
+        :param env: The environment name (e.g., 'prod', 'staging', 'dev').
+        """
+        ...
+    def set_app_version(self, version: str) -> TraceExporterBuilder:
+        """
+        Set the app version of the TraceExporter.
+        :param version: The version string of the application.
+        """
+        ...
+    def set_git_commit_sha(self, git_commit_sha: str) -> TraceExporterBuilder:
+        """
+        Set the git commit sha of the TraceExporter.
+        :param git_commit_sha: The git commit SHA of the current code version.
+        """
+        ...
+    def set_tracer_version(self, version: str) -> TraceExporterBuilder:
+        """
+        Set the tracer version of the TraceExporter.
+        :param version: The version string of the tracer.
+        """
+        ...
+    def set_language(self, language: str) -> TraceExporterBuilder:
+        """
+        Set the language of the TraceExporter.
+        :param language: The programming language being traced (e.g., 'python').
+        """
+        ...
+    def set_language_version(self, version: str) -> TraceExporterBuilder:
+        """
+        Set the language version of the TraceExporter.
+        :param version: The version string of the programming language.
+        """
+        ...
+    def set_language_interpreter(self, interpreter: str) -> TraceExporterBuilder:
+        """
+        Set the language interpreter of the TraceExporter.
+        :param interpreter: The name of the language interpreter.
+        """
+        ...
+    def set_language_interpreter_vendor(self, vendor: str) -> TraceExporterBuilder:
+        """
+        Set the language interpreter vendor of the TraceExporter.
+        :param vendor: The vendor of the language interpreter.
+        """
+        ...
+    def set_test_session_token(self, token: str) -> TraceExporterBuilder:
+        """
+        Set the test session token for the TraceExporter.
+        :param token: The test session token to use for authentication.
+        """
+        ...
+    def set_input_format(self, input_format: str) -> TraceExporterBuilder:
+        """
+        Set the input format for the trace data.
+        :param input_format: The format to use for input traces (supported values are "v0.4" and "v0.5").
+        :raises ValueError: If input_format is not a supported value.
+        """
+        ...
+    def set_output_format(self, output_format: str) -> TraceExporterBuilder:
+        """
+        Set the output format for the trace data.
+        :param output_format: The format to use for output traces (supported values are "v0.4" and "v0.5").
+        :raises ValueError: If output_format is not a supported value.
+        """
+        ...
+    def set_client_computed_top_level(self) -> TraceExporterBuilder:
+        """
+        Set the header indicating the tracer has computed the top-level tag.
+        """
+        ...
+    def set_client_computed_stats(self) -> TraceExporterBuilder:
+        """
+        Set the header indicating the tracer has already computed stats.
+        This should not be used along with `enable_stats`.
+        The main use is to opt out of trace metrics.
+        """
+        ...
+    def enable_stats(self, bucket_size_ns: int) -> TraceExporterBuilder:
+        """
+        Enable stats computation in the TraceExporter.
+        :param bucket_size_ns: The size of the stats buckets in nanoseconds.
+        """
+        ...
+    def enable_telemetry(
+        self,
+        heartbeat_ms: int,
+        runtime_id: str,
+    ) -> TraceExporterBuilder:
+        """
+        Enable telemetry emission in the TraceExporter.
+        :param heartbeat_ms: The flush interval for telemetry metrics in milliseconds.
+        :param runtime_id: The runtime id to use for telemetry.
+        """
+        ...
+    def build(self) -> TraceExporter:
+        """
+        Build and return a TraceExporter instance with the configured settings.
+        This method consumes the builder, so it cannot be used again after calling build.
+        :return: A configured TraceExporter instance.
+        :raises ValueError: If the builder has already been consumed or if required settings are missing.
+        """
+        ...
+    def debug(self) -> str:
+        """
+        Returns a string representation of the builder.
+        Should only be used for debugging.
+        """
+        ...
+
+class AgentError(Exception):
+    """
+    Raised when there is an error in agent response processing.
+    """
+
+    ...
+
+class BuilderError(Exception):
+    """
+    Raised when there is an error in the TraceExporterBuilder configuration.
+    """
+
+    ...
+
+class DeserializationError(Exception):
+    """
+    Raised when there is an error deserializing trace payload.
+    """
+
+    ...
+
+class IoError(Exception):
+    """
+    Raised when there is an I/O error during trace processing.
+    """
+
+    ...
+
+class NetworkError(Exception):
+    """
+    Raised when there is a network-related error during trace processing.
+    """
+
+    ...
+
+class RequestError(Exception):
+    """
+    Raised when the agent responds with an error code.
+    """
+
+    ...
+
+class SerializationError(Exception):
+    """
+    Raised when there is an error serializing trace payload.
+    """
+
+    ...
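The stubs above describe a fluent builder: each setter returns the builder, so calls can be chained into `build()`. A minimal usage sketch follows; the agent URL, the tracer version string, and the empty msgpack array payload (`b"\x90"`) are illustrative assumptions, not values taken from this change:

```python
from ddtrace.internal.native import TraceExporterBuilder

# Build an exporter against a local agent; every setter returns the builder.
exporter = (
    TraceExporterBuilder()
    .set_url("http://localhost:8126")
    .set_language("python")
    .set_tracer_version("0.0.0")
    .set_input_format("v0.4")
    .set_output_format("v0.4")
    .build()  # consumes the builder; reusing it raises ValueError
)
exporter.send(b"\x90", 0)         # msgpack-encoded traces plus the trace count
exporter.shutdown(1_000_000_000)  # wait up to 1s for pending data
```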
diff --git a/ddtrace/internal/writer/__init__.py b/ddtrace/internal/writer/__init__.py
index 2a7312d0c29..4a6409d7ee3 100644
--- a/ddtrace/internal/writer/__init__.py
+++ b/ddtrace/internal/writer/__init__.py
@@ -4,6 +4,7 @@
 from .writer import AgentWriterInterface
 from .writer import HTTPWriter
 from .writer import LogWriter
+from .writer import NativeWriter
 from .writer import Response
 from .writer import TraceWriter
 from .writer import _human_size
@@ -18,6 +19,7 @@
     "DEFAULT_SMA_WINDOW",
     "HTTPWriter",
     "LogWriter",
+    "NativeWriter",
     "Response",
     "TraceWriter",
     "WriterClientBase",
diff --git a/ddtrace/internal/writer/writer.py b/ddtrace/internal/writer/writer.py
index 5d00ff45ec9..41d3848c816 100644
--- a/ddtrace/internal/writer/writer.py
+++ b/ddtrace/internal/writer/writer.py
@@ -15,6 +15,7 @@
 import ddtrace
 from ddtrace import config
+import ddtrace.internal.native as native
 import ddtrace.internal.utils.http
 from ddtrace.internal.utils.retry import fibonacci_backoff_with_jitter
 from ddtrace.settings._agent import config as agent_config
@@ -30,6 +31,7 @@
 from ..constants import _HTTPLIB_NO_TRACE_REQUEST
 from ..dogstatsd import get_dogstatsd_client
 from ..encoding import JSONEncoderV2
+from ..gitmetadata import get_git_tags
 from ..logger import get_logger
 from ..serverless import has_aws_lambda_agent_extension
 from ..serverless import in_aws_lambda
@@ -683,6 +685,371 @@ def set_test_session_token(self, token: Optional[str]) -> None:
         self._headers["X-Datadog-Test-Session-Token"] = token or ""
 
 
+class NativeWriter(periodic.PeriodicService, TraceWriter, AgentWriterInterface):
+    """Writer using a native trace exporter to send traces to an agent."""
+
+    STATSD_NAMESPACE = "tracer"
+
+    def __init__(
+        self,
+        intake_url: str,
+        processing_interval: Optional[float] = None,
+        compute_stats_enabled: bool = False,
+        # Match the payload size since there is no functionality
+        # to flush dynamically.
+        buffer_size: Optional[int] = None,
+        max_payload_size: Optional[int] = None,
+        dogstatsd: Optional["DogStatsd"] = None,
+        sync_mode: bool = False,
+        api_version: Optional[str] = None,
+        report_metrics: bool = True,
+        response_callback: Optional[Callable[[AgentResponse], None]] = None,
+        test_session_token: Optional[str] = None,
+        # Mark stats as computed, without computing them, skipping trace exporter stats computation.
+        # This setting overrides the `compute_stats_enabled` parameter.
+        stats_opt_out: Optional[bool] = False,
+    ) -> None:
+        if processing_interval is None:
+            processing_interval = config._trace_writer_interval_seconds
+        if buffer_size is not None and buffer_size <= 0:
+            raise ValueError("Writer buffer size must be positive")
+        if max_payload_size is not None and max_payload_size <= 0:
+            raise ValueError("Max payload size must be positive")
+
+        # Default to v0.4 if we are on Windows since there is a known compatibility issue
+        # https://github.com/DataDog/dd-trace-py/issues/4829
+        # DEV: sys.platform on windows should be `win32` or `cygwin`, but using `startswith`
+        #      as a safety precaution.
+        # https://docs.python.org/3/library/sys.html#sys.platform
+        is_windows = sys.platform.startswith("win") or sys.platform.startswith("cygwin")
+
+        default_api_version = "v0.5"
+        if (
+            is_windows
+            or in_gcp_function()
+            or in_azure_function()
+            or asm_config._asm_enabled
+            or asm_config._iast_enabled
+        ):
+            default_api_version = "v0.4"
+
+        self._api_version = api_version or config._trace_api or default_api_version
+
+        if agent_config.trace_native_span_events:
+            log.warning("Setting api version to v0.4; DD_TRACE_NATIVE_SPAN_EVENTS is not compatible with v0.5")
+            self._api_version = "v0.4"
+
+        if is_windows and self._api_version == "v0.5":
+            raise RuntimeError(
+                "There is a known compatibility issue with v0.5 API and Windows, "
+                "please see https://github.com/DataDog/dd-trace-py/issues/4829 for more details."
+            )
+
+        buffer_size = buffer_size or config._trace_writer_buffer_size
+        max_payload_size = max_payload_size or config._trace_writer_payload_size
+        if self._api_version not in WRITER_CLIENTS:
+            log.warning(
+                "Unsupported api version: '%s'. The supported versions are: %r",
+                self._api_version,
+                ", ".join(sorted(WRITER_CLIENTS.keys())),
+            )
+            self._api_version = sorted(WRITER_CLIENTS.keys())[-1]
+        client = WRITER_CLIENTS[self._api_version](buffer_size, max_payload_size)
+
+        additional_header_str = os.environ.get("_DD_TRACE_WRITER_ADDITIONAL_HEADERS")
+        if test_session_token is None and additional_header_str is not None:
+            additional_header = parse_tags_str(additional_header_str)
+            if "X-Datadog-Test-Session-Token" in additional_header:
+                test_session_token = additional_header["X-Datadog-Test-Session-Token"]
+
+        super(NativeWriter, self).__init__(interval=processing_interval)
+        self.intake_url = intake_url
+        self._buffer_size = buffer_size
+        self._max_payload_size = max_payload_size
+        self._test_session_token = test_session_token
+
+        self._clients = [client]
+        self.dogstatsd = dogstatsd
+        self._metrics: Dict[str, int] = defaultdict(int)
+        self._report_metrics = report_metrics
+        self._drop_sma = SimpleMovingAverage(DEFAULT_SMA_WINDOW)
+        self._sync_mode = sync_mode
+        self._compute_stats_enabled = compute_stats_enabled
+        self._response_cb = response_callback
+        self._stats_opt_out = stats_opt_out
+        self._exporter = self._create_exporter()
+
+    def _create_exporter(self) -> native.TraceExporter:
+        """
+        Create a new TraceExporter with the current configuration.
+        :return: A configured TraceExporter instance.
+        """
+        _, commit_sha, _ = get_git_tags()
+
+        builder = (
+            native.TraceExporterBuilder()
+            .set_url(self.intake_url)
+            .set_language("python")
+            .set_language_version(compat.PYTHON_VERSION)
+            .set_language_interpreter(compat.PYTHON_INTERPRETER)
+            .set_tracer_version(ddtrace.__version__)
+            .set_git_commit_sha(commit_sha)
+            .set_client_computed_top_level()
+            .set_input_format(self._api_version)
+            .set_output_format(self._api_version)
+        )
+        if self._test_session_token is not None:
+            builder.set_test_session_token(self._test_session_token)
+        if self._stats_opt_out:
+            builder.set_client_computed_stats()
+        elif self._compute_stats_enabled:
+            stats_interval = float(os.getenv("_DD_TRACE_STATS_WRITER_INTERVAL") or 10.0)
+            bucket_size_ns: int = int(stats_interval * 1e9)
+            builder.enable_stats(bucket_size_ns)
+
+        return builder.build()
+
+    def set_test_session_token(self, token: Optional[str]) -> None:
+        """
+        Set the test session token and recreate the exporter with the new configuration.
+        :param token: The test session token to use for authentication.
+        """
+        self._test_session_token = token
+        self._exporter.stop_worker()
+        self._exporter = self._create_exporter()
+
+    def recreate(self, appsec_enabled: Optional[bool] = None) -> "NativeWriter":
+        try:
+            # Stop the writer to ensure it is not running while we reconfigure it.
+            self.stop()
+        except ServiceStatusError:
+            # Writers like AgentWriter may not start until the first trace is encoded.
+            # Stopping them before that will raise a ServiceStatusError.
+            pass
+
+        # Stop the trace exporter worker
+        self._exporter.stop_worker()
+
+        # Ensure AppSec metadata is encoded by setting the API version to v0.4.
+        api_version = "v0.4" if appsec_enabled else self._api_version
+        return self.__class__(
+            intake_url=self.intake_url,
+            processing_interval=self._interval,
+            compute_stats_enabled=self._compute_stats_enabled,
+            buffer_size=self._buffer_size,
+            max_payload_size=self._max_payload_size,
+            dogstatsd=self.dogstatsd,
+            sync_mode=self._sync_mode,
+            api_version=api_version,
+            report_metrics=self._report_metrics,
+            response_callback=self._response_cb,
+            test_session_token=self._test_session_token,
+            stats_opt_out=self._stats_opt_out,
+        )
+
+    def _downgrade(self, status, client):
+        if client.ENDPOINT == "v0.5/traces":
+            self._clients = [AgentWriterClientV4(self._buffer_size, self._max_payload_size)]
+            self._api_version = "v0.4"
+            self._exporter = self._create_exporter()
+
+            # Since we have to change the encoding in this case, the payload
+            # would need to be converted to the downgraded encoding before
+            # sending it, but we drop it instead.
+            log.warning(
+                "Calling endpoint '%s' but received %s; downgrading API. "
+                "Dropping trace payload due to the downgrade to an incompatible API version (from v0.5 to v0.4). To "
+                "prevent this from happening in the future, either ensure that the Datadog agent has a v0.5/traces "
+                "endpoint available, or explicitly set the trace API version to, e.g., v0.4.",
+                client.ENDPOINT,
+                status,
+            )
+        else:
+            log.error(
+                "unsupported endpoint '%s': received response %s from intake (%s)",
+                client.ENDPOINT,
+                status,
+                self.intake_url,
+            )
+
+    def _intake_endpoint(self, client=None):
+        return "{}/{}".format(self.intake_url, client.ENDPOINT if client else self._endpoint)
+
+    @property
+    def _endpoint(self):
+        return self._clients[0].ENDPOINT
+
+    @property
+    def _encoder(self):
+        return self._clients[0].encoder
+
+    def _metrics_dist(self, name: str, count: int = 1, tags: Optional[List] = None) -> None:
+        if not self._report_metrics:
+            return
+        if config._health_metrics_enabled and self.dogstatsd:
+            self.dogstatsd.distribution("datadog.%s.%s" % (self.STATSD_NAMESPACE, name), count, tags=tags)
+
+    def _set_drop_rate(self) -> None:
+        accepted = self._metrics["accepted_traces"]
+        sent = self._metrics["sent_traces"]
+        encoded = sum([len(client.encoder) for client in self._clients])
+        # The number of dropped traces is the number of accepted traces minus the number of traces in the encoder.
+        # This calculation is a best effort. Due to race conditions it may result in a slight underestimate.
+        dropped = max(accepted - sent - encoded, 0)  # the dropped count should never be negative
+        self._drop_sma.set(dropped, accepted)
+        self._metrics["sent_traces"] = 0  # reset sent traces for the next interval
+        self._metrics["accepted_traces"] = encoded  # sets accepted traces to the number of traces in the encoders
+
+    def _set_keep_rate(self, trace):
+        if trace:
+            trace[0].set_metric(_KEEP_SPANS_RATE_KEY, 1.0 - self._drop_sma.get())
+
+    def _send_payload(self, payload: bytes, count: int, client: WriterClientBase):
+        try:
+            response_body = self._exporter.send(payload, count)
+        except native.RequestError as e:
+            try:
+                # Request errors are formatted as "Error code: {code}, Response: {response}"
+                code = int(str(e).split(",")[0].split(":", maxsplit=1)[1])
+            except:  # noqa:E722 if the error message is malformed, re-raise the original error
+                raise e
+            if code == 404 or code == 415:
+                self._downgrade(code, client)
+                # The payload was dropped during the downgrade, so there is no response to process.
+                return
+            else:
+                raise e
+        finally:
+            self._metrics["sent_traces"] += count
+
+        if self._response_cb:
+            response = Response(body=response_body)
+            raw_resp = response.get_json()
+
+            if raw_resp and "rate_by_service" in raw_resp:
+                self._response_cb(
+                    AgentResponse(
+                        rate_by_service=raw_resp["rate_by_service"],
+                    )
+                )
+
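Since the native layer formats request failures as `Error code: {code}, Response: {response}` (see `exceptions.rs` later in this diff), the two-step split in `_send_payload` recovers the HTTP status code. A quick illustration of that parsing contract; the message text here is a made-up example:

```python
msg = "Error code: 404, Response: no such endpoint"
# split(",")[0] -> "Error code: 404"; split(":", maxsplit=1)[1] -> " 404"
code = int(msg.split(",")[0].split(":", maxsplit=1)[1])
assert code == 404  # 404/415 responses trigger the v0.5 -> v0.4 downgrade
```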
+    def write(self, spans: Optional[List["Span"]] = None) -> None:
+        for client in self._clients:
+            self._write_with_client(client, spans=spans)
+        if self._sync_mode:
+            self.flush_queue()
+
+    def _write_with_client(self, client: WriterClientBase, spans: Optional[List["Span"]] = None) -> None:
+        if spans is None:
+            return
+
+        if self._sync_mode is False:
+            # Start the Writer on first write.
+            try:
+                if self.status != service.ServiceStatus.RUNNING:
+                    self.start()
+
+            except service.ServiceStatusError:
+                log.warning("failed to start writer service")
+
+        self._metrics_dist("writer.accepted.traces")
+        self._metrics["accepted_traces"] += 1
+        self._set_keep_rate(spans)
+
+        try:
+            client.encoder.put(spans)
+        except BufferItemTooLarge as e:
+            payload_size = e.args[0]
+            log.warning(
+                "trace (%db) larger than payload buffer item limit (%db), dropping",
+                payload_size,
+                client.encoder.max_item_size,
+            )
+            self._metrics_dist("buffer.dropped.traces", 1, tags=["reason:t_too_big"])
+            self._metrics_dist("buffer.dropped.bytes", payload_size, tags=["reason:t_too_big"])
+        except BufferFull as e:
+            payload_size = e.args[0]
+            log.warning(
+                "trace buffer (%s traces %db/%db) cannot fit trace of size %db, dropping (writer status: %s)",
+                len(client.encoder),
+                client.encoder.size,
+                client.encoder.max_size,
+                payload_size,
+                self.status.value,
+            )
+            self._metrics_dist("buffer.dropped.traces", 1, tags=["reason:full"])
+            self._metrics_dist("buffer.dropped.bytes", payload_size, tags=["reason:full"])
+        except NoEncodableSpansError:
+            self._metrics_dist("buffer.dropped.traces", 1, tags=["reason:incompatible"])
+        else:
+            self._metrics_dist("buffer.accepted.traces", 1)
+            self._metrics_dist("buffer.accepted.spans", len(spans))
+
+    def flush_queue(self, raise_exc: bool = False):
+        try:
+            for client in self._clients:
+                self._flush_queue_with_client(client, raise_exc=raise_exc)
+        finally:
+            self._set_drop_rate()
+
+    def _flush_queue_with_client(self, client: WriterClientBase, raise_exc: bool = False) -> None:
+        n_traces = len(client.encoder)
+        try:
+            if not (encoded_traces := client.encoder.encode()):
+                return
+        except Exception:
+            # FIXME(munir): if client.encoder raises an Exception n_traces may not be accurate due to race conditions
+            log.error("failed to encode trace with encoder %r", client.encoder, exc_info=True)
+            self._metrics_dist("encoder.dropped.traces", n_traces)
+            return
+
+        for payload in encoded_traces:
+            encoded_data, n_traces = payload
+            self._flush_single_payload(encoded_data, n_traces, client=client, raise_exc=raise_exc)
+
+    def _flush_single_payload(
+        self, encoded: Optional[bytes], n_traces: int, client: WriterClientBase, raise_exc: bool = False
+    ) -> None:
+        if encoded is None:
+            return
+        try:
+            self._send_payload(encoded, n_traces, client)
+        except Exception as e:
+            if raise_exc:
+                raise
+
+            msg = "failed to send, dropping %d traces to intake at %s: %s"
+            log_args = (
+                n_traces,
+                self._intake_endpoint(client),
+                str(e),
+            )
+            # Append the payload if requested
+            if config._trace_writer_log_err_payload:
+                msg += ", payload %s"
+                log_args += (binascii.hexlify(encoded).decode(),)  # type: ignore
+
+            log.error(msg, *log_args)
+
+    def periodic(self):
+        self.flush_queue(raise_exc=False)
+
+    def _stop_service(
+        self,
+        timeout: Optional[float] = None,
+    ) -> None:
+        # FIXME: don't join() on stop(), let the caller handle this
+        super(NativeWriter, self)._stop_service()
+        self.join(timeout=timeout)
+
+    def before_fork(self) -> None:
+        self._exporter.stop_worker()
+
+    def on_shutdown(self):
+        try:
+            self.periodic()
+        finally:
+            self._exporter.shutdown(3_000_000_000)  # 3 second timeout
+
+
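The `before_fork` hook pairs with the tracer change earlier in this diff, where `_sample_before_fork` calls `writer.before_fork()`: the Rust runtime's worker threads do not survive `fork()`, so they are stopped in the parent and, per the `stop_worker` docstring, restarted lazily by the next `send`. A sketch of the intended sequence, assuming `writer` is a `NativeWriter` instance:

```python
import os

writer.before_fork()  # stops the Rust worker threads via exporter.stop_worker()
pid = os.fork()
# In both the parent and the child, the async runtime is recreated on the next send().
```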
 def _use_log_writer() -> bool:
     """Returns whether the LogWriter should be used in the environment by default.
@@ -727,15 +1094,26 @@ def create_trace_writer(response_callback: Optional[Callable[[AgentResponse], No
 
     verify_url(agent_config.trace_agent_url)
 
-    headers: Dict[str, str] = {}
-    if config._trace_compute_stats or asm_config._apm_opt_out:
-        headers["Datadog-Client-Computed-Stats"] = "yes"
-
-    return AgentWriter(
-        intake_url=agent_config.trace_agent_url,
-        dogstatsd=get_dogstatsd_client(agent_config.dogstatsd_url),
-        sync_mode=_use_sync_mode(),
-        headers=headers,
-        report_metrics=not asm_config._apm_opt_out,
-        response_callback=response_callback,
-    )
+    if config._trace_writer_native:
+        return NativeWriter(
+            intake_url=agent_config.trace_agent_url,
+            dogstatsd=get_dogstatsd_client(agent_config.dogstatsd_url),
+            sync_mode=_use_sync_mode(),
+            compute_stats_enabled=config._trace_compute_stats,
+            report_metrics=not asm_config._apm_opt_out,
+            response_callback=response_callback,
+            stats_opt_out=asm_config._apm_opt_out,
+        )
+    else:
+        headers: Dict[str, str] = {}
+        if config._trace_compute_stats or asm_config._apm_opt_out:
+            headers["Datadog-Client-Computed-Stats"] = "yes"
+
+        return AgentWriter(
+            intake_url=agent_config.trace_agent_url,
+            dogstatsd=get_dogstatsd_client(agent_config.dogstatsd_url),
+            sync_mode=_use_sync_mode(),
+            headers=headers,
+            report_metrics=not asm_config._apm_opt_out,
+            response_callback=response_callback,
+        )
diff --git a/ddtrace/settings/_config.py b/ddtrace/settings/_config.py
index 64417322ec3..71c2f59bce2 100644
--- a/ddtrace/settings/_config.py
+++ b/ddtrace/settings/_config.py
@@ -491,6 +491,9 @@ def __init__(self):
         )
         self._trace_writer_log_err_payload = _get_config("_DD_TRACE_WRITER_LOG_ERROR_PAYLOADS", False, asbool)
 
+        # Use the NativeWriter instead of the AgentWriter
+        self._trace_writer_native = _get_config("_DD_TRACE_WRITER_NATIVE", False, asbool)
+
         # TODO: Remove the configurations below. ddtrace.internal.agent.config should be used instead.
         self._trace_agent_url = _get_config("DD_TRACE_AGENT_URL")
         self._agent_timeout_seconds = _get_config("DD_TRACE_AGENT_TIMEOUT_SECONDS", DEFAULT_TIMEOUT, float)
diff --git a/lib-injection/sources/sitecustomize.py b/lib-injection/sources/sitecustomize.py
index 27b43afd802..42600143ec3 100644
--- a/lib-injection/sources/sitecustomize.py
+++ b/lib-injection/sources/sitecustomize.py
@@ -185,6 +185,9 @@ def _send_telemetry(event):
             stdout=subprocess.DEVNULL,
             stderr=subprocess.DEVNULL,
             universal_newlines=True,
+            env={
+                "DD_TRACE_ENABLED": "false",
+            },
         )
         # Mimic Popen.__exit__ which was added in Python 3.3
         try:
diff --git a/riotfile.py b/riotfile.py
index 04a98045727..53cab41cd4e 100644
--- a/riotfile.py
+++ b/riotfile.py
@@ -421,6 +421,17 @@ def select_pys(min_version: str = MIN_PYTHON_VERSION, max_version: str = MAX_PYT
                     "AGENT_VERSION": "testagent",
                 },
             ),
+            # This test variant ensures integration snapshot tests are compatible with both AgentWriter
+            # and NativeWriter.
+ Venv( + name="integration-snapshot-native-writer", + env={ + "DD_TRACE_AGENT_URL": "http://localhost:9126", + "AGENT_VERSION": "testagent", + "_DD_TRACE_WRITER_NATIVE": "1", + }, + pys=MAX_PYTHON_VERSION, + ), ], ), Venv( diff --git a/src/native/Cargo.lock b/src/native/Cargo.lock index 6e9a9ac2d17..82a31c3b98a 100644 --- a/src/native/Cargo.lock +++ b/src/native/Cargo.lock @@ -8,7 +8,7 @@ version = "0.24.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dfbe277e56a376000877090da837660b4427aad530e3028d44e0bffe4f89a1c1" dependencies = [ - "gimli", + "gimli 0.31.1", ] [[package]] @@ -103,6 +103,12 @@ version = "1.0.98" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e16d2d3311acee920a9eb8d33b8cbc1787ce4a264e85f964c2404b969bdcd487" +[[package]] +name = "arc-swap" +version = "1.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" + [[package]] name = "autocfg" version = "1.5.0" @@ -190,22 +196,31 @@ checksum = "a1d084b0137aaa901caf9f1e8b21daa6aa24d41cd806e111335541eff9683bd6" [[package]] name = "blazesym" -version = "0.2.0-rc.3" +version = "0.2.0-rc.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "86d5caf72227115ce486e764e9199d6eab3437c51dacd78c3fae78ca1340c27d" +checksum = "29a810b7e5f883ad3c711208237841f051061bf59b6ee698ac4dc1fe12a3a5db" dependencies = [ "cpp_demangle", - "gimli", + "gimli 0.32.0", "libc", "memmap2", "miniz_oxide", "rustc-demangle", ] +[[package]] +name = "block-buffer" +version = "0.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71" +dependencies = [ + "generic-array", +] + [[package]] name = "build_common" version = "19.1.0" -source = "git+https://github.com/DataDog/libdatadog?rev=v19.1.0#81b0e5a640cde084836c374e73b2e176498cbe06" +source = "git+https://github.com/DataDog/libdatadog?rev=72d13e3#72d13e3c636b01b70b6b09b184c4972f117d38c2" dependencies = [ "cbindgen", "serde", @@ -230,6 +245,15 @@ version = "1.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a" +[[package]] +name = "cadence" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3075f133bee430b7644c54fb629b9b4420346ffa275a45c81a6babe8b09b4f51" +dependencies = [ + "crossbeam-channel", +] + [[package]] name = "cbindgen" version = "0.29.0" @@ -410,6 +434,24 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "cpufeatures" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59ed5838eebb26a2bb2e58f6d5b5316989ae9d08bab10e0e6d103e656d1b0280" +dependencies = [ + "libc", +] + +[[package]] +name = "crossbeam-channel" +version = "0.5.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "82b8f8f868b36967f9606790d1903570de9ceaf870a7bf9fbbd3016d636a2cb2" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-queue" version = "0.3.12" @@ -425,10 +467,50 @@ version = "0.8.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" +[[package]] +name = "crypto-common" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3" 
+dependencies = [ + "generic-array", + "typenum", +] + +[[package]] +name = "data-pipeline" +version = "19.1.0" +source = "git+https://github.com/DataDog/libdatadog?rev=72d13e3#72d13e3c636b01b70b6b09b184c4972f117d38c2" +dependencies = [ + "anyhow", + "arc-swap", + "bytes", + "datadog-ddsketch", + "datadog-trace-protobuf", + "datadog-trace-utils", + "ddcommon", + "ddtelemetry", + "dogstatsd-client", + "either", + "http", + "http-body-util", + "hyper", + "hyper-util", + "rmp-serde", + "serde", + "serde_json", + "sha2", + "tinybytes", + "tokio", + "tokio-util", + "tracing", + "uuid", +] + [[package]] name = "datadog-alloc" version = "19.1.0" -source = "git+https://github.com/DataDog/libdatadog?rev=v19.1.0#81b0e5a640cde084836c374e73b2e176498cbe06" +source = "git+https://github.com/DataDog/libdatadog?rev=72d13e3#72d13e3c636b01b70b6b09b184c4972f117d38c2" dependencies = [ "allocator-api2", "libc", @@ -438,7 +520,7 @@ dependencies = [ [[package]] name = "datadog-crashtracker" version = "19.1.0" -source = "git+https://github.com/DataDog/libdatadog?rev=v19.1.0#81b0e5a640cde084836c374e73b2e176498cbe06" +source = "git+https://github.com/DataDog/libdatadog?rev=72d13e3#72d13e3c636b01b70b6b09b184c4972f117d38c2" dependencies = [ "anyhow", "backtrace", @@ -470,7 +552,7 @@ dependencies = [ [[package]] name = "datadog-ddsketch" version = "19.1.0" -source = "git+https://github.com/DataDog/libdatadog?rev=v19.1.0#81b0e5a640cde084836c374e73b2e176498cbe06" +source = "git+https://github.com/DataDog/libdatadog?rev=72d13e3#72d13e3c636b01b70b6b09b184c4972f117d38c2" dependencies = [ "prost", ] @@ -478,7 +560,7 @@ dependencies = [ [[package]] name = "datadog-library-config" version = "0.0.2" -source = "git+https://github.com/DataDog/libdatadog?rev=v19.1.0#81b0e5a640cde084836c374e73b2e176498cbe06" +source = "git+https://github.com/DataDog/libdatadog?rev=72d13e3#72d13e3c636b01b70b6b09b184c4972f117d38c2" dependencies = [ "anyhow", "memfd", @@ -492,7 +574,7 @@ dependencies = [ [[package]] name = "datadog-profiling" version = "19.1.0" -source = "git+https://github.com/DataDog/libdatadog?rev=v19.1.0#81b0e5a640cde084836c374e73b2e176498cbe06" +source = "git+https://github.com/DataDog/libdatadog?rev=72d13e3#72d13e3c636b01b70b6b09b184c4972f117d38c2" dependencies = [ "anyhow", "bitmaps", @@ -522,7 +604,7 @@ dependencies = [ [[package]] name = "datadog-profiling-ffi" version = "19.1.0" -source = "git+https://github.com/DataDog/libdatadog?rev=v19.1.0#81b0e5a640cde084836c374e73b2e176498cbe06" +source = "git+https://github.com/DataDog/libdatadog?rev=72d13e3#72d13e3c636b01b70b6b09b184c4972f117d38c2" dependencies = [ "anyhow", "build_common", @@ -541,15 +623,59 @@ dependencies = [ [[package]] name = "datadog-profiling-protobuf" version = "19.1.0" -source = "git+https://github.com/DataDog/libdatadog?rev=v19.1.0#81b0e5a640cde084836c374e73b2e176498cbe06" +source = "git+https://github.com/DataDog/libdatadog?rev=72d13e3#72d13e3c636b01b70b6b09b184c4972f117d38c2" dependencies = [ "prost", ] +[[package]] +name = "datadog-trace-normalization" +version = "19.1.0" +source = "git+https://github.com/DataDog/libdatadog?rev=72d13e3#72d13e3c636b01b70b6b09b184c4972f117d38c2" +dependencies = [ + "anyhow", + "datadog-trace-protobuf", +] + +[[package]] +name = "datadog-trace-protobuf" +version = "19.1.0" +source = "git+https://github.com/DataDog/libdatadog?rev=72d13e3#72d13e3c636b01b70b6b09b184c4972f117d38c2" +dependencies = [ + "prost", + "serde", + "serde_bytes", +] + +[[package]] +name = "datadog-trace-utils" +version = "19.1.0" +source = 
"git+https://github.com/DataDog/libdatadog?rev=72d13e3#72d13e3c636b01b70b6b09b184c4972f117d38c2" +dependencies = [ + "anyhow", + "bytes", + "datadog-trace-normalization", + "datadog-trace-protobuf", + "ddcommon", + "futures", + "http-body-util", + "hyper", + "prost", + "rand", + "rmp", + "rmp-serde", + "rmpv", + "serde", + "serde_json", + "tinybytes", + "tokio", + "tracing", +] + [[package]] name = "ddcommon" version = "19.1.0" -source = "git+https://github.com/DataDog/libdatadog?rev=v19.1.0#81b0e5a640cde084836c374e73b2e176498cbe06" +source = "git+https://github.com/DataDog/libdatadog?rev=72d13e3#72d13e3c636b01b70b6b09b184c4972f117d38c2" dependencies = [ "anyhow", "cc", @@ -582,7 +708,7 @@ dependencies = [ [[package]] name = "ddcommon-ffi" version = "19.1.0" -source = "git+https://github.com/DataDog/libdatadog?rev=v19.1.0#81b0e5a640cde084836c374e73b2e176498cbe06" +source = "git+https://github.com/DataDog/libdatadog?rev=72d13e3#72d13e3c636b01b70b6b09b184c4972f117d38c2" dependencies = [ "anyhow", "build_common", @@ -596,7 +722,7 @@ dependencies = [ [[package]] name = "ddtelemetry" version = "19.1.0" -source = "git+https://github.com/DataDog/libdatadog?rev=v19.1.0#81b0e5a640cde084836c374e73b2e176498cbe06" +source = "git+https://github.com/DataDog/libdatadog?rev=72d13e3#72d13e3c636b01b70b6b09b184c4972f117d38c2" dependencies = [ "anyhow", "base64", @@ -623,6 +749,7 @@ version = "0.1.0" dependencies = [ "anyhow", "build_common", + "data-pipeline", "datadog-crashtracker", "datadog-ddsketch", "datadog-library-config", @@ -650,6 +777,29 @@ dependencies = [ "powerfmt", ] +[[package]] +name = "digest" +version = "0.10.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" +dependencies = [ + "block-buffer", + "crypto-common", +] + +[[package]] +name = "dogstatsd-client" +version = "19.1.0" +source = "git+https://github.com/DataDog/libdatadog?rev=72d13e3#72d13e3c636b01b70b6b09b184c4972f117d38c2" +dependencies = [ + "anyhow", + "cadence", + "ddcommon", + "http", + "serde", + "tracing", +] + [[package]] name = "dunce" version = "1.0.5" @@ -818,6 +968,16 @@ dependencies = [ "slab", ] +[[package]] +name = "generic-array" +version = "0.14.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a" +dependencies = [ + "typenum", + "version_check", +] + [[package]] name = "getrandom" version = "0.2.16" @@ -846,6 +1006,12 @@ name = "gimli" version = "0.31.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f" + +[[package]] +name = "gimli" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93563d740bc9ef04104f9ed6f86f1e3275c2cdafb95664e26584b9ca807a8ffe" dependencies = [ "fallible-iterator", "indexmap", @@ -1522,9 +1688,9 @@ dependencies = [ [[package]] name = "quick-xml" -version = "0.38.0" +version = "0.38.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8927b0664f5c5a98265138b7e3f90aa19a6b21353182469ace36d4ac527b7b1b" +checksum = "9845d9dccf565065824e69f9f235fafba1587031eda353c1f1561cd6a6be78f4" dependencies = [ "memchr", ] @@ -1639,6 +1805,16 @@ dependencies = [ "serde", ] +[[package]] +name = "rmpv" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"58450723cd9ee93273ce44a20b6ec4efe17f8ed2e3631474387bfdecf18bb2a9" +dependencies = [ + "num-traits", + "rmp", +] + [[package]] name = "rustc-demangle" version = "0.1.26" @@ -1802,6 +1978,15 @@ dependencies = [ "serde_derive", ] +[[package]] +name = "serde_bytes" +version = "0.11.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8437fd221bde2d4ca316d61b90e337e9e702b3820b87d63caa9ba6c02bd06d96" +dependencies = [ + "serde", +] + [[package]] name = "serde_derive" version = "1.0.219" @@ -1858,6 +2043,17 @@ dependencies = [ "unsafe-libyaml", ] +[[package]] +name = "sha2" +version = "0.10.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7507d819769d01a365ab707794a4084392c824f54a7a6a7862f8c3d0892b283" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + [[package]] name = "shlex" version = "1.3.0" @@ -2037,6 +2233,14 @@ dependencies = [ "time-core", ] +[[package]] +name = "tinybytes" +version = "19.1.0" +source = "git+https://github.com/DataDog/libdatadog?rev=72d13e3#72d13e3c636b01b70b6b09b184c4972f117d38c2" +dependencies = [ + "serde", +] + [[package]] name = "tokio" version = "1.47.1" @@ -2078,9 +2282,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.15" +version = "0.7.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "66a539a9ad6d5d281510d5bd368c973d636c02dbf8a67300bfb6b950696ad7df" +checksum = "14307c986784f72ef81c89db7d9e28d6ac26d16213b109ea501696195e6e3ce5" dependencies = [ "bytes", "futures-core", @@ -2171,6 +2375,12 @@ dependencies = [ "static_assertions", ] +[[package]] +name = "typenum" +version = "1.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1dccffe3ce07af9386bfd29e80c0ab1a8205a2fc34e4bcd40364df902cfa8f3f" + [[package]] name = "unicase" version = "2.8.1" @@ -2225,6 +2435,12 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "version_check" +version = "0.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" + [[package]] name = "want" version = "0.3.1" diff --git a/src/native/Cargo.toml b/src/native/Cargo.toml index a16d6b70729..b9f9ba75ee0 100644 --- a/src/native/Cargo.toml +++ b/src/native/Cargo.toml @@ -16,18 +16,19 @@ profiling = ["dep:datadog-profiling-ffi"] [dependencies] anyhow = { version = "1.0", optional = true } -datadog-crashtracker = { git = "https://github.com/DataDog/libdatadog", rev = "v19.1.0", optional = true } -datadog-ddsketch = { git = "https://github.com/DataDog/libdatadog", rev = "v19.1.0" } -datadog-library-config = { git = "https://github.com/DataDog/libdatadog", rev = "v19.1.0" } -datadog-profiling-ffi = { git = "https://github.com/DataDog/libdatadog", rev = "v19.1.0", optional = true, features = [ +datadog-crashtracker = { git = "https://github.com/DataDog/libdatadog", rev = "72d13e3", optional = true } +datadog-ddsketch = { git = "https://github.com/DataDog/libdatadog", rev = "72d13e3" } +datadog-library-config = { git = "https://github.com/DataDog/libdatadog", rev = "72d13e3" } +data-pipeline = { git = "https://github.com/DataDog/libdatadog", rev = "72d13e3" } +datadog-profiling-ffi = { git = "https://github.com/DataDog/libdatadog", rev = "72d13e3", optional = true, features = [ "cbindgen", ] } -ddcommon = { git = "https://github.com/DataDog/libdatadog", rev = "v19.1.0"} +ddcommon = { git = "https://github.com/DataDog/libdatadog", rev = "72d13e3" } pyo3 = { version = "0.24", features = 
["extension-module", "anyhow"] } [build-dependencies] pyo3-build-config = "0.24" -build_common = { git = "https://github.com/DataDog/libdatadog", rev = "v19.1.0", features = [ +build_common = { git = "https://github.com/DataDog/libdatadog", rev = "72d13e3", features = [ "cbindgen", ] } diff --git a/src/native/data_pipeline/exceptions.rs b/src/native/data_pipeline/exceptions.rs new file mode 100644 index 00000000000..adb166b0164 --- /dev/null +++ b/src/native/data_pipeline/exceptions.rs @@ -0,0 +1,105 @@ +use data_pipeline::trace_exporter::error::TraceExporterError; +use pyo3::{create_exception, exceptions::PyException, prelude::*, PyErr}; + +create_exception!( + trace_exporter_exceptions, + AgentError, + PyException, + "Agent error" +); +create_exception!( + trace_exporter_exceptions, + BuilderError, + PyException, + "Builder error" +); +create_exception!( + trace_exporter_exceptions, + InternalError, + PyException, + "Internal error" +); +create_exception!( + trace_exporter_exceptions, + DeserializationError, + PyException, + "Deserialization error" +); +create_exception!(trace_exporter_exceptions, IoError, PyException, "Io error"); +create_exception!( + trace_exporter_exceptions, + NetworkError, + PyException, + "Network error" +); +create_exception!( + trace_exporter_exceptions, + TimeoutError, + PyException, + "Timeout error" +); +create_exception!( + trace_exporter_exceptions, + RequestError, + PyException, + "Request error" +); +create_exception!( + trace_exporter_exceptions, + SerializationError, + PyException, + "Serialization error" +); + +pub struct TraceExporterErrorPy(pub TraceExporterError); + +impl From for PyErr { + fn from(value: TraceExporterErrorPy) -> Self { + match value.0 { + TraceExporterError::Agent(error) => AgentError::new_err(error.to_string()), + TraceExporterError::Builder(error) => BuilderError::new_err(error.to_string()), + TraceExporterError::Internal(error) => InternalError::new_err(error.to_string()), + TraceExporterError::Deserialization(error) => { + DeserializationError::new_err(error.to_string()) + } + TraceExporterError::Io(error) => IoError::new_err(error.to_string()), + TraceExporterError::Network(error) => NetworkError::new_err(error.to_string()), + // PyO3 doesn't properly support adding extra fields to an exception type + // see https://github.com/PyO3/pyo3/issues/295#issuecomment-2387253743. + // We manually create the error message here to make sure it can be parsed in the + // NativeWriter to access the error code. 
+            TraceExporterError::Request(error) => RequestError::new_err(format!(
+                "Error code: {}, Response: {}",
+                error.status(),
+                error.msg()
+            )),
+            TraceExporterError::Serialization(error) => {
+                SerializationError::new_err(error.to_string())
+            }
+        }
+    }
+}
+
+impl From<TraceExporterError> for TraceExporterErrorPy {
+    fn from(value: TraceExporterError) -> Self {
+        Self(value)
+    }
+}
+
+pub fn register_exceptions(m: &Bound<'_, PyModule>) -> PyResult<()> {
+    m.add("AgentError", m.py().get_type::<AgentError>())?;
+    m.add("BuilderError", m.py().get_type::<BuilderError>())?;
+    m.add("InternalError", m.py().get_type::<InternalError>())?;
+    m.add(
+        "DeserializationError",
+        m.py().get_type::<DeserializationError>(),
+    )?;
+    m.add("IoError", m.py().get_type::<IoError>())?;
+    m.add("NetworkError", m.py().get_type::<NetworkError>())?;
+    m.add("RequestError", m.py().get_type::<RequestError>())?;
+    m.add(
+        "SerializationError",
+        m.py().get_type::<SerializationError>(),
+    )?;
+    Ok(())
+}
diff --git a/src/native/data_pipeline/mod.rs b/src/native/data_pipeline/mod.rs
new file mode 100644
index 00000000000..2c5f47ee29a
--- /dev/null
+++ b/src/native/data_pipeline/mod.rs
@@ -0,0 +1,259 @@
+use data_pipeline::trace_exporter::{
+    agent_response::AgentResponse, TelemetryConfig, TraceExporter, TraceExporterBuilder,
+    TraceExporterInputFormat, TraceExporterOutputFormat,
+};
+use pyo3::{exceptions::PyValueError, prelude::*, pybacked::PyBackedBytes};
+use std::time::Duration;
+mod exceptions;
+use exceptions::TraceExporterErrorPy;
+
+/// A wrapper around [TraceExporterBuilder]
+///
+/// Allows using the builder as a Python class. Only one exporter can be built using a builder;
+/// once `build` has been called the builder shouldn't be reused.
+#[pyclass(name = "TraceExporterBuilder")]
+pub struct TraceExporterBuilderPy {
+    builder: Option<TraceExporterBuilder>,
+}
+
+impl TraceExporterBuilderPy {
+    fn try_as_mut(&mut self) -> PyResult<&mut TraceExporterBuilder> {
+        self.builder
+            .as_mut()
+            .ok_or(PyValueError::new_err("Builder has already been consumed"))
+    }
+}
+
+#[pymethods]
+impl TraceExporterBuilderPy {
+    #[new]
+    fn new() -> Self {
+        TraceExporterBuilderPy {
+            builder: Some(TraceExporterBuilder::default()),
+        }
+    }
+
+    fn set_hostname(mut slf: PyRefMut<'_, Self>, hostname: &'_ str) -> PyResult<Py<Self>> {
+        slf.try_as_mut()?.set_hostname(hostname);
+        Ok(slf.into())
+    }
+
+    fn set_url(mut slf: PyRefMut<'_, Self>, url: &'_ str) -> PyResult<Py<Self>> {
+        slf.try_as_mut()?.set_url(url);
+        Ok(slf.into())
+    }
+
+    fn set_dogstatsd_url(mut slf: PyRefMut<'_, Self>, url: &'_ str) -> PyResult<Py<Self>> {
+        slf.try_as_mut()?.set_dogstatsd_url(url);
+        Ok(slf.into())
+    }
+
+    fn set_env(mut slf: PyRefMut<'_, Self>, env: &'_ str) -> PyResult<Py<Self>> {
+        slf.try_as_mut()?.set_env(env);
+        Ok(slf.into())
+    }
+
+    fn set_app_version(mut slf: PyRefMut<'_, Self>, version: &'_ str) -> PyResult<Py<Self>> {
+        slf.try_as_mut()?.set_app_version(version);
+        Ok(slf.into())
+    }
+
+    fn set_service(mut slf: PyRefMut<'_, Self>, service: &'_ str) -> PyResult<Py<Self>> {
+        slf.try_as_mut()?.set_service(service);
+        Ok(slf.into())
+    }
+
+    fn set_git_commit_sha(
+        mut slf: PyRefMut<'_, Self>,
+        git_commit_sha: &'_ str,
+    ) -> PyResult<Py<Self>> {
+        slf.try_as_mut()?.set_git_commit_sha(git_commit_sha);
+        Ok(slf.into())
+    }
+
+    fn set_tracer_version(mut slf: PyRefMut<'_, Self>, version: &'_ str) -> PyResult<Py<Self>> {
+        slf.try_as_mut()?.set_tracer_version(version);
+        Ok(slf.into())
+    }
+
+    fn set_language(mut slf: PyRefMut<'_, Self>, language: &'_ str) -> PyResult<Py<Self>> {
+        slf.try_as_mut()?.set_language(language);
+        Ok(slf.into())
+    }
+
+    fn set_language_version(mut slf: PyRefMut<'_, Self>, version: &'_ str) -> PyResult<Py<Self>> {
+        slf.try_as_mut()?.set_language_version(version);
+        Ok(slf.into())
+    }
+
+    fn set_language_interpreter(
+        mut slf: PyRefMut<'_, Self>,
+        interpreter: &'_ str,
+    ) -> PyResult<Py<Self>> {
+        slf.try_as_mut()?.set_language_interpreter(interpreter);
+        Ok(slf.into())
+    }
+
+    fn set_language_interpreter_vendor(
+        mut slf: PyRefMut<'_, Self>,
+        vendor: &'_ str,
+    ) -> PyResult<Py<Self>> {
+        slf.try_as_mut()?.set_language_interpreter_vendor(vendor);
+        Ok(slf.into())
+    }
+
+    fn set_test_session_token(mut slf: PyRefMut<'_, Self>, token: &'_ str) -> PyResult<Py<Self>> {
+        slf.try_as_mut()?.set_test_session_token(token);
+        Ok(slf.into())
+    }
+
+    fn set_input_format(mut slf: PyRefMut<'_, Self>, input_format: &str) -> PyResult<Py<Self>> {
+        let input_format = match input_format {
+            "v0.4" => Ok(TraceExporterInputFormat::V04),
+            "v0.5" => Ok(TraceExporterInputFormat::V05),
+            _ => Err(PyValueError::new_err("Invalid trace format")),
+        }?;
+        slf.try_as_mut()?.set_input_format(input_format);
+        Ok(slf.into())
+    }
+
+    fn set_output_format(mut slf: PyRefMut<'_, Self>, output_format: &str) -> PyResult<Py<Self>> {
+        let output_format = match output_format {
+            "v0.4" => Ok(TraceExporterOutputFormat::V04),
+            "v0.5" => Ok(TraceExporterOutputFormat::V05),
+            _ => Err(PyValueError::new_err("Invalid trace format")),
+        }?;
+        slf.try_as_mut()?.set_output_format(output_format);
+        Ok(slf.into())
+    }
+
+    fn set_client_computed_top_level(mut slf: PyRefMut<'_, Self>) -> PyResult<Py<Self>> {
+        slf.try_as_mut()?.set_client_computed_top_level();
+        Ok(slf.into())
+    }
+
+    fn set_client_computed_stats(mut slf: PyRefMut<'_, Self>) -> PyResult<Py<Self>> {
+        slf.try_as_mut()?.set_client_computed_stats();
+        Ok(slf.into())
+    }
+
+    fn enable_stats(mut slf: PyRefMut<'_, Self>, bucket_size_ns: u64) -> PyResult<Py<Self>> {
+        slf.try_as_mut()?
+            .enable_stats(Duration::from_nanos(bucket_size_ns));
+        Ok(slf.into())
+    }
+
+    fn enable_telemetry(
+        mut slf: PyRefMut<'_, Self>,
+        heartbeat_ms: u64,
+        runtime_id: String,
+    ) -> PyResult<Py<Self>> {
+        slf.try_as_mut()?.enable_telemetry(Some(TelemetryConfig {
+            heartbeat: heartbeat_ms,
+            runtime_id: Some(runtime_id),
+            debug_enabled: true,
+        }));
+        Ok(slf.into())
+    }
+
+    /// Consumes the wrapped builder.
+    ///
+    /// The builder shouldn't be reused.
+    fn build(&mut self) -> PyResult<TraceExporterPy> {
+        let exporter = TraceExporterPy {
+            inner: Some(
+                self.builder
+                    .take()
+                    .ok_or(PyValueError::new_err("Builder has already been consumed"))?
+                    .build()
+                    .map_err(|err| PyValueError::new_err(format!("Builder {err}")))?,
+            ),
+        };
+        Ok(exporter)
+    }
+
+    fn debug(&self) -> String {
+        format!("{:?}", self.builder)
+    }
+}
+
+/// A Python object wrapping a [TraceExporter] instance
+#[pyclass(name = "TraceExporter")]
+pub struct TraceExporterPy {
+    inner: Option<TraceExporter>,
+}
+
+#[pymethods]
+impl TraceExporterPy {
+    /// Send a msgpack encoded trace payload.
+    ///
+    /// The payload is passed as an immutable `bytes` object to be able to release the GIL while
+    /// sending the traces.
+    fn send(&self, py: Python<'_>, data: PyBackedBytes, trace_count: usize) -> PyResult<String> {
+        py.allow_threads(move || {
+            match self
+                .inner
+                .as_ref()
+                .ok_or(PyValueError::new_err(
+                    "TraceExporter has already been consumed",
+                ))?
+                .send(&data, trace_count)
+            {
+                Ok(res) => match res {
+                    AgentResponse::Changed { body } => Ok(body),
+                    AgentResponse::Unchanged => Ok("".to_string()),
+                },
+                Err(e) => Err(TraceExporterErrorPy::from(e).into()),
+            }
+        })
+    }
+
+    fn shutdown(&mut self, timeout_ns: u64) -> PyResult<()> {
+        match self
+            .inner
+            .take()
+            .ok_or(PyValueError::new_err(
+                "TraceExporter has already been consumed",
+            ))?
+            .shutdown(Some(Duration::from_nanos(timeout_ns)))
+        {
+            Ok(_) => Ok(()),
+            Err(e) => Err(TraceExporterErrorPy::from(e).into()),
+        }
+    }
+
+    fn drop(&mut self) -> PyResult<()> {
+        drop(self.inner.take());
+        Ok(())
+    }
+
+    fn run_worker(&self) -> PyResult<()> {
+        let exporter = self.inner.as_ref().ok_or(PyValueError::new_err(
+            "TraceExporter has already been consumed",
+        ))?;
+        match exporter.run_worker() {
+            Ok(_) => Ok(()),
+            Err(e) => Err(TraceExporterErrorPy::from(e).into()),
+        }
+    }
+
+    fn stop_worker(&self) -> PyResult<()> {
+        if let Some(exporter) = self.inner.as_ref() {
+            exporter.stop_worker();
+        }
+        Ok(())
+    }
+
+    fn debug(&self) -> String {
+        format!("{:?}", self.inner)
+    }
+}
+
+#[pymodule]
+pub fn register_data_pipeline(m: &Bound<'_, PyModule>) -> PyResult<()> {
+    m.add_class::<TraceExporterBuilderPy>()?;
+    m.add_class::<TraceExporterPy>()?;
+    exceptions::register_exceptions(m)?;
+
+    Ok(())
+}
diff --git a/src/native/lib.rs b/src/native/lib.rs
index c645409d545..a84ebf2c46a 100644
--- a/src/native/lib.rs
+++ b/src/native/lib.rs
@@ -2,6 +2,7 @@ mod crashtracker;
#[cfg(feature = "profiling")]
pub use datadog_profiling_ffi::*;
+mod data_pipeline;
mod ddsketch;
mod library_config;
@@ -32,5 +33,6 @@ fn _native(m: &Bound<'_, PyModule>) -> PyResult<()> {
    m.add_class::()?;
    m.add_class::()?;
    m.add_wrapped(wrap_pyfunction!(library_config::store_metadata))?;
+    data_pipeline::register_data_pipeline(m)?;
    Ok(())
}
diff --git a/tests/contrib/kafka/test_kafka.py b/tests/contrib/kafka/test_kafka.py
index 1e62f7d779f..d8353e8fe7c 100644
--- a/tests/contrib/kafka/test_kafka.py
+++ b/tests/contrib/kafka/test_kafka.py
@@ -11,6 +11,7 @@
import mock
import pytest

+from ddtrace import config
from ddtrace.contrib.internal.kafka.patch import TracedConsumer
from ddtrace.contrib.internal.kafka.patch import TracedProducer
from ddtrace.contrib.internal.kafka.patch import patch
@@ -94,7 +95,8 @@ def dummy_tracer():
    patch()
    t = DummyTracer()
    # disable backoff because it makes these tests less reliable
-    t._span_aggregator.writer._send_payload_with_backoff = t._span_aggregator.writer._send_payload
+    if not config._trace_writer_native:
+        t._span_aggregator.writer._send_payload_with_backoff = t._span_aggregator.writer._send_payload
    yield t
    unpatch()

@@ -110,13 +112,15 @@ def tracer(should_filter_empty_polls):
    if should_filter_empty_polls:
        ddtracer.configure(trace_processors=[KafkaConsumerPollFilter()])
    # disable backoff because it makes these tests less reliable
-    previous_backoff = ddtracer._span_aggregator.writer._send_payload_with_backoff
-    ddtracer._span_aggregator.writer._send_payload_with_backoff = ddtracer._span_aggregator.writer._send_payload
+    if not config._trace_writer_native:
+        previous_backoff = ddtracer._span_aggregator.writer._send_payload_with_backoff
+        ddtracer._span_aggregator.writer._send_payload_with_backoff = ddtracer._span_aggregator.writer._send_payload
    try:
        yield ddtracer
    finally:
        ddtracer.flush()
-        ddtracer._span_aggregator.writer._send_payload_with_backoff = previous_backoff
+        if not config._trace_writer_native:
+            ddtracer._span_aggregator.writer._send_payload_with_backoff = previous_backoff
    unpatch()

@@ -552,9 +556,10 @@ def _generate_in_subprocess(random_topic):
    ddtrace.tracer.configure(trace_processors=[KafkaConsumerPollFilter()])

    # disable backoff because it makes these tests less reliable
-    ddtrace.tracer._span_aggregator.writer._send_payload_with_backoff = (
-        ddtrace.tracer._span_aggregator.writer._send_payload
-    )
+    if not config._trace_writer_native:
+
ddtrace.tracer._span_aggregator.writer._send_payload_with_backoff = ( + ddtrace.tracer._span_aggregator.writer._send_payload + ) patch() producer = confluent_kafka.Producer({"bootstrap.servers": BOOTSTRAP_SERVERS}) diff --git a/tests/integration/test_debug.py b/tests/integration/test_debug.py index a7ff0baac2f..b38872b3212 100644 --- a/tests/integration/test_debug.py +++ b/tests/integration/test_debug.py @@ -107,7 +107,7 @@ def test_debug_post_configure_uds(): assert agent_url == "unix:///file.sock" agent_error = f.get("agent_error") - assert re.match("^Agent not reachable.*No such file or directory", agent_error) + assert re.match("^Agent not reachable", agent_error) class TestGlobalConfig(SubprocessTestCase): diff --git a/tests/integration/test_integration.py b/tests/integration/test_integration.py index 3d74e83e1d3..95a39983c2c 100644 --- a/tests/integration/test_integration.py +++ b/tests/integration/test_integration.py @@ -9,6 +9,7 @@ from ddtrace.internal.atexit import register_on_exit_signal from tests.integration.utils import parametrize_with_all_encodings +from tests.integration.utils import skip_if_native_writer from tests.integration.utils import skip_if_testagent from tests.utils import DummyTracer from tests.utils import call_program @@ -82,20 +83,32 @@ def test_uds_wrong_socket_path(): import mock + from ddtrace import config from ddtrace.trace import tracer as t encoding = os.environ["DD_TRACE_API_VERSION"] with mock.patch("ddtrace.internal.writer.writer.log") as log: t.trace("client.testing").finish() t.shutdown() - calls = [ - mock.call( - "failed to send, dropping %d traces to intake at %s after %d retries", - 1, - "unix:///tmp/ddagent/nosockethere/{}/traces".format(encoding if encoding else "v0.5"), - 3, - ) - ] + + if config._trace_writer_native: + calls = [ + mock.call( + "failed to send, dropping %d traces to intake at %s: %s", + 1, + "unix:///tmp/ddagent/nosockethere/{}/traces".format(encoding if encoding else "v0.5"), + "client error (Connect)", + ) + ] + else: + calls = [ + mock.call( + "failed to send, dropping %d traces to intake at %s after %d retries", + 1, + "unix:///tmp/ddagent/nosockethere/{}/traces".format(encoding if encoding else "v0.5"), + 3, + ) + ] log.error.assert_has_calls(calls) @@ -180,6 +193,7 @@ def test_child_spans_do_not_cause_warning_logs(): def test_metrics(): import mock + from ddtrace import config from ddtrace.trace import tracer as t from tests.utils import AnyInt from tests.utils import override_global_config @@ -201,25 +215,31 @@ def test_metrics(): log.warning.assert_not_called() log.error.assert_not_called() - statsd_mock.distribution.assert_has_calls( - [ - mock.call("datadog.tracer.writer.accepted.traces", 1, tags=None), - mock.call("datadog.tracer.buffer.accepted.traces", 1, tags=None), - mock.call("datadog.tracer.buffer.accepted.spans", 300, tags=None), - mock.call("datadog.tracer.writer.accepted.traces", 1, tags=None), - mock.call("datadog.tracer.buffer.accepted.traces", 1, tags=None), - mock.call("datadog.tracer.buffer.accepted.spans", 300, tags=None), - mock.call("datadog.tracer.writer.accepted.traces", 1, tags=None), - mock.call("datadog.tracer.buffer.accepted.traces", 1, tags=None), - mock.call("datadog.tracer.buffer.accepted.spans", 300, tags=None), - mock.call("datadog.tracer.writer.accepted.traces", 1, tags=None), - mock.call("datadog.tracer.buffer.accepted.traces", 1, tags=None), - mock.call("datadog.tracer.buffer.accepted.spans", 300, tags=None), + calls = [ + mock.call("datadog.tracer.writer.accepted.traces", 1, 
tags=None), + mock.call("datadog.tracer.buffer.accepted.traces", 1, tags=None), + mock.call("datadog.tracer.buffer.accepted.spans", 300, tags=None), + mock.call("datadog.tracer.writer.accepted.traces", 1, tags=None), + mock.call("datadog.tracer.buffer.accepted.traces", 1, tags=None), + mock.call("datadog.tracer.buffer.accepted.spans", 300, tags=None), + mock.call("datadog.tracer.writer.accepted.traces", 1, tags=None), + mock.call("datadog.tracer.buffer.accepted.traces", 1, tags=None), + mock.call("datadog.tracer.buffer.accepted.spans", 300, tags=None), + mock.call("datadog.tracer.writer.accepted.traces", 1, tags=None), + mock.call("datadog.tracer.buffer.accepted.traces", 1, tags=None), + mock.call("datadog.tracer.buffer.accepted.spans", 300, tags=None), + ] + + if not config._trace_writer_native: + calls += [ mock.call("datadog.tracer.http.requests", 1, tags=None), mock.call("datadog.tracer.http.sent.bytes", AnyInt(), tags=None), mock.call("datadog.tracer.http.sent.bytes", AnyInt(), tags=None), mock.call("datadog.tracer.http.sent.traces", 4, tags=None), - ], + ] + + statsd_mock.distribution.assert_has_calls( + calls, any_order=True, ) @@ -230,6 +250,7 @@ def test_metrics(): def test_metrics_partial_flush_disabled(): import mock + from ddtrace import config from ddtrace.trace import tracer as t from tests.utils import AnyInt from tests.utils import override_global_config @@ -249,19 +270,25 @@ def test_metrics_partial_flush_disabled(): log.warning.assert_not_called() log.error.assert_not_called() - statsd_mock.distribution.assert_has_calls( - [ - mock.call("datadog.tracer.writer.accepted.traces", 1, tags=None), - mock.call("datadog.tracer.buffer.accepted.traces", 1, tags=None), - mock.call("datadog.tracer.buffer.accepted.spans", 600, tags=None), - mock.call("datadog.tracer.writer.accepted.traces", 1, tags=None), - mock.call("datadog.tracer.buffer.accepted.traces", 1, tags=None), - mock.call("datadog.tracer.buffer.accepted.spans", 600, tags=None), + calls = [ + mock.call("datadog.tracer.writer.accepted.traces", 1, tags=None), + mock.call("datadog.tracer.buffer.accepted.traces", 1, tags=None), + mock.call("datadog.tracer.buffer.accepted.spans", 600, tags=None), + mock.call("datadog.tracer.writer.accepted.traces", 1, tags=None), + mock.call("datadog.tracer.buffer.accepted.traces", 1, tags=None), + mock.call("datadog.tracer.buffer.accepted.spans", 600, tags=None), + ] + + if not config._trace_writer_native: + calls += [ mock.call("datadog.tracer.http.requests", 1, tags=None), mock.call("datadog.tracer.http.sent.bytes", AnyInt(), tags=None), mock.call("datadog.tracer.http.sent.bytes", AnyInt(), tags=None), mock.call("datadog.tracer.http.sent.traces", 2, tags=None), - ], + ] + + statsd_mock.distribution.assert_has_calls( + calls, any_order=True, ) @@ -327,6 +354,7 @@ def test_trace_generates_error_logs_when_trace_agent_url_invalid(): import mock + from ddtrace import config from ddtrace.trace import tracer as t with mock.patch("ddtrace.internal.writer.writer.log") as log: @@ -334,17 +362,29 @@ def test_trace_generates_error_logs_when_trace_agent_url_invalid(): t.shutdown() encoding = os.environ["DD_TRACE_API_VERSION"] - calls = [ - mock.call( - "failed to send, dropping %d traces to intake at %s after %d retries", - 1, - "http://localhost:8125/{}/traces".format(encoding if encoding else "v0.5"), - 3, - ) - ] + + if config._trace_writer_native: + calls = [ + mock.call( + "failed to send, dropping %d traces to intake at %s: %s", + 1, + "http://localhost:8125/{}/traces".format(encoding if encoding 
else "v0.5"), + "client error (Connect)", + ) + ] + else: + calls = [ + mock.call( + "failed to send, dropping %d traces to intake at %s after %d retries", + 1, + "http://localhost:8125/{}/traces".format(encoding if encoding else "v0.5"), + 3, + ) + ] log.error.assert_has_calls(calls) +@skip_if_native_writer @skip_if_testagent @parametrize_with_all_encodings(check_logs=False) def test_inode_entity_id_header_present(): @@ -380,6 +420,7 @@ def test_inode_entity_id_header_present(): t.shutdown() +@skip_if_native_writer @skip_if_testagent @parametrize_with_all_encodings(check_logs=False) def test_external_env_header_present(): @@ -399,6 +440,7 @@ def test_external_env_header_present(): assert headers["Datadog-External-Env"] == mocked_external_env +@skip_if_native_writer @skip_if_testagent @parametrize_with_all_encodings() def test_validate_headers_in_payload_to_intake_with_multiple_traces(): @@ -415,6 +457,7 @@ def test_validate_headers_in_payload_to_intake_with_multiple_traces(): assert headers.get("X-Datadog-Trace-Count") == "100" +@skip_if_native_writer @skip_if_testagent @parametrize_with_all_encodings() def test_validate_headers_in_payload_to_intake_with_nested_spans(): @@ -433,6 +476,7 @@ def test_validate_headers_in_payload_to_intake_with_nested_spans(): assert headers.get("X-Datadog-Trace-Count") == "10" +@skip_if_native_writer @parametrize_with_all_encodings() def test_trace_with_invalid_client_endpoint_generates_error_log(): import mock @@ -461,19 +505,32 @@ def test_trace_with_invalid_client_endpoint_generates_error_log(): def test_trace_with_invalid_payload_generates_error_log(): import mock + from ddtrace import config from tests.integration.utils import send_invalid_payload_and_get_logs log = send_invalid_payload_and_get_logs() - log.error.assert_has_calls( - [ - mock.call( - "failed to send traces to intake at %s: HTTP error status %s, reason %s", - "http://localhost:8126/v0.5/traces", - 400, - "Bad Request", - ) - ] - ) + if config._trace_writer_native: + log.error.assert_has_calls( + [ + mock.call( + "failed to send, dropping %d traces to intake at %s: %s", + 0, + "http://localhost:8126/v0.5/traces", + "Invalid format: Unable to read payload len", + ) + ] + ) + else: + log.error.assert_has_calls( + [ + mock.call( + "failed to send traces to intake at %s: HTTP error status %s, reason %s", + "http://localhost:8126/v0.5/traces", + 400, + "Bad Request", + ) + ] + ) @skip_if_testagent @@ -481,22 +538,37 @@ def test_trace_with_invalid_payload_generates_error_log(): def test_trace_with_invalid_payload_logs_payload_when_LOG_ERROR_PAYLOADS(): import mock + from ddtrace import config from tests.integration.utils import send_invalid_payload_and_get_logs log = send_invalid_payload_and_get_logs() - log.error.assert_has_calls( - [ - mock.call( - "failed to send traces to intake at %s: HTTP error status %s, reason %s, payload %s", - "http://localhost:8126/v0.5/traces", - 400, - "Bad Request", - "6261645f7061796c6f6164", - ) - ] - ) + if config._trace_writer_native: + log.error.assert_has_calls( + [ + mock.call( + "failed to send, dropping %d traces to intake at %s: %s, payload %s", + 0, + "http://localhost:8126/v0.5/traces", + "Invalid format: Unable to read payload len", + "6261645f7061796c6f6164", + ) + ] + ) + else: + log.error.assert_has_calls( + [ + mock.call( + "failed to send traces to intake at %s: HTTP error status %s, reason %s, payload %s", + "http://localhost:8126/v0.5/traces", + 400, + "Bad Request", + "6261645f7061796c6f6164", + ) + ] + ) +@skip_if_native_writer 
@skip_if_testagent @pytest.mark.subprocess(env={"_DD_TRACE_WRITER_LOG_ERROR_PAYLOADS": "true", "DD_TRACE_API_VERSION": "v0.5"}, err=None) def test_trace_with_non_bytes_payload_logs_payload_when_LOG_ERROR_PAYLOADS(): @@ -561,6 +633,7 @@ def test_api_version_downgrade_generates_no_warning_logs(): @skip_if_testagent +@skip_if_native_writer @parametrize_with_all_encodings() def test_writer_flush_queue_generates_debug_log(): import logging @@ -568,14 +641,13 @@ def test_writer_flush_queue_generates_debug_log(): import mock - from ddtrace.internal.writer import AgentWriter - from ddtrace.settings._agent import config as agent_config + from ddtrace.internal.writer import create_trace_writer from tests.utils import AnyFloat from tests.utils import AnyInt from tests.utils import AnyStr encoding = os.environ["DD_TRACE_API_VERSION"] - writer = AgentWriter(agent_config.trace_agent_url) + writer = create_trace_writer() with mock.patch("ddtrace.internal.writer.writer.log") as log: writer.write([]) diff --git a/tests/integration/test_integration_snapshots.py b/tests/integration/test_integration_snapshots.py index fcc00d43289..2597698eed4 100644 --- a/tests/integration/test_integration_snapshots.py +++ b/tests/integration/test_integration_snapshots.py @@ -78,13 +78,21 @@ def process_trace(self, trace): # Have to use sync mode snapshot so that the traces are associated to this # test case since we use a custom writer (that doesn't have the trace headers # injected). -@pytest.mark.subprocess() +@pytest.mark.subprocess(parametrize={"writer_class": ["AgentWriter", "NativeWriter"]}) @snapshot(async_mode=False) -def test_synchronous_writer(): +def test_synchronous_writer(writer_class): + import os + from ddtrace.internal.writer import AgentWriter + from ddtrace.internal.writer import NativeWriter from ddtrace.trace import tracer - writer = AgentWriter(tracer._span_aggregator.writer.intake_url, sync_mode=True) + if os.environ["writer_class"] == "AgentWriter": + writer_class = AgentWriter + elif os.environ["writer_class"] == "NativeWriter": + writer_class = NativeWriter + + writer = writer_class(tracer._span_aggregator.writer.intake_url, sync_mode=True) tracer._span_aggregator.writer = writer tracer._recreate() with tracer.trace("operation1", service="my-svc"): diff --git a/tests/integration/test_priority_sampling.py b/tests/integration/test_priority_sampling.py index 812df760384..d80084b8d4b 100644 --- a/tests/integration/test_priority_sampling.py +++ b/tests/integration/test_priority_sampling.py @@ -2,11 +2,13 @@ import pytest +from ddtrace import config from ddtrace.constants import AUTO_KEEP from ddtrace.constants import AUTO_REJECT from ddtrace.internal.encoding import JSONEncoder from ddtrace.internal.encoding import MsgpackEncoderV04 as Encoder from ddtrace.internal.writer import AgentWriter +from ddtrace.internal.writer import NativeWriter from ddtrace.trace import tracer as ddtracer from tests.integration.utils import AGENT_VERSION from tests.integration.utils import parametrize_with_all_encodings @@ -29,7 +31,9 @@ def monkeypatched_write(self, spans=None): tracer._span_aggregator.writer.traces = [] tracer._span_aggregator.writer.json_encoder = JSONEncoder() tracer._span_aggregator.writer.msgpack_encoder = Encoder(4 << 20, 4 << 20) - tracer._span_aggregator.writer.write = monkeypatched_write.__get__(tracer._span_aggregator.writer, AgentWriter) + tracer._span_aggregator.writer.write = monkeypatched_write.__get__( + tracer._span_aggregator.writer, NativeWriter if config._trace_writer_native else 
AgentWriter + ) def _prime_tracer_with_priority_sample_rate_from_agent(t, service): diff --git a/tests/integration/test_trace_stats.py b/tests/integration/test_trace_stats.py index 1f38510c99c..830e60b5eea 100644 --- a/tests/integration/test_trace_stats.py +++ b/tests/integration/test_trace_stats.py @@ -9,6 +9,7 @@ from ddtrace.ext import http from ddtrace.internal.processor.stats import SpanStatsProcessorV06 from tests.integration.utils import AGENT_VERSION +from tests.integration.utils import skip_if_native_writer from tests.utils import DummyTracer from tests.utils import override_global_config @@ -48,19 +49,24 @@ def send_once_stats_tracer(stats_tracer): """ This is a variation on the tracer that has the SpanStatsProcessor disabled until we leave the tracer context. """ + stats_tracer.trace = functools.partial(consistent_end_trace, stats_tracer.trace) # Stop the stats processor while running the function, to prevent flushing + stats_processor = None for processor in stats_tracer._span_processors: if isinstance(processor, SpanStatsProcessorV06): - processor.stop() + stats_processor = processor + stats_processor.stop() break yield stats_tracer # Restart the stats processor; it will be flushed during shutdown - processor.start() + if stats_processor: + stats_processor.start() +@skip_if_native_writer @pytest.mark.parametrize("envvar", ["DD_TRACE_STATS_COMPUTATION_ENABLED", "DD_TRACE_COMPUTE_STATS"]) def test_compute_stats_default_and_configure(run_python_code_in_subprocess, envvar): """Ensure stats computation can be enabled.""" @@ -86,6 +92,7 @@ def test_compute_stats_default_and_configure(run_python_code_in_subprocess, envv assert status == 0, out + err +@skip_if_native_writer def test_apm_opt_out_compute_stats_and_configure_env(run_python_code_in_subprocess): # Test via environment variable env = os.environ.copy() diff --git a/tests/integration/utils.py b/tests/integration/utils.py index e993ddea69b..79d9f7ccc80 100644 --- a/tests/integration/utils.py +++ b/tests/integration/utils.py @@ -57,6 +57,12 @@ def skip_if_testagent(f): )(f) +def skip_if_native_writer(f): + from ddtrace import config + + return pytest.mark.skipif(config._trace_writer_native, reason="Test incompatible with the native writer")(f) + + def import_ddtrace_in_subprocess(env): p = subprocess.Popen( [sys.executable, "-c", "import ddtrace"], diff --git a/tests/suitespec.yml b/tests/suitespec.yml index ff1ed071123..f7a383f2cc9 100644 --- a/tests/suitespec.yml +++ b/tests/suitespec.yml @@ -177,7 +177,7 @@ suites: pattern: integration-latest* runner: riot integration_testagent: - parallelism: 1 + parallelism: 2 paths: - '@tracing' - '@bootstrap' diff --git a/tests/telemetry/test_writer.py b/tests/telemetry/test_writer.py index 5e289c457c7..b6c804674f5 100644 --- a/tests/telemetry/test_writer.py +++ b/tests/telemetry/test_writer.py @@ -533,6 +533,7 @@ def test_app_started_event_configuration_override(test_agent_session, run_python {"name": "_DD_APPSEC_DEDUPLICATION_ENABLED", "origin": "default", "value": True}, {"name": "_DD_IAST_LAZY_TAINT", "origin": "default", "value": False}, {"name": "_DD_TRACE_WRITER_LOG_ERROR_PAYLOADS", "origin": "default", "value": False}, + {"name": "_DD_TRACE_WRITER_NATIVE", "origin": "default", "value": False}, {"name": "instrumentation_source", "origin": "code", "value": "manual"}, {"name": "python_build_gnu_type", "origin": "unknown", "value": sysconfig.get_config_var("BUILD_GNU_TYPE")}, {"name": "python_host_gnu_type", "origin": "unknown", "value": sysconfig.get_config_var("HOST_GNU_TYPE")}, 
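Reviewer aside: a minimal sketch of how the new `TraceExporterBuilder`/`TraceExporter` classes from `src/native/data_pipeline/mod.rs` can be driven from Python. The import path mirrors the `ddtrace.internal.native._native` imports used in `tests/tracer/test_writer.py` below, but whether the data-pipeline classes are exposed at exactly that path is an assumption, and the payload is a hypothetical stand-in for real encoder output.

```python
# Hedged sketch, not shipped code: drive the PyO3 trace exporter directly.
# Import path is assumed from the test imports below; the payload is a
# hypothetical msgpack body (0x90 == empty array), not real encoder output.
from ddtrace.internal.native._native import TraceExporterBuilder

builder = (
    TraceExporterBuilder()
    .set_url("http://localhost:8126")
    .set_language("python")
    .set_input_format("v0.4")  # only "v0.4"/"v0.5" are accepted, else ValueError
    .set_output_format("v0.4")
)
exporter = builder.build()  # consumes the builder; a second build() raises ValueError

try:
    body = exporter.send(b"\x90", 0)  # bytes payload + trace count; GIL released while sending
finally:
    exporter.shutdown(1_000_000_000)  # timeout in nanoseconds; consumes the exporter
```

The consume-on-`build`/consume-on-`shutdown` pattern is why both wrappers hold an `Option` internally: a PyO3 class can't move out of `&mut self`, so `take()` plus an explicit "already been consumed" error stands in for Rust move semantics.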
diff --git a/tests/tracer/test_processors.py b/tests/tracer/test_processors.py index f302acb2155..4dd7aca56f2 100644 --- a/tests/tracer/test_processors.py +++ b/tests/tracer/test_processors.py @@ -25,6 +25,7 @@ from ddtrace.internal.sampling import SpanSamplingRule from ddtrace.internal.telemetry.constants import TELEMETRY_NAMESPACE from ddtrace.internal.writer import AgentWriter +from ddtrace.internal.writer import NativeWriter from ddtrace.trace import Context from ddtrace.trace import Span from tests.utils import DummyTracer @@ -138,7 +139,8 @@ def test_aggregator_reset_default_args(): assert len(aggr._span_metrics["spans_created"]) == 0 -def test_aggregator_reset_with_args(): +@pytest.mark.parametrize("writer_class", (AgentWriter, NativeWriter)) +def test_aggregator_reset_with_args(writer_class): """ Validates that the span aggregator can reset trace buffers, sampling processor, user processors/filters and trace api version (when ASM is enabled) @@ -153,7 +155,7 @@ def test_aggregator_reset_with_args(): user_processors=[user_proc], ) - aggr.writer = AgentWriter("", api_version="v0.5") + aggr.writer = writer_class("http://localhost:8126", api_version="v0.5") span = Span("span", on_finish=[aggr.on_span_finish]) aggr.on_span_start(span) diff --git a/tests/tracer/test_writer.py b/tests/tracer/test_writer.py index 9bf17cea4ca..41851afda87 100644 --- a/tests/tracer/test_writer.py +++ b/tests/tracer/test_writer.py @@ -18,10 +18,13 @@ from ddtrace.constants import _KEEP_SPANS_RATE_KEY from ddtrace.internal.ci_visibility.writer import CIVisibilityWriter from ddtrace.internal.encoding import MSGPACK_ENCODERS +from ddtrace.internal.native._native import IoError +from ddtrace.internal.native._native import NetworkError from ddtrace.internal.runtime import get_runtime_id from ddtrace.internal.uds import UDSHTTPConnection from ddtrace.internal.writer import AgentWriter from ddtrace.internal.writer import LogWriter +from ddtrace.internal.writer import NativeWriter from ddtrace.internal.writer import Response from ddtrace.internal.writer import _human_size from ddtrace.trace import Span @@ -461,6 +464,182 @@ def test_keep_rate(self): assert 0.6 == trace[0]["metrics"].get(_KEEP_SPANS_RATE_KEY, -1) +class NativeWriterTests(AgentWriterTests): + WRITER_CLASS = NativeWriter + + def test_metrics_bad_endpoint(self): + statsd = mock.Mock() + with override_global_config(dict(_health_metrics_enabled=True)): + writer = self.WRITER_CLASS("http://asdf:1234", dogstatsd=statsd, sync_mode=False) + for i in range(10): + writer.write([Span(name="name", trace_id=i, span_id=j + 1, parent_id=j or None) for j in range(5)]) + writer.stop() + writer.join() + + statsd.distribution.assert_has_calls( + [mock.call("datadog.%s.buffer.accepted.traces" % writer.STATSD_NAMESPACE, 1, tags=None)] * 10 + + [mock.call("datadog.%s.buffer.accepted.spans" % writer.STATSD_NAMESPACE, 5, tags=None)] * 10, + any_order=True, + ) + + def test_metrics_trace_too_big(self): + statsd = mock.Mock() + with override_global_config(dict(_health_metrics_enabled=True, _trace_writer_buffer_size=15000)): + writer = self.WRITER_CLASS("http://asdf:1234", dogstatsd=statsd) + for i in range(10): + writer.write([Span(name="name", trace_id=i, span_id=j + 1, parent_id=j or None) for j in range(5)]) + + massive_trace = [] + for i in range(10): + span = Span("mmon", "mmon" + str(i), "mmon" + str(i)) + for j in range(50): + key = "opqr012|~" + str(i) + str(j) + val = "stuv345!@#" + str(i) + str(j) + span.set_tag_str(key, val) + massive_trace.append(span) + + 
writer.write(massive_trace) + writer.stop() + writer.join() + + statsd.distribution.assert_has_calls( + [mock.call("datadog.%s.buffer.accepted.traces" % writer.STATSD_NAMESPACE, 1, tags=None)] * 10 + + [mock.call("datadog.%s.buffer.accepted.spans" % writer.STATSD_NAMESPACE, 5, tags=None)] * 10 + + [mock.call("datadog.%s.buffer.dropped.traces" % writer.STATSD_NAMESPACE, 1, tags=["reason:t_too_big"])] + + [ + mock.call( + "datadog.%s.buffer.dropped.bytes" % writer.STATSD_NAMESPACE, AnyInt(), tags=["reason:t_too_big"] + ) + ], + any_order=True, + ) + + def test_metrics_multi(self): + statsd = mock.Mock() + with override_global_config(dict(_health_metrics_enabled=True)): + writer = self.WRITER_CLASS("http://asdf:1234", dogstatsd=statsd, sync_mode=False) + for i in range(10): + writer.write([Span(name="name", trace_id=i, span_id=j + 1, parent_id=j) for j in range(5)]) + writer.flush_queue() + statsd.distribution.assert_has_calls( + [mock.call("datadog.%s.buffer.accepted.traces" % writer.STATSD_NAMESPACE, 1, tags=None)] * 10 + + [mock.call("datadog.%s.buffer.accepted.spans" % writer.STATSD_NAMESPACE, 5, tags=None)] * 10, + any_order=True, + ) + + statsd.reset_mock() + + for i in range(10): + writer.write([Span(name="name", trace_id=i, span_id=j + 1, parent_id=j) for j in range(5)]) + writer.stop() + writer.join() + + statsd.distribution.assert_has_calls( + [mock.call("datadog.%s.buffer.accepted.traces" % writer.STATSD_NAMESPACE, 1, tags=None)] * 10 + + [mock.call("datadog.%s.buffer.accepted.spans" % writer.STATSD_NAMESPACE, 5, tags=None)] * 10, + any_order=True, + ) + + def test_write_sync(self): + statsd = mock.Mock() + with override_global_config(dict(_health_metrics_enabled=True)): + writer = self.WRITER_CLASS("http://asdf:1234", dogstatsd=statsd, sync_mode=True) + writer.write([Span(name="name", trace_id=1, span_id=j + 1, parent_id=j or None) for j in range(5)]) + statsd.distribution.assert_has_calls( + [ + mock.call("datadog.%s.buffer.accepted.traces" % writer.STATSD_NAMESPACE, 1, tags=None), + mock.call("datadog.%s.buffer.accepted.spans" % writer.STATSD_NAMESPACE, 5, tags=None), + ], + any_order=True, + ) + + def test_keep_rate(self): + statsd = mock.Mock() + writer_run_periodic = mock.Mock() + writer_exporter = mock.Mock() + writer_exporter_send = mock.Mock() + with override_global_config(dict(_health_metrics_enabled=False, _trace_writer_buffer_size=8 << 20)): + # this test decodes the msgpack payload to verify the keep rate. v04 is easier to decode so we use that here + writer = self.WRITER_CLASS("http://asdf:1234", dogstatsd=statsd, api_version="v0.4") + writer.run_periodic = writer_run_periodic + writer._exporter = writer_exporter + writer._exporter.send = writer_exporter_send + + traces = [ + [Span(name="name", trace_id=i, span_id=j + 1, parent_id=j) for j in range(5)] for i in range(1, 5) + ] + + traces_too_big = [ + [Span(name="a" * 5000, trace_id=i, span_id=j + 1, parent_id=j) for j in range(1, 2**10)] + for i in range(4) + ] + + # 1. We write 4 traces successfully. + for trace in traces: + writer.write(trace) + writer.flush_queue() + + payload = msgpack.unpackb(writer_exporter_send.call_args.args[0]) + # No previous drops. + assert 0.0 == writer._drop_sma.get() + # 4 traces written. + assert 4 == len(payload) + # 100% of traces kept (refers to the past). + # No traces sent before now so 100% kept. + for trace in payload: + assert 1.0 == trace[0]["metrics"].get(_KEEP_SPANS_RATE_KEY, -1) + + # 2. We fail to write 4 traces because of size limitation. 
+ for trace in traces_too_big: + writer.write(trace) + writer.flush_queue() + + # 50% of traces were dropped historically. + # 4 successfully written before and 4 dropped now. + assert 0.5 == writer._drop_sma.get() + # put not called since no new traces are available. + writer_exporter_send.assert_called_once() + + # 3. We write 2 traces successfully. + for trace in traces[:2]: + writer.write(trace) + writer.flush_queue() + + payload = msgpack.unpackb(writer_exporter_send.call_args.args[0]) + # 40% of traces were dropped historically. + assert 0.4 == writer._drop_sma.get() + # 2 traces written. + assert 2 == len(payload) + # 50% of traces kept (refers to the past). + # We had 4 successfully written and 4 dropped. + for trace in payload: + assert 0.5 == trace[0]["metrics"].get(_KEEP_SPANS_RATE_KEY, -1) + + # 4. We write 1 trace successfully and fail to write 3. + writer.write(traces[0]) + for trace in traces_too_big[:3]: + writer.write(trace) + writer.flush_queue() + + payload = msgpack.unpackb(writer_exporter_send.call_args.args[0]) + # 50% of traces were dropped historically. + assert 0.5 == writer._drop_sma.get() + # 1 trace written. + assert 1 == len(payload) + # 60% of traces kept (refers to the past). + # We had 4 successfully written, then 4 dropped, then 2 written. + for trace in payload: + assert 0.6 == trace[0]["metrics"].get(_KEEP_SPANS_RATE_KEY, -1) + + # Http related metrics are sent by the native code + def test_drop_reason_bad_endpoint(self): + pytest.skip() + + # The NativeWriter does not support gzip compression + def test_gzip_compression_exception_logging_and_metrics(self): + pytest.skip() + + class CIVisibilityWriterTests(AgentWriterTests): WRITER_CLASS = CIVisibilityWriter @@ -548,17 +727,29 @@ def do_PUT(self): assert self.path.startswith(self.expected_path_prefix) self.send_error(200, "OK") + def do_POST(self): + if self.expected_path_prefix is not None: + assert self.path.startswith(self.expected_path_prefix) + self.send_error(200, "OK") + class _TimeoutAPIEndpointRequestHandlerTest(_BaseHTTPRequestHandler): def do_PUT(self): # This server sleeps longer than our timeout time.sleep(5) + def do_POST(self): + # This server sleeps longer than our timeout + time.sleep(5) + class _ResetAPIEndpointRequestHandlerTest(_BaseHTTPRequestHandler): def do_PUT(self): return + def do_POST(self): + return + _HOST = "0.0.0.0" _PORT = 8743 @@ -653,7 +844,9 @@ def configure(expected_path_prefix=None): thread.join() -@pytest.mark.parametrize("writer_and_path", ((AgentWriter, "/v0."), (CIVisibilityWriter, "/api/v2/citestcycle"))) +@pytest.mark.parametrize( + "writer_and_path", ((AgentWriter, "/v0."), (NativeWriter, "/v0."), (CIVisibilityWriter, "/api/v2/citestcycle")) +) def test_agent_url_path(endpoint_assert_path, writer_and_path): with override_env(dict(DD_API_KEY="foobar.baz")): writer_class, path = writer_and_path @@ -675,45 +868,45 @@ def test_agent_url_path(endpoint_assert_path, writer_and_path): writer.flush_queue(raise_exc=True) -@pytest.mark.parametrize("writer_class", (AgentWriter, CIVisibilityWriter)) +@pytest.mark.parametrize("writer_class", (AgentWriter, CIVisibilityWriter, NativeWriter)) def test_flush_connection_timeout_connect(writer_class): with override_env(dict(DD_API_KEY="foobar.baz")): writer = writer_class("http://%s:%s" % (_HOST, 2019)) - exc_type = OSError + exc_type = (OSError, NetworkError) with pytest.raises(exc_type): writer._encoder.put([Span("foobar")]) writer.flush_queue(raise_exc=True) -@pytest.mark.parametrize("writer_class", (AgentWriter, 
CIVisibilityWriter)) +@pytest.mark.parametrize("writer_class", (AgentWriter, CIVisibilityWriter, NativeWriter)) def test_flush_connection_timeout(endpoint_test_timeout_server, writer_class): with override_env(dict(DD_API_KEY="foobar.baz")): writer = writer_class("http://%s:%s" % (_HOST, _TIMEOUT_PORT)) writer.HTTP_METHOD = "PUT" # the test server only accepts PUT - with pytest.raises(socket.timeout): + with pytest.raises((socket.timeout, IoError)): writer._encoder.put([Span("foobar")]) writer.flush_queue(raise_exc=True) -@pytest.mark.parametrize("writer_class", (AgentWriter, CIVisibilityWriter)) +@pytest.mark.parametrize("writer_class", (AgentWriter, CIVisibilityWriter, NativeWriter)) def test_flush_connection_reset(endpoint_test_reset_server, writer_class): with override_env(dict(DD_API_KEY="foobar.baz")): writer = writer_class("http://%s:%s" % (_HOST, _RESET_PORT)) - exc_types = (httplib.BadStatusLine, ConnectionResetError) + exc_types = (httplib.BadStatusLine, ConnectionResetError, NetworkError) with pytest.raises(exc_types): writer.HTTP_METHOD = "PUT" # the test server only accepts PUT writer._encoder.put([Span("foobar")]) writer.flush_queue(raise_exc=True) -@pytest.mark.parametrize("writer_class", (AgentWriter,)) +@pytest.mark.parametrize("writer_class", (AgentWriter, NativeWriter)) def test_flush_connection_uds(endpoint_uds_server, writer_class): writer = writer_class("unix://%s" % endpoint_uds_server.server_address) writer._encoder.put([Span("foobar")]) writer.flush_queue(raise_exc=True) -@pytest.mark.parametrize("writer_class", (AgentWriter, CIVisibilityWriter)) +@pytest.mark.parametrize("writer_class", (AgentWriter, CIVisibilityWriter, NativeWriter)) def test_flush_queue_raise(writer_class): with override_env(dict(DD_API_KEY="foobar.baz")): writer = writer_class("http://dne:1234") @@ -722,13 +915,13 @@ def test_flush_queue_raise(writer_class): writer.write([]) writer.flush_queue(raise_exc=False) - error = OSError + error = (OSError, NetworkError) with pytest.raises(error): writer.write([Span("name")]) writer.flush_queue(raise_exc=True) -@pytest.mark.parametrize("writer_class", (AgentWriter,)) +@pytest.mark.parametrize("writer_class", (AgentWriter, NativeWriter)) def test_racing_start(writer_class): writer = writer_class("http://dne:1234") @@ -764,13 +957,14 @@ def test_additional_headers_constructor(): assert writer._headers["header2"] == "value2" -@pytest.mark.parametrize("writer_class", (AgentWriter,)) +@pytest.mark.parametrize("writer_class", (AgentWriter, NativeWriter)) def test_bad_encoding(monkeypatch, writer_class): with override_global_config({"_trace_api": "foo"}): writer = writer_class("http://localhost:9126") assert writer._api_version == "v0.5" +@pytest.mark.parametrize("writer_class", (AgentWriter, NativeWriter)) @pytest.mark.parametrize( "init_api_version,api_version,endpoint,encoder_cls", [ @@ -779,8 +973,8 @@ def test_bad_encoding(monkeypatch, writer_class): ("v0.5", "v0.5", "v0.5/traces", MSGPACK_ENCODERS["v0.5"]), ], ) -def test_writer_recreate_api_version(init_api_version, api_version, endpoint, encoder_cls): - writer = AgentWriter("http://dne:1234", api_version=init_api_version) +def test_writer_recreate_api_version(writer_class, init_api_version, api_version, endpoint, encoder_cls): + writer = writer_class("http://dne:1234", api_version=init_api_version) assert writer._api_version == api_version assert writer._endpoint == endpoint assert isinstance(writer._encoder, encoder_cls) @@ -801,14 +995,23 @@ def test_writer_recreate_keeps_headers(): assert 
writer._headers["Datadog-Client-Computed-Stats"] == "yes" -def test_writer_recreate_keeps_response_callback(): +def test_native_writer_recreate_keeps_stats_opt_out(): + writer = NativeWriter("http://dne:1234", stats_opt_out=True) + assert writer._stats_opt_out + + writer = writer.recreate() + assert writer._stats_opt_out + + +@pytest.mark.parametrize("writer_class", (AgentWriter, NativeWriter)) +def test_writer_recreate_keeps_response_callback(writer_class): def response_callback(response): pass - writer = AgentWriter("http://dne:1234", response_callback=response_callback) + writer = writer_class("http://dne:1234", response_callback=response_callback) assert writer._response_cb is response_callback writer = writer.recreate() - assert isinstance(writer, AgentWriter) + assert isinstance(writer, writer_class) assert writer._response_cb is response_callback @@ -853,7 +1056,7 @@ def response_callback(response): ("darwin", None, "v0.5", False, "v0.5"), ], ) -@pytest.mark.parametrize("writer_class", (AgentWriter,)) +@pytest.mark.parametrize("writer_class", (AgentWriter, NativeWriter)) def test_writer_api_version_selection( sys_platform, api_version, diff --git a/tests/utils.py b/tests/utils.py index ef67b253b22..6dc6f11a3ac 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -42,6 +42,8 @@ from ddtrace.internal.utils.formats import asbool from ddtrace.internal.utils.formats import parse_tags_str from ddtrace.internal.writer import AgentWriter +from ddtrace.internal.writer import AgentWriterInterface +from ddtrace.internal.writer import NativeWriter from ddtrace.propagation._database_monitoring import listen as dbm_config_listen from ddtrace.propagation._database_monitoring import unlisten as dbm_config_unlisten from ddtrace.propagation.http import _DatadogMultiHeader @@ -572,7 +574,7 @@ def pop_traces(self): return traces -class DummyWriter(DummyWriterMixin, AgentWriter): +class DummyWriter(DummyWriterMixin, AgentWriterInterface): """DummyWriter is a small fake writer used for tests. 
not thread-safe.""" def __init__(self, *args, **kwargs): @@ -587,7 +589,15 @@ def __init__(self, *args, **kwargs): # DEV: We don't want to do anything with the response callback # so we set it to a no-op lambda function kwargs["response_callback"] = lambda *args, **kwargs: None - AgentWriter.__init__(self, *args, **kwargs) + if dd_config._trace_writer_native: + kwargs["compute_stats_enabled"] = dd_config._trace_compute_stats + kwargs["stats_opt_out"] = asm_config._apm_opt_out + self._inner_writer = NativeWriter(*args, **kwargs) + else: + if dd_config._trace_compute_stats or asm_config._apm_opt_out: + kwargs["headers"] = {"Datadog-Client-Computed-Stats": "yes"} + self._inner_writer = AgentWriter(*args, **kwargs) + DummyWriterMixin.__init__(self, *args, **kwargs) def write(self, spans=None): @@ -596,7 +606,7 @@ def write(self, spans=None): traces = [spans] self.json_encoder.encode_traces(traces) if self._trace_flush_enabled: - AgentWriter.write(self, spans=spans) + self._inner_writer.write(spans=spans) else: self.msgpack_encoder.put(spans) self.msgpack_encoder.encode() @@ -610,6 +620,53 @@ def pop(self): def recreate(self, appsec_enabled: Optional[bool] = None) -> "DummyWriter": return self.__class__(trace_flush_enabled=self._trace_flush_enabled) + def flush_queue(self, raise_exc: bool = False) -> None: + return self._inner_writer.flush_queue(raise_exc) + + def before_fork(self) -> None: + return self._inner_writer.before_fork() + + def set_test_session_token(self, token: Optional[str]) -> None: + return self._inner_writer.set_test_session_token(token) + + def stop(self, timeout: Optional[float] = None) -> None: + self._inner_writer.stop(timeout=timeout) + + @property + def interval(self) -> float: + return self._inner_writer._interval + + @interval.setter + def interval( + self, + value: float, + ) -> None: + self._inner_writer.interval = value + + def _start_service(self, *args, **kwargs) -> None: + self._inner_writer._start_service(*args, **kwargs) + + def join( + self, + timeout: Optional[float], + ) -> None: + self._inner_writer.join(timeout=timeout) + + def periodic(self): + self._inner_writer.periodic() + + def _stop_service( + self, + timeout: Optional[float] = None, + ) -> None: + self._inner_writer._stop_service(timeout=timeout) + + def on_shutdown(self): + self._inner_writer.on_shutdown() + + def __getattr__(self, name: str): + return self._inner_writer.__getattribute__(name) + class DummyCIVisibilityWriter(DummyWriterMixin, CIVisibilityWriter): def __init__(self, *args, **kwargs): @@ -1170,7 +1227,10 @@ def snapshot_context( if async_mode: # Patch the tracer writer to include the test token header for all requests. - tracer._span_aggregator.writer._headers["X-Datadog-Test-Session-Token"] = token + if isinstance(tracer._span_aggregator.writer, AgentWriterInterface): + tracer._span_aggregator.writer.set_test_session_token(token) + else: + tracer._span_aggregator.writer._headers["X-Datadog-Test-Session-Token"] = token # Also add a header to the environment for subprocesses test cases that might use snapshotting. existing_headers = parse_tags_str(os.environ.get("_DD_TRACE_WRITER_ADDITIONAL_HEADERS", "")) @@ -1210,7 +1270,10 @@ def snapshot_context( # Force a flush so all traces are submitted. 
tracer._span_aggregator.writer.flush_queue() if async_mode: - del tracer._span_aggregator.writer._headers["X-Datadog-Test-Session-Token"] + if isinstance(tracer._span_aggregator.writer, AgentWriterInterface): + tracer._span_aggregator.writer.set_test_session_token(None) + else: + del tracer._span_aggregator.writer._headers["X-Datadog-Test-Session-Token"] del os.environ["_DD_TRACE_WRITER_ADDITIONAL_HEADERS"] conn = httplib.HTTPConnection(parsed.hostname, parsed.port) @@ -1423,13 +1486,10 @@ def flush_test_tracer_spans(writer): [(encoded_traces, _)] = encoded_traces if encoded_traces is None: return - headers = writer._get_finalized_headers(n_traces, client) - response = writer._put(encoded_traces, add_dd_env_variables_to_headers(headers), client, no_trace=True) + writer._send_payload(encoded_traces, n_traces, client) except Exception: return - assert response.status == 200, response.body - def add_dd_env_variables_to_headers(headers): dd_env_vars = {key: value for key, value in os.environ.items() if key.startswith("DD_")}
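A closing note on the exception mapping in `src/native/data_pipeline/exceptions.rs`: because PyO3 can't attach extra fields to a `create_exception!` type, the HTTP status only survives inside the `RequestError` message, formatted as `Error code: {status}, Response: {msg}`. Below is a hedged sketch of how a consumer such as the NativeWriter could recover the status; the helper and regex are illustrative, not the writer's actual parsing code.

```python
# Hedged sketch: recover the HTTP status embedded in a RequestError message.
# The "Error code: ..., Response: ..." format comes from exceptions.rs; this
# parsing helper is illustrative only.
import re
from typing import Optional

from ddtrace.internal.native._native import RequestError


def status_from_request_error(exc: RequestError) -> Optional[int]:
    match = re.match(r"Error code: (\d+), Response: ", str(exc))
    return int(match.group(1)) if match else None
```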