Skip to content

feat(duckdb): adding duckdb integration #14232

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions benchmarks/duckdb/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
baseline: &baseline
tracer_enabled: false
query_type: "simple"
dataset_size: 1000

# Simple query benchmarks
simple-baseline:
<<: *baseline
query_type: "simple"

simple-traced:
<<: *baseline
tracer_enabled: true
query_type: "simple"

# Bulk insert benchmarks
bulk-insert-baseline:
<<: *baseline
query_type: "bulk_insert"
dataset_size: 1000

bulk-insert-traced:
<<: *baseline
tracer_enabled: true
query_type: "bulk_insert"
dataset_size: 1000

# Analytical query benchmarks
analytical-small-baseline:
<<: *baseline
query_type: "analytical"
dataset_size: 5000

analytical-small-traced:
<<: *baseline
tracer_enabled: true
query_type: "analytical"
dataset_size: 5000

analytical-large-baseline:
<<: *baseline
query_type: "analytical"
dataset_size: 25000

analytical-large-traced:
<<: *baseline
tracer_enabled: true
query_type: "analytical"
dataset_size: 25000

# Complex JOIN benchmarks
join-baseline:
<<: *baseline
query_type: "complex_join"
dataset_size: 10000

join-traced:
<<: *baseline
tracer_enabled: true
query_type: "complex_join"
dataset_size: 10000

2 changes: 2 additions & 0 deletions benchmarks/duckdb/requirements_scenario.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
duckdb>=1.3.0

114 changes: 114 additions & 0 deletions benchmarks/duckdb/scenario.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
import bm
import duckdb


class DuckDBScenario(bm.Scenario):
tracer_enabled: bool
query_type: str
dataset_size: int

def run(self):
if self.tracer_enabled:
from ddtrace import patch
patch(duckdb=True)

# Create connection and setup data based on scenario
conn = duckdb.connect(":memory:")
cursor = conn.cursor()

if self.query_type == "simple":
# Simple query benchmark
def _(loops):
for _ in range(loops):
cursor.execute("SELECT ? as value", (42,))
rows = cursor.fetchall()
assert len(rows) == 1

elif self.query_type == "bulk_insert":
# Bulk insert benchmark
cursor.execute("CREATE TABLE bulk_test (id INTEGER, value VARCHAR, timestamp TIMESTAMP)")
test_data = [(i, f"value_{i}", "2023-01-01 00:00:00") for i in range(self.dataset_size)]

def _(loops):
for _ in range(loops):
cursor.executemany("INSERT INTO bulk_test VALUES (?, ?, ?)", test_data)
# Clean up for next iteration
cursor.execute("DELETE FROM bulk_test")

elif self.query_type == "analytical":
# Analytical query benchmark
cursor.execute(f"""
CREATE TABLE analytics_test AS
SELECT
i as id,
i % 10 as category,
i * 1.5 as value,
date '2023-01-01' + (i % 365)::INTEGER as date_key
FROM range({self.dataset_size}) t(i)
""")

analytical_query = """
SELECT
category,
COUNT(*) as count,
AVG(value) as avg_value,
SUM(value) as total_value
FROM analytics_test
WHERE value > 100
GROUP BY category
ORDER BY category
"""

def _(loops):
for _ in range(loops):
cursor.execute(analytical_query)
rows = cursor.fetchall()
assert len(rows) > 0

elif self.query_type == "complex_join":
# Complex JOIN benchmark
cursor.execute(f"""
CREATE TABLE customers AS
SELECT
i as customer_id,
'Customer_' || i as name,
('Region_' || (i % 10)) as region
FROM range(1, {self.dataset_size // 10 + 1}) t(i)
""")

cursor.execute(f"""
CREATE TABLE orders AS
SELECT
i as order_id,
(i % {self.dataset_size // 10}) + 1 as customer_id,
(random() * 1000)::INTEGER as amount,
date '2023-01-01' + (i % 365)::INTEGER as order_date
FROM range({self.dataset_size}) t(i)
""")

join_query = """
SELECT
c.region,
COUNT(o.order_id) as order_count,
SUM(o.amount) as total_amount,
AVG(o.amount) as avg_amount
FROM customers c
LEFT JOIN orders o ON c.customer_id = o.customer_id
GROUP BY c.region
ORDER BY total_amount DESC
"""

def _(loops):
for _ in range(loops):
cursor.execute(join_query)
rows = cursor.fetchall()
assert len(rows) > 0

else:
raise ValueError(f"Unknown query_type: {self.query_type}")

yield _

# Cleanup
conn.close()

1 change: 1 addition & 0 deletions ddtrace/_monkey.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
"consul": True,
"ddtrace_api": True,
"django": True,
"duckdb": True,
"dramatiq": True,
"elasticsearch": True,
"algoliasearch": True,
Expand Down
10 changes: 10 additions & 0 deletions ddtrace/contrib/integration_registry/registry.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,16 @@ integrations:
min: 1.10.0
max: 1.17.0

- integration_name: duckdb
is_external_package: true
is_tested: true
dependency_names:
- duckdb
tested_versions_by_dependency:
duckdb:
min: 1.3.0
max: 1.3.2

