feat: add identity_fields to DataPoint for declarative node deduplication #2125
Vasilije1990 wants to merge 1 commit into dev
…tion

Users defining custom DataPoint subclasses can now specify identity_fields in metadata to auto-generate deterministic UUID5 IDs from field values, eliminating the need for manual dict-based deduplication. Explicit id overrides still take precedence, and existing models without identity_fields remain unaffected (backward compatible).

Co-Authored-By: Claude Opus 4.6 <[email protected]>
Signed-off-by: vasilije <[email protected]>
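As a rough illustration of what "deterministic UUID5 IDs from field values" can mean, here is a minimal standard-library sketch. It is not the code from this PR: the function name `sketch_identity_id`, the `uuid.NAMESPACE_OID` namespace, the `|` separator, and the strip/lowercase normalization are all assumptions made for the example.

```python
import uuid
from typing import Any, Mapping, Optional, Sequence


def sketch_identity_id(
    type_name: str, identity_fields: Sequence[str], values: Mapping[str, Any]
) -> Optional[uuid.UUID]:
    """Hypothetical helper: return a UUID5 for the given fields, or None if one is missing."""
    # Including the type name keeps Person("x") and Company("x") from colliding.
    parts = [type_name]
    for field in identity_fields:
        value = values.get(field)
        if value is None:
            return None  # the caller would fall back to its usual id
        # Assumed normalization: trim and lowercase strings, stringify everything else.
        parts.append(value.strip().lower() if isinstance(value, str) else str(value))
    # uuid5 is a pure function of (namespace, name), so equal inputs give equal ids.
    return uuid.uuid5(uuid.NAMESPACE_OID, "|".join(parts))


same_a = sketch_identity_id("Person", ["name"], {"name": "Alice"})
same_b = sketch_identity_id("Person", ["name"], {"name": " alice "})
other_type = sketch_identity_id("Company", ["name"], {"name": "Alice"})
missing = sketch_identity_id("Person", ["name"], {})
```

Under these assumptions `same_a` and `same_b` are equal, `other_type` differs from both, and `missing` is `None`.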
Walkthrough
This change introduces identity-based UUID generation for DataPoint models using deterministic UUID5 hashing. A new optional
Changes
Estimated code review effort
🎯 3 (Moderate) | ⏱️ ~25 minutes
🚥 Pre-merge checks | ✅ 2 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@cognee/infrastructure/engine/models/DataPoint.py`:
- Around line 51-60: The identity ID is being generated before Pydantic applies
defaults; update DataPoint.__init__ signature to include type hints (e.g., def
__init__(self, **data: Any) -> None) and call super().__init__(**data) first,
then compute the identity by calling self.__class__._get_identity_fields() and,
if present, call
self.__class__._generate_identity_id(self.__class__._get_identity_fields(),
self.model_dump(), self.__class__.__name__) (or equivalent) using the defaulted
values from self.model_dump(); if that returns a non-None id and self.id is not
set, assign it to self.id so deterministic IDs are produced.
In `@examples/low_level/pipeline.py`:
- Around line 47-81: Add a docstring to ingest_files explaining its purpose and
inputs, annotate its signature with the correct return type (e.g., ->
List[Company]) and add an explicit type for the local variable all_companies
(e.g., all_companies: List[Company]) so mypy can validate; update imports if
necessary to reference Company/Department/Person types used in the annotations
and keep the implementation unchanged (function name ingest_files, local
variable all_companies).
def __init__(self, **data):
    if "id" not in data:
        identity_fields = self.__class__._get_identity_fields()
        if identity_fields:
            identity_id = self.__class__._generate_identity_id(
                identity_fields, data, self.__class__.__name__
            )
            if identity_id is not None:
                data["id"] = identity_id
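The ordering concern raised in the review (identity computed from raw keyword arguments, before defaults are applied) can be seen in a small standard-library sketch. Nothing here is cognee or Pydantic code: `DEFAULTS`, `id_from`, and the default name `"Company"` are hypothetical stand-ins for a model whose identity field has a default value.

```python
import uuid
from typing import Any, Dict, Optional

# Hypothetical stand-in for a model's field defaults (e.g. a type node with a fixed name).
DEFAULTS: Dict[str, Any] = {"name": "Company"}


def id_from(values: Dict[str, Any]) -> Optional[uuid.UUID]:
    """Derive an id from the 'name' field, or None when the field is absent."""
    name = values.get("name")
    if name is None:
        return None
    return uuid.uuid5(uuid.NAMESPACE_OID, f"CompanyType|{name}")


raw_kwargs: Dict[str, Any] = {}  # the constructor was called with no arguments

# Computed from the raw kwargs: the defaulted field is not visible yet.
before_defaults = id_from(raw_kwargs)

# Computed after merging defaults, as validated model values would look.
after_defaults = id_from({**DEFAULTS, **raw_kwargs})
```

In this sketch `before_defaults` is `None`, so the instance would fall back to a non-deterministic id, while `after_defaults` is the same UUID on every run. That is the motivation for reading the validated values rather than the raw `data` dict.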
 def ingest_files(data: List[Data]):
-    people_data_points = {}
-    departments_data_points = {}
-    companies_data_points = {}
+    # With identity_fields, DataPoints with the same name automatically get the same UUID.
+    # No manual dict-based deduplication needed — just create instances freely.
+    all_companies = []
 
     for data_item in data:
         people = data_item.payload["people"]
         companies = data_item.payload["companies"]
 
+        # Build departments with their employees
+        dept_employees: Dict[str, List[Person]] = {}
         for person in people:
-            new_person = Person(name=person["name"])
-            people_data_points[person["name"]] = new_person
+            dept_name = person["department"]
+            if dept_name not in dept_employees:
+                dept_employees[dept_name] = []
+            dept_employees[dept_name].append(Person(name=person["name"]))
 
-            if person["department"] not in departments_data_points:
-                departments_data_points[person["department"]] = Department(
-                    name=person["department"], employees=[new_person]
-                )
-            else:
-                departments_data_points[person["department"]].employees.append(new_person)
+        departments = {
+            name: Department(name=name, employees=employees)
+            for name, employees in dept_employees.items()
+        }
 
-        # Create a single CompanyType node, so we connect all companies to it.
-        companyType = CompanyType()
+        # Create a single CompanyType node (deterministic ID via identity_fields)
+        company_type = CompanyType()
 
         for company in companies:
-            new_company = Company(name=company["name"], departments=[], is_type=companyType)
-            companies_data_points[company["name"]] = new_company
-
-            for department_name in company["departments"]:
-                if department_name not in departments_data_points:
-                    departments_data_points[department_name] = Department(
-                        name=department_name, employees=[]
-                    )
-
-                new_company.departments.append(departments_data_points[department_name])
-
-    return list(companies_data_points.values())
+            company_departments = [
+                departments.get(dept_name, Department(name=dept_name, employees=[]))
+                for dept_name in company["departments"]
+            ]
+            all_companies.append(
+                Company(name=company["name"], departments=company_departments, is_type=company_type)
+            )
+
+    return all_companies
🧩 Analysis chain
🏁 Script executed:
find . -type f -name "pipeline.py" | head -20
Repository: topoteretes/cognee
Length of output: 144
🏁 Script executed:
cat -n examples/low_level/pipeline.py | head -100
Repository: topoteretes/cognee
Length of output: 4037
🏁 Script executed:
rg "^def ingest_files|^class Company|^class Data|^from typing" examples/low_level/pipeline.py -A 2 -B 2
Repository: topoteretes/cognee
Length of output: 700
Add return type annotation, variable type annotation, and docstring to ingest_files.
The function lacks a return type annotation, a type hint for the all_companies variable, and a docstring. This violates the project's requirements for type hints (mypy checks enabled) and for documenting function definitions.
✏️ Suggested update
-def ingest_files(data: List[Data]):
+def ingest_files(data: List[Data]) -> List[Company]:
+    """Build Company instances from raw payloads using identity-based deduplication."""
     # With identity_fields, DataPoints with the same name automatically get the same UUID.
     # No manual dict-based deduplication needed — just create instances freely.
-    all_companies = []
+    all_companies: List[Company] = []
🤖 Prompt for AI Agents
In `@examples/low_level/pipeline.py` around lines 47 - 81, Add a docstring to
ingest_files explaining its purpose and inputs, annotate its signature with the
correct return type (e.g., -> List[Company]) and add an explicit type for the
local variable all_companies (e.g., all_companies: List[Company]) so mypy can
validate; update imports if necessary to reference Company/Department/Person
types used in the annotations and keep the implementation unchanged (function
name ingest_files, local variable all_companies).
Summary
- Add `identity_fields` to the `MetaData` TypedDict and `DataPoint.__init__`, enabling declarative deterministic UUID5 generation from specified field values
- Set `metadata = {"index_fields": ["name"], "identity_fields": ["name"]}` on DataPoint subclasses to automatically deduplicate nodes by name
- Explicit `id` overrides still take precedence; models without `identity_fields` are unaffected (fully backward compatible)
Changes
- `cognee/infrastructure/engine/models/DataPoint.py`: Added `identity_fields` to `MetaData`, modified `__init__` to generate a deterministic UUID5 when `identity_fields` is set and no explicit `id` is provided, added `_get_identity_fields()` and `_generate_identity_id()` helper methods
- `cognee/tests/unit/infrastructure/engine/test_identity_fields.py`: Comprehensive unit tests covering same-value deduplication, cross-type safety, explicit id override, missing field fallback, backward compat, multi-field keys, and string normalization
- `examples/low_level/pipeline.py`: Updated example models with `identity_fields` and simplified `ingest_files()` to demonstrate declarative deduplication (no more manual dict tracking)
Test plan
- `pytest cognee/tests/unit/infrastructure/engine/test_identity_fields.py -v`
- `pytest cognee/tests/unit/`
- `python examples/low_level/pipeline.py`
🤖 Generated with Claude Code
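To make the behaviours listed in the summary concrete (same-value deduplication, explicit `id` override, backward compatibility), here is a self-contained sketch built on a stand-in base class. `SketchPoint` is hypothetical and only mimics the described behaviour; the real `DataPoint` is a Pydantic model, and the random `uuid4` fallback, the namespace, and the key format are assumptions.

```python
import uuid
from typing import Any, ClassVar, Dict, Optional


class SketchPoint:
    """Hypothetical stand-in for DataPoint; not the cognee class."""

    metadata: ClassVar[Dict[str, Any]] = {"index_fields": []}

    def __init__(self, id: Optional[uuid.UUID] = None, **fields: Any) -> None:
        self.__dict__.update(fields)
        keys = self.metadata.get("identity_fields", [])
        if id is not None:
            self.id = id  # an explicit id always takes precedence
        elif keys and all(fields.get(k) is not None for k in keys):
            joined = "|".join(str(fields[k]) for k in keys)
            self.id = uuid.uuid5(uuid.NAMESPACE_OID, f"{type(self).__name__}|{joined}")
        else:
            self.id = uuid.uuid4()  # assumed fallback: a random id, as without identity_fields


class Person(SketchPoint):
    metadata = {"index_fields": ["name"], "identity_fields": ["name"]}


class Note(SketchPoint):
    metadata = {"index_fields": ["text"]}  # no identity_fields: behaviour unchanged


people = [Person(name="Alice"), Person(name="Alice"), Person(name="Bob")]
unique_people = {p.id: p for p in people}  # keying by id collapses the duplicate Alice
pinned = Person(id=uuid.UUID(int=1), name="Alice")
```

Here `unique_people` ends up with two entries, `pinned.id` keeps the caller-supplied value, and two `Note` instances with identical text still receive different ids.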
Summary by CodeRabbit
New Features
Tests