Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion src/docs2vecs/subcommands/indexer/config/config_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ definitions:
schema:
type:
type: string
allowed: ['exporter', 'embedding', 'vector-store', 'uploader', 'splitter', 'integrated_vec', 'file-scanner', 'file-reader', 'loader']
allowed: ['exporter', 'embedding', 'vector-store', 'uploader', 'splitter', 'integrated_vec', 'file-scanner', 'file-reader', 'loader', 'writer']
required: True
name:
type: string
Expand Down Expand Up @@ -105,6 +105,10 @@ definitions:
type: integer
required: False
min: 0
# JSONWriterSkill params
output_path:
type: string
required: False
# ConfluenceFAQSplitter params
min_heading_level:
type: integer
Expand Down
2 changes: 2 additions & 0 deletions src/docs2vecs/subcommands/indexer/skills/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from .faiss_vector_store_skill import FaissVectorStoreSkill
from .teams_qna_loader_skill import TeamsQnALoaderSkill
from .confluence_faq_splitter_skill import ConfluenceFAQSplitter
from .json_writer_skill import JSONWriterSkill


__all__ = [
Expand All @@ -35,4 +36,5 @@
"FaissVectorStoreSkill",
"TeamsQnALoaderSkill",
"ConfluenceFAQSplitter",
"JSONWriterSkill",
]
8 changes: 8 additions & 0 deletions src/docs2vecs/subcommands/indexer/skills/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from docs2vecs.subcommands.indexer.skills import FaissVectorStoreSkill
from docs2vecs.subcommands.indexer.skills.confluence_faq_splitter_skill import ConfluenceFAQSplitter
from docs2vecs.subcommands.indexer.skills.teams_qna_loader_skill import TeamsQnALoaderSkill
from docs2vecs.subcommands.indexer.skills.json_writer_skill import JSONWriterSkill


class SkillType(StrEnum):
Expand All @@ -29,6 +30,7 @@ class SkillType(StrEnum):
UPLOADER = "uploader"
SPLITTER = "splitter"
LOADER = "loader"
WRITER = "writer"


class AvailableSkillName(StrEnum):
Expand Down Expand Up @@ -63,6 +65,9 @@ class AvailableSkillName(StrEnum):
JIRA_LOADER = "jira-loader"
TEAMS_QNA_LOADER = "teams-qna-loader"

# writers
JSON_WRITER = "json-writer"


AVAILABLE_SKILLS = {
SkillType.EXPORTER: {
Expand Down Expand Up @@ -92,6 +97,9 @@ class AvailableSkillName(StrEnum):
AvailableSkillName.JIRA_LOADER: JiraLoaderSkill,
AvailableSkillName.TEAMS_QNA_LOADER: TeamsQnALoaderSkill,
},
SkillType.WRITER: {
AvailableSkillName.JSON_WRITER: JSONWriterSkill,
},
}


Expand Down
65 changes: 65 additions & 0 deletions src/docs2vecs/subcommands/indexer/skills/json_writer_skill.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
"""Skill that extracts chunk content from Documents and writes it to a JSON file.

Use this skill at any point in a pipeline to capture intermediate state,
e.g. after a splitter, so the output can be checksummed for change detection
without running expensive downstream skills like embedding and indexing.

Only the chunk text content is written as a sorted JSON array of strings —
volatile metadata like filenames, document IDs, and timestamps are excluded
so the checksum remains stable when the underlying text hasn't changed.
"""

import json
import os
from typing import List, Optional

from docs2vecs.subcommands.indexer.config.config import Config
from docs2vecs.subcommands.indexer.document import Document
from docs2vecs.subcommands.indexer.skills.skill import IndexerSkill


class JSONWriterSkill(IndexerSkill):
    """Dump the text of every non-empty chunk to a JSON file, then pass through.

    The file contains a single flat JSON array of chunk contents, sorted so
    that the serialized output does not depend on the order in which
    documents or chunks arrive. The documents themselves are returned
    unmodified so later skills in the pipeline can keep using them.

    Config params:
        output_path (str): Destination of the JSON file. Defaults to
            ``data/pipeline_output.json``. Missing parent directories are
            created on write.
    """

    def __init__(self, skill_config: dict, global_config: Config) -> None:
        super().__init__(skill_config, global_config)
        # NOTE(review): ``self._config`` is not assigned in this class; it is
        # presumably populated by ``IndexerSkill.__init__`` — confirm.
        self._output_path = self._config.get("output_path", "data/pipeline_output.json")

    def run(self, input: Optional[List[Document]] = None) -> List[Document]:
        """Write all non-empty chunk contents to ``output_path`` and return *input*.

        Args:
            input: Documents whose ``chunks`` are scanned. Chunks with falsy
                ``content`` are skipped.

        Returns:
            The same ``input`` list, untouched. When ``input`` is ``None`` or
            empty, an empty list (or the empty ``input`` itself) is returned
            and no file is written — a file left over from a previous run is
            NOT truncated or removed.
        """
        if not input:
            self.logger.warning("JSONWriterSkill received no input — nothing to write.")
            return input or []

        # Gather and sort in one step; sorting makes the output independent
        # of document/chunk ordering so checksums of the file stay stable.
        contents = sorted(
            chunk.content
            for doc in input
            for chunk in doc.chunks
            if chunk.content
        )

        # ``dirname`` is "" for a bare filename; fall back to the current dir.
        os.makedirs(os.path.dirname(self._output_path) or ".", exist_ok=True)

        # ``newline="\n"`` disables the platform newline translation of text
        # mode. Without it the indent newlines become CRLF on Windows and the
        # file's bytes (hence its checksum) differ between operating systems.
        with open(self._output_path, "w", encoding="utf-8", newline="\n") as f:
            json.dump(contents, f, indent=2, ensure_ascii=False)

        self.logger.info(
            "Wrote %d chunk content entries to %s",
            len(contents),
            self._output_path,
        )

        # Pass-through: downstream skills can still consume the documents.
        return input