- integration_name: elasticsearch
is_external_package: true
is_tested: true
Expand Down
66 changes: 66 additions & 0 deletions ddtrace/contrib/internal/duckdb/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
"""
The DuckDB integration instruments the DuckDB library.


Enabling
~~~~~~~~

The DuckDB integration is enabled automatically when using
:ref:`ddtrace-run <ddtracerun>` or :ref:`import ddtrace.auto <ddtraceauto>`.

Or use :func:`patch() <ddtrace.patch>` to manually enable the integration::

from ddtrace import patch
patch(duckdb=True)


Global Configuration
~~~~~~~~~~~~~~~~~~~~

.. py:data:: ddtrace.config.duckdb["service"]

The service name reported by default for DuckDB instances.

This option can also be set with the ``DD_DUCKDB_SERVICE`` environment
variable.

Default: ``"duckdb"``


Instance Configuration
~~~~~~~~~~~~~~~~~~~~~~

To configure the DuckDB integration on an per-instance basis use the
``Pin`` API::

from ddtrace.trace import Pin
from ddtrace import patch

# Make sure to patch before importing duckdb
patch(duckdb=True)

import duckdb

# This will report a span with the default settings
conn = duckdb.connect(database=":memory:")

# Use a pin to override the service name for this connection.
Pin.override(conn, service="duckdb")

# Insert data into the database
conn.execute("CREATE TABLE test (id INTEGER, name VARCHAR)")
conn.execute("INSERT INTO test VALUES (1, 'Alice')")
conn.execute("INSERT INTO test VALUES (2, 'Bob')")

# Query the data
cursor = conn.cursor()
cursor.execute("SELECT *")
print(cursor.fetchall())
"""

from .patch import get_version
from .patch import patch
from .patch import unpatch


__all__ = ["patch", "unpatch", "get_version"]
73 changes: 73 additions & 0 deletions ddtrace/contrib/internal/duckdb/patch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import os
from typing import Dict

import duckdb
import wrapt

from ddtrace import config
from ddtrace.contrib.dbapi import FetchTracedCursor
from ddtrace.contrib.dbapi import TracedConnection
from ddtrace.contrib.dbapi import TracedCursor
from ddtrace.ext import db
from ddtrace.ext import net
from ddtrace.internal.schema import schematize_service_name
from ddtrace.internal.utils.formats import asbool
from ddtrace.internal.utils.wrappers import unwrap
from ddtrace.trace import Pin

config._add(
"duckdb",
dict(
trace_fetch_methods=asbool(os.getenv("DD_DUCKDB_TRACE_FETCH_METHODS", default=False)),
_default_service=schematize_service_name("duckdb"),
_dbapi_span_name_prefix="duckdb",
),
)


def _supported_versions() -> dict[str, str]:
return {"duckdb": ">=1.3.0"}


def get_version():
# get the package distribution version here
return duckdb.__version__


def patch():
if getattr(duckdb, "_datadog_patch", False):
return
duckdb._datadog_patch = True
wrapt.wrap_function_wrapper("duckdb", "connect", _connect)


def unpatch():
if getattr(duckdb, "_datadog_patch", False):
duckdb._datadog_patch = False
unwrap(duckdb, "connect")


def _connect(func, instance, args, kwargs):
conn = func(*args, **kwargs)

# DuckDB connection parameters
# For DuckDB, the first argument is typically the database path
# or ":memory:" for in-memory databases
database_path = args[0] if args else kwargs.get("database", ":memory:")

# DuckDB is typically used as an embedded database
# so we don't have traditional host/port like other databases
# but we can still set some meaningful tags
tags = {
db.NAME: database_path,
db.SYSTEM: "duckdb",
}

pin = Pin(tags=tags)

# Choose cursor class based on trace_fetch_methods configuration
cursor_cls = FetchTracedCursor if config.duckdb.trace_fetch_methods else TracedCursor

wrapped = TracedConnection(conn, pin=pin, cfg=config.duckdb, cursor_cls=cursor_cls)
pin.onto(wrapped)
return wrapped
1 change: 1 addition & 0 deletions ddtrace/settings/_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@
"aiomysql",
"pyramid",
"dbapi2",
"duckdb",
"vertexai",
"cherrypy",
"flask_cache",
Expand Down
7 changes: 7 additions & 0 deletions docs/integrations.rst
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,13 @@ Dramatiq
.. automodule:: ddtrace.contrib.internal.dramatiq


.. _duckdb:

DuckDB
^^^^^^
.. automodule:: ddtrace.contrib.internal.duckdb


.. _elasticsearch:

Elasticsearch
Expand Down
4 changes: 4 additions & 0 deletions releasenotes/notes/duckdb-integration-36fa2ff7c771883c.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
---
features:
- |
Add DuckDB integration support with comprehensive tracing capabilities including query execution monitoring, performance metrics, and error handling for DuckDB ≥ 0.8.0.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

0.8.0 doesn't match with the supported versions in patch.py

10 changes: 10 additions & 0 deletions riotfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -1029,6 +1029,16 @@ def select_pys(min_version: str = MIN_PYTHON_VERSION, max_version: str = MAX_PYT
),
],
),
Venv(
name="duckdb",
command="pytest {cmdargs} tests/contrib/duckdb",
venvs=[
Venv(
pys=select_pys(min_version="3.9", max_version="3.13"),
pkgs={"duckdb": latest, "pytest": latest},
),
],
),
Venv(
name="elasticsearch",
command="pytest {cmdargs} tests/contrib/elasticsearch/test_elasticsearch.py",
Expand Down
Loading
Loading