diff --git a/src/docs2vecs/subcommands/indexer/config/config_schema.yaml b/src/docs2vecs/subcommands/indexer/config/config_schema.yaml index a177b6c..13c4c5b 100644 --- a/src/docs2vecs/subcommands/indexer/config/config_schema.yaml +++ b/src/docs2vecs/subcommands/indexer/config/config_schema.yaml @@ -10,7 +10,7 @@ definitions: schema: type: type: string - allowed: ['exporter', 'embedding', 'vector-store', 'uploader', 'splitter', 'integrated_vec', 'file-scanner', 'file-reader', 'loader'] + allowed: ['exporter', 'embedding', 'vector-store', 'uploader', 'splitter', 'integrated_vec', 'file-scanner', 'file-reader', 'loader', 'writer'] required: True name: type: string @@ -105,6 +105,10 @@ definitions: type: integer required: False min: 0 + # JSONWriterSkill params + output_path: + type: string + required: False # ConfluenceFAQSplitter params min_heading_level: type: integer diff --git a/src/docs2vecs/subcommands/indexer/skills/__init__.py b/src/docs2vecs/subcommands/indexer/skills/__init__.py index 7cbd07a..1305ca3 100644 --- a/src/docs2vecs/subcommands/indexer/skills/__init__.py +++ b/src/docs2vecs/subcommands/indexer/skills/__init__.py @@ -15,6 +15,7 @@ from .faiss_vector_store_skill import FaissVectorStoreSkill from .teams_qna_loader_skill import TeamsQnALoaderSkill from .confluence_faq_splitter_skill import ConfluenceFAQSplitter +from .json_writer_skill import JSONWriterSkill __all__ = [ @@ -35,4 +36,5 @@ "FaissVectorStoreSkill", "TeamsQnALoaderSkill", "ConfluenceFAQSplitter", + "JSONWriterSkill", ] diff --git a/src/docs2vecs/subcommands/indexer/skills/factory.py b/src/docs2vecs/subcommands/indexer/skills/factory.py index 3747519..71d80cd 100644 --- a/src/docs2vecs/subcommands/indexer/skills/factory.py +++ b/src/docs2vecs/subcommands/indexer/skills/factory.py @@ -18,6 +18,7 @@ from docs2vecs.subcommands.indexer.skills import FaissVectorStoreSkill from docs2vecs.subcommands.indexer.skills.confluence_faq_splitter_skill import ConfluenceFAQSplitter from 
docs2vecs.subcommands.indexer.skills.teams_qna_loader_skill import TeamsQnALoaderSkill +from docs2vecs.subcommands.indexer.skills.json_writer_skill import JSONWriterSkill class SkillType(StrEnum): @@ -29,6 +30,7 @@ class SkillType(StrEnum): UPLOADER = "uploader" SPLITTER = "splitter" LOADER = "loader" + WRITER = "writer" class AvailableSkillName(StrEnum): @@ -63,6 +65,9 @@ class AvailableSkillName(StrEnum): JIRA_LOADER = "jira-loader" TEAMS_QNA_LOADER = "teams-qna-loader" + # writers + JSON_WRITER = "json-writer" + AVAILABLE_SKILLS = { SkillType.EXPORTER: { @@ -92,6 +97,9 @@ class AvailableSkillName(StrEnum): AvailableSkillName.JIRA_LOADER: JiraLoaderSkill, AvailableSkillName.TEAMS_QNA_LOADER: TeamsQnALoaderSkill, }, + SkillType.WRITER: { + AvailableSkillName.JSON_WRITER: JSONWriterSkill, + }, } diff --git a/src/docs2vecs/subcommands/indexer/skills/json_writer_skill.py b/src/docs2vecs/subcommands/indexer/skills/json_writer_skill.py new file mode 100644 index 0000000..d077da5 --- /dev/null +++ b/src/docs2vecs/subcommands/indexer/skills/json_writer_skill.py @@ -0,0 +1,65 @@ +"""Skill that extracts chunk content from Documents and writes it to a JSON file. + +Use this skill at any point in a pipeline to capture intermediate state, +e.g. after a splitter, so the output can be checksummed for change detection +without running expensive downstream skills like embedding and indexing. + +Only the chunk text content is written as a sorted JSON array of strings — +volatile metadata like filenames, document IDs, and timestamps are excluded +so the checksum remains stable when the underlying text hasn't changed. 
class JSONWriterSkill(IndexerSkill):
    """Dump the text of every non-empty chunk to a JSON file and pass documents on.

    The file holds a flat JSON array of strings: the ``content`` of each chunk
    whose content is truthy, gathered across all input documents and sorted so
    that identical text always serialises to identical bytes (useful for
    checksum-based change detection). The incoming documents are returned
    untouched so later skills in the pipeline can keep using them.

    Config params:
        output_path (str): Destination JSON file. Falls back to
                           ``data/pipeline_output.json`` when not configured.
                           Missing parent directories are created on write.
    """

    def __init__(self, skill_config: dict, global_config: Config) -> None:
        """Initialise the base skill and resolve the destination file path.

        Args:
            skill_config: Settings for this skill instance.
            global_config: Pipeline-wide configuration object.
        """
        super().__init__(skill_config, global_config)
        # NOTE(review): ``self._config`` is presumably populated by
        # IndexerSkill.__init__ (base class not visible here) — confirm.
        default_path = "data/pipeline_output.json"
        self._output_path = self._config.get("output_path", default_path)

    def run(self, input: Optional[List[Document]] = None) -> List[Document]:
        """Write sorted chunk contents to the output file.

        Args:
            input: Documents whose chunks should be exported. ``None`` or an
                empty list short-circuits without touching the file system.

        Returns:
            The same ``input`` list when documents were supplied, otherwise a
            new empty list.
        """
        # Guard clause: with nothing to export, no file is created or rewritten.
        if not input:
            self.logger.warning("JSONWriterSkill received no input — nothing to write.")
            return []

        # Keep only chunk text (no metadata); sorting makes the output
        # independent of document/chunk ordering.
        texts = sorted(
            chunk.content
            for doc in input
            for chunk in doc.chunks
            if chunk.content
        )

        # A bare filename has an empty dirname; fall back to the current
        # directory so makedirs always receives a valid path.
        target_dir = os.path.dirname(self._output_path) or "."
        os.makedirs(target_dir, exist_ok=True)

        with open(self._output_path, "w", encoding="utf-8") as out_file:
            json.dump(texts, out_file, indent=2, ensure_ascii=False)

        self.logger.info(
            "Wrote %d chunk content entries to %s",
            len(texts),
            self._output_path,
        )

        # Hand the documents on unchanged for downstream skills.
        return input