feat: make dlt resources first class citizens in cognee ingestion #2237

Draft
siillee wants to merge 8 commits into dev from feature/cog-4042-make-dlt-resources-first-class-citizens-in-cognee-ingestion


@siillee siillee commented Feb 25, 2026

Description

Adds an option to ingest dlt sources and resources through the regular cognee pipeline. It currently works only with a Postgres destination and only takes schemas into account.

Acceptance Criteria

Type of Change

  • Bug fix (non-breaking change that fixes an issue)
  • New feature (non-breaking change that adds functionality)
  • Code refactoring
  • Other (please specify):

Screenshots

Pre-submission Checklist

  • I have tested my changes thoroughly before submitting this PR (See CONTRIBUTING.md)
  • This PR contains minimal changes necessary to address the issue/feature
  • My code follows the project's coding standards and style guidelines
  • I have added tests that prove my fix is effective or that my feature works
  • I have added necessary documentation (if applicable)
  • All new and existing tests pass
  • I have searched existing PRs to ensure this change hasn't been submitted already
  • I have linked any relevant issues in the description
  • My commits have clear and descriptive messages

DCO Affirmation

I affirm that all code in every commit of this pull request conforms to the terms of the Topoteretes Developer Certificate of Origin.

@siillee siillee requested a review from Vasilije1990 February 25, 2026 16:24
@siillee siillee self-assigned this Feb 25, 2026

coderabbitai bot commented Feb 25, 2026

Important

Review skipped

Draft detected.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.


@pull-checklist

Please make sure all the checkboxes are checked:

  • I have tested these changes locally.
  • I have reviewed the code changes.
  • I have added end-to-end and unit tests (if applicable).
  • I have updated the documentation and README.md file (if necessary).
  • I have removed unnecessary code and debug statements.
  • PR title is clear and follows the convention.
  • I have tagged reviewers or team members for feedback.

Andrej Milicevic and others added 2 commits February 27, 2026 11:37
Move all DLT-specific expansion into a pre-processing step that converts
DLT resources into standard DataItem objects. After this, the standard
adapter path handles everything with no DLT branching in add.py,
ingest_data.py, or classify_documents.py.

- Extend DataItem with external_metadata and data_id fields
- Create resolve_dlt_sources.py as the single DLT adapter entry point
- Remove DLT imports/logic from add.py, ingest_data.py, classify_documents.py
- Add generic content-change detection in ingest_data.py

Co-Authored-By: Claude Opus 4.6 <[email protected]>
Signed-off-by: vasilije <[email protected]>
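
The pre-processing step described in the commit message above can be sketched roughly as follows. This is a minimal illustration, not the code in resolve_dlt_sources.py: the `DataItem` class here is a hypothetical stand-in (only the `external_metadata` and `data_id` field names come from the commit message), and `resolve_rows`, `content_hash`, and `has_changed` are assumed names. Rows are modeled as plain dicts so the sketch runs without dlt installed.

```python
import json
from dataclasses import dataclass, field
from hashlib import sha256
from typing import Any, Dict, Iterable, List, Optional


@dataclass
class DataItem:
    """Hypothetical stand-in for cognee's DataItem with the two new fields."""

    data: Any
    external_metadata: Dict[str, Any] = field(default_factory=dict)
    data_id: Optional[str] = None


def content_hash(row: Dict[str, Any]) -> str:
    """Hash a row deterministically so content changes can be detected."""
    payload = json.dumps(row, sort_keys=True, default=str)
    return sha256(payload.encode("utf-8")).hexdigest()


def resolve_rows(
    resource_name: str, rows: Iterable[Dict[str, Any]], primary_key: str
) -> List[DataItem]:
    """Expand the rows of one resource into standard DataItem objects."""
    return [
        DataItem(
            data=row,
            external_metadata={
                "source": "dlt",
                "resource": resource_name,
                "content_hash": content_hash(row),
            },
            # A stable id lets a later run recognise the same row again.
            data_id=f"{resource_name}:{row[primary_key]}",
        )
        for row in rows
    ]


def has_changed(item: DataItem, known_hashes: Dict[str, str]) -> bool:
    """True when the item is new or its content differs from the stored hash."""
    return known_hashes.get(item.data_id) != item.external_metadata["content_hash"]


if __name__ == "__main__":
    rows = [{"id": 1, "name": "Ada"}, {"id": 2, "name": "Linus"}]
    first_run = resolve_rows("users", rows, "id")
    known = {i.data_id: i.external_metadata["content_hash"] for i in first_run}

    rows[1]["name"] = "Grace"  # simulate an upstream update
    second_run = resolve_rows("users", rows, "id")
    print([i.data_id for i in second_run if has_changed(i, known)])
```

Once every row is a plain `DataItem`, downstream steps need no source-specific branching, which matches the stated goal of keeping DLT logic out of add.py, ingest_data.py, and classify_documents.py.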
    {"db_name": db_name},
)
if exists_result.scalar() is None:
    await connection.execute(text(f'CREATE DATABASE "{db_name}";'))

Check failure

Code scanning / CodeQL

SQL query built from user-controlled sources High

This SQL query depends on a user-provided value.
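
To illustrate why this pattern is flagged, the sketch below reproduces only the string interpolation from the line above, without touching a database. The function name `build_create_database_sql` and the database name `cognee_db` are hypothetical. Whether a second statement would actually run depends on the driver and how it handles multi-statement strings, so treat this as a demonstration of the string-level problem only.

```python
def build_create_database_sql(db_name: str) -> str:
    # Same interpolation pattern as the flagged line: the name is pasted
    # directly between double quotes with no validation or escaping.
    return f'CREATE DATABASE "{db_name}";'


benign = build_create_database_sql("analytics")
# A name containing a double quote closes the identifier early, so the
# remainder of the value is read as SQL rather than as part of the name.
crafted = build_create_database_sql('x"; DROP DATABASE "cognee_db')

print(benign)   # CREATE DATABASE "analytics";
print(crafted)  # CREATE DATABASE "x"; DROP DATABASE "cognee_db";
```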

Copilot Autofix

AI 1 day ago

In general, when you must use user-controlled values in SQL where parameters cannot be used (e.g., identifiers like database names), you should first validate the value against a strict whitelist of allowed characters and patterns, and reject or normalize anything else. For PostgreSQL database names, a safe approach is to restrict to a subset like [a-zA-Z0-9_] and enforce a reasonable length limit. This avoids dangerous characters such as quotes, semicolons, and backslashes that could break out of the identifier context.

For this codebase, the best fix with minimal behavior change is:

  1. Introduce a small helper function in cognee/tasks/ingestion/ingest_dlt_source.py that validates db_name and raises ValueError if it contains anything other than letters, digits, or underscores, or if it is empty or too long.
  2. Call this validator at the start of _create_pg_database(db_name) before constructing or executing SQL.
  3. Keep using text() for the SQL, but rely on the fact that the validated db_name cannot contain characters that would break out of the quoted identifier or inject additional SQL.

This only touches ingest_dlt_source.py, adding a standard-library import (re) and a small function plus one validation call; the external behavior for normal, valid dataset names remains the same.


Suggested changeset 1
cognee/tasks/ingestion/ingest_dlt_source.py

Autofix patch

Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/cognee/tasks/ingestion/ingest_dlt_source.py b/cognee/tasks/ingestion/ingest_dlt_source.py
--- a/cognee/tasks/ingestion/ingest_dlt_source.py
+++ b/cognee/tasks/ingestion/ingest_dlt_source.py
@@ -8,6 +8,7 @@
 )
 from cognee.modules.data.models import Data
 from cognee.infrastructure.databases.relational.config import get_relational_config
+import re
 
 try:
     import dlt
@@ -15,6 +16,28 @@
     pass
 
 
+def _validate_db_name(db_name: str) -> str:
+    """
+    Validate a database name coming from user-controlled input.
+
+    Restrict to alphanumeric characters and underscores to avoid SQL injection
+    when the name is interpolated into identifier positions.
+    """
+    if not isinstance(db_name, str) or not db_name:
+        raise ValueError("Invalid database name.")
+
+    # Allow only letters, digits and underscore; disallow other characters
+    if not re.fullmatch(r"[A-Za-z0-9_]+", db_name):
+        raise ValueError("Database name contains invalid characters.")
+
+    # Optional: enforce a reasonable length limit
+    if len(db_name) > 63:
+        # 63 is PostgreSQL's NAMEDATALEN - 1 default
+        raise ValueError("Database name is too long.")
+
+    return db_name
+
+
 async def ingest_dlt_source(
     dlt_source,
     dataset_name: str,
@@ -83,6 +106,8 @@
 
 
 async def _create_pg_database(db_name):
+    # Validate user-controlled database name before using it in SQL
+    db_name = _validate_db_name(db_name)
     relational_config = get_relational_config()
     maintenance_db_name = "postgres"
     maintenance_db_url = URL.create(
EOF
Copilot is powered by AI and may make mistakes. Always verify output.
Natural language processing (NLP) is an interdisciplinary
subfield of computer science and information retrieval.

Andrej is an expert in NLP.
Contributor Author


Note to self: Remove this nonsense
