From ce14fc978b82e6d94d2b24be14080877995192b2 Mon Sep 17 00:00:00 2001 From: martipath Date: Tue, 3 Mar 2026 14:13:17 +0530 Subject: [PATCH 1/2] feat: per-page tagging, JSON writer change gate, and schema additions - scrollwordexporter: add per-URL tag overrides via page_url_tags/default_tag - json_writer: add SHA-256 checksum gate to skip unchanged downstream processing - teams_qna_loader: support per-item tag override from JSON - config_schema: add default_tag, page_url_tags, checksum_path, skip_downstream_if_unchanged, batch_size(exitsing parameter from vector skill) - docs: update indexer-skills.md accordingly --- docs/readme/indexer-skills.md | 54 ++++++++++++-- .../indexer/config/config_schema.yaml | 32 +++++++- .../indexer/skills/json_writer_skill.py | 74 ++++++++++++++++++- .../skills/scrollwordexporter_skill.py | 57 ++++++++++---- .../indexer/skills/teams_qna_loader_skill.py | 2 +- 5 files changed, 190 insertions(+), 29 deletions(-) diff --git a/docs/readme/indexer-skills.md b/docs/readme/indexer-skills.md index f72151e..c497490 100644 --- a/docs/readme/indexer-skills.md +++ b/docs/readme/indexer-skills.md @@ -40,6 +40,15 @@ This document describes all available skills that can be used in the indexer pip 2. An `embedding` to generate embeddings from the Q&A content. 3. A `vector-store` to store the embeddings. +6. You want to avoid re-running expensive embedding and indexing when the content hasn't changed since the last run? Insert a `writer` (`json-writer`) skill as a change gate: + + 1. A `file-scanner` (or `exporter`) to locate/export your source documents. + 2. A `file-reader` to read their content. + 3. A `splitter` to split the documents into chunks. + 4. A `writer` (`json-writer`) with `checksum_path` set — it will compare a SHA-256 checksum of the chunked content against the previous run and, if unchanged, strip chunks so downstream skills are skipped automatically. + 5. An `embedding` to generate embeddings (skipped when content is unchanged). + 6. A `vector-store` to store the embeddings (skipped when content is unchanged). + # Available Skills @@ -47,7 +56,7 @@ This document describes all available skills that can be used in the indexer pip Export data from one source to another. For example export a confluence page to a markdown file. ### Scroll Word Exporter -Exports a confluence page to Microsoft Word document +Exports Confluence pages to Microsoft Word documents. Each entry in `page_urls` and `page_ids` supports an optional inline `tag`. Entries without a tag fall back to the top-level `tag` param. ```yaml - skill: &Exporter @@ -58,12 +67,19 @@ Exports a confluence page to Microsoft Word document auth_token: env.SWE_AUTH_TOKEN # Scroll Word API token - can be obtained in Confluence poll_interval: 20 # Interval in seconds to check the status of the export export_folder: ~/Downloads/sw_export_temp # Path where the exported file(s) should be saved - scope: current # Possible values: [current | descendants]. `current` exports just the current page, where `descendants` include all the descendants of the current page - page_ids: # List all page IDs that you'd like to export - - 1774209540 - page_urls: # List all page URLs that you'd like to export - - https://your/corporate/confluence/prefix/wiki/spaces/your/confluence/space - confluence_prefix: https://your/corporate/confluence/prefix # Your corporate Confluence URL + scope: current # Possible values: [current | descendants] + confluence_prefix: https://your/corporate/confluence/prefix + tag: generic # Optional: default tag for all pages (fallback) + page_urls: + - url: https://your/confluence/spaces/SPACE/pages/123/Page+Title + tag: my-tag # Optional: overrides top-level tag for this page + - url: https://your/confluence/spaces/SPACE/pages/456/Another+Page + # no tag — falls back to top-level tag + page_ids: + - id: 1774209540 + tag: my-tag # Optional + - id: 1234567890 + # no tag — falls back to top-level tag ``` @@ -136,13 +152,15 @@ Loads data from Jira issues ### Teams Q&A Loader Loads enriched Q&A pairs from a JSON file produced by the FAQ enrichment pipeline. Each Q&A pair becomes a single document with one chunk. The skill prefers rephrased questions/answers when available, falling back to originals. +Each Q&A object in the JSON can optionally include a `tag` field that overrides the skill-level `tag` for that specific chunk, allowing fine-grained tagging within a single file. + ```yaml - skill: &TeamsQnALoader type: loader name: teams-qna-loader params: file_path: data/processed_output/enriched_qna.json # Required: path to enriched Q&A JSON file - tag: teams-faq # Optional: tag for chunks (default: "enriched-qna") + tag: teams-faq # Optional: default tag for chunks (default: "enriched-qna"); can be overridden per Q&A object via a "tag" field in the JSON ``` @@ -201,6 +219,25 @@ All parameters are optional with sensible defaults. ``` +
Writer Skills +Capture and optionally gate intermediate pipeline state to a file. + +### JSON Writer +Extracts text content from all chunks and writes it as a sorted JSON array to a file. Useful for inspecting intermediate pipeline state (e.g. after splitting) and as a **change-detection gate**: when `checksum_path` is configured, the skill computes a SHA-256 checksum of the output and compares it to the previous run. If the content is unchanged and `skip_downstream_if_unchanged` is `true`, all chunks are stripped from the documents so downstream embedding and indexing skills are automatically skipped. + +Documents and their chunks are always passed through for downstream skills — unless the change gate fires. + +```yaml +- skill: &JSONWriter + type: writer + name: json-writer + params: + output_path: data/pipeline_output.json # Path to the output JSON file (default: "data/pipeline_output.json") + checksum_path: data/pipeline_output.sha256 # Optional: path to store/read a SHA-256 checksum. Enables change detection. + skip_downstream_if_unchanged: true # Optional: if true (default) and checksum_path is set, strips chunks when content unchanged, skipping downstream embedding/indexing +``` +
+
Embedding Generate embeddings from text. Embeddings is a vector representation of your text data. @@ -250,6 +287,7 @@ Stores embeddings in an Azure AI Search index. document_name: document_name embedding: embedding overwrite_index: true # true - before storing data, it will remove all the documents from your index. false - will append documents to your index + batch_size: 50 # Optional: number of documents uploaded per API call (default: 50, max: 50) ``` ### Chroma diff --git a/src/docs2vecs/subcommands/indexer/config/config_schema.yaml b/src/docs2vecs/subcommands/indexer/config/config_schema.yaml index 13c4c5b..ede449f 100644 --- a/src/docs2vecs/subcommands/indexer/config/config_schema.yaml +++ b/src/docs2vecs/subcommands/indexer/config/config_schema.yaml @@ -36,13 +36,29 @@ definitions: required: False page_ids: type: list + required: False schema: - type: ['string', 'integer'] + type: dict + schema: + id: + type: ['string', 'integer'] + required: True + tag: + type: string + required: False page_urls: type: list + required: False schema: - type: string - regex: '^http.*' + type: dict + schema: + url: + type: string + regex: '^http.*' + required: True + tag: + type: string + required: False confluence_prefix: type: string regex: '^http.*' @@ -109,6 +125,12 @@ definitions: output_path: type: string required: False + checksum_path: + type: string + required: False + skip_downstream_if_unchanged: + type: boolean + required: False # ConfluenceFAQSplitter params min_heading_level: type: integer @@ -183,6 +205,10 @@ definitions: required: False overwrite_index: type: boolean + batch_size: + type: integer + required: False + min: 1 jql_query: type: string required: False diff --git a/src/docs2vecs/subcommands/indexer/skills/json_writer_skill.py b/src/docs2vecs/subcommands/indexer/skills/json_writer_skill.py index d077da5..1088d96 100644 --- a/src/docs2vecs/subcommands/indexer/skills/json_writer_skill.py +++ b/src/docs2vecs/subcommands/indexer/skills/json_writer_skill.py @@ -7,8 +7,15 @@ Only the chunk text content is written as a sorted JSON array of strings — volatile metadata like filenames, document IDs, and timestamps are excluded so the checksum remains stable when the underlying text hasn't changed. + +When ``checksum_path`` is configured, the skill compares the current content +hash against a previously stored one. If unchanged and +``skip_downstream_if_unchanged`` is true, all chunks are removed from the +documents so downstream skills (embedding, indexing) naturally skip +processing — enabling a single-config pipeline with a built-in change gate. """ +import hashlib import json import os from typing import List, Optional @@ -23,17 +30,49 @@ class JSONWriterSkill(IndexerSkill): The output is a flat list of strings (one per non-empty chunk), sorted alphabetically for deterministic checksumming. Documents are passed - through unchanged for downstream skills. + through unchanged for downstream skills — unless ``checksum_path`` is + set and the content hasn't changed, in which case chunks are stripped + so downstream embedding/indexing skills skip processing. Config params: output_path (str): Path to the output JSON file (default: ``data/pipeline_output.json``). Parent directories are created automatically. + checksum_path (str, optional): Path to store/read a SHA-256 + checksum of the JSON output. When set, the + skill compares the current checksum against the + stored one to detect content changes. + skip_downstream_if_unchanged (bool, optional): If true (default) + and ``checksum_path`` is set, remove all chunks + from documents when content is unchanged. This + causes downstream skills (embedding, indexing) + to skip processing. Set to false to always pass + chunks through regardless of change detection. """ def __init__(self, skill_config: dict, global_config: Config) -> None: super().__init__(skill_config, global_config) self._output_path = self._config.get("output_path", "data/pipeline_output.json") + self._checksum_path = self._config.get("checksum_path", None) + self._skip_if_unchanged = self._config.get("skip_downstream_if_unchanged", True) + + def _compute_checksum(self, content_bytes: bytes) -> str: + return hashlib.sha256(content_bytes).hexdigest() + + def _read_stored_checksum(self) -> Optional[str]: + if self._checksum_path and os.path.isfile(self._checksum_path): + try: + with open(self._checksum_path, "r", encoding="utf-8") as f: + return f.read().strip() + except Exception as e: + self.logger.warning(f"Failed to read stored checksum: {e}") + return None + + def _write_checksum(self, checksum: str) -> None: + if self._checksum_path: + os.makedirs(os.path.dirname(self._checksum_path) or ".", exist_ok=True) + with open(self._checksum_path, "w", encoding="utf-8") as f: + f.write(checksum) def run(self, input: Optional[List[Document]] = None) -> List[Document]: if not input: @@ -52,8 +91,10 @@ def run(self, input: Optional[List[Document]] = None) -> List[Document]: os.makedirs(os.path.dirname(self._output_path) or ".", exist_ok=True) - with open(self._output_path, "w", encoding="utf-8") as f: - json.dump(contents, f, indent=2, ensure_ascii=False) + json_bytes = json.dumps(contents, indent=2, ensure_ascii=False).encode("utf-8") + + with open(self._output_path, "wb") as f: + f.write(json_bytes) self.logger.info( "Wrote %d chunk content entries to %s", @@ -61,5 +102,32 @@ def run(self, input: Optional[List[Document]] = None) -> List[Document]: self._output_path, ) + # ── Checksum-based change gate ────────────────────────── + if self._checksum_path: + new_checksum = self._compute_checksum(json_bytes) + old_checksum = self._read_stored_checksum() + + if old_checksum and new_checksum == old_checksum and self._skip_if_unchanged: + self.logger.info( + "Content unchanged (checksum: %s) — stripping chunks to skip downstream processing.", + new_checksum[:12], + ) + for doc in input: + doc.chunks = set() + else: + if old_checksum: + self.logger.info( + "Content changed (old: %s, new: %s) — passing chunks to downstream skills.", + old_checksum[:12], + new_checksum[:12], + ) + else: + self.logger.info( + "No previous checksum found — passing chunks to downstream skills (first run).", + ) + + # Always save the new checksum + self._write_checksum(new_checksum) + # Pass-through: downstream skills can still consume the documents return input diff --git a/src/docs2vecs/subcommands/indexer/skills/scrollwordexporter_skill.py b/src/docs2vecs/subcommands/indexer/skills/scrollwordexporter_skill.py index fc9e9ec..a9bc2ca 100644 --- a/src/docs2vecs/subcommands/indexer/skills/scrollwordexporter_skill.py +++ b/src/docs2vecs/subcommands/indexer/skills/scrollwordexporter_skill.py @@ -11,6 +11,18 @@ class ScrollWorldExporterSkill(IndexerSkill): + """Export Confluence pages as DOCX via Scroll Word Exporter API. + + Each entry in ``page_urls`` and ``page_ids`` can carry an optional ``tag`` + field. Entries without a tag fall back to the top-level ``tag`` param + (default: ``""``). + + Config params: + page_urls (list): List of dicts with ``url`` (required) and ``tag`` (optional). + page_ids (list): List of dicts with ``id`` (required) and ``tag`` (optional). + tag (str, optional): Default fallback tag for entries without an explicit tag. + """ + def __init__(self, config: dict, global_config: Config) -> None: super().__init__(config, global_config) self._auth_header = f"Bearer {self._config['auth_token']}" @@ -18,6 +30,7 @@ def __init__(self, config: dict, global_config: Config) -> None: self._export_folder = Path(self._config["export_folder"]).expanduser().resolve() self._confluence_prefix = self._config["confluence_prefix"] self._confluence_prefix = "https://amadeus.atlassian.net/wiki" + self._default_tag: str = self._config.get("tag", "") def _start_export(self, page_id: str, api_url: str, auth_header: str) -> str: EXPORT_PARAMETERS = { @@ -84,24 +97,41 @@ def _extract_page_id_from_url(self, url: str) -> str: page_id = tokens[tokens.index("pages") + 1] return page_id - def _extract_confluence_page_ids(self) -> List[str]: - cp_id_list = [] + def _extract_confluence_page_entries(self) -> List[dict]: + """Return a list of dicts with 'page_id', 'url' (if available), and 'tag'.""" + entries = [] + if self._config.get("page_ids"): - cp_id_list += self._config["page_ids"] + for entry in self._config["page_ids"]: + entries.append({ + "page_id": str(entry["id"]), + "url": None, + "tag": entry.get("tag", self._default_tag), + }) if self._config.get("page_urls"): - for page_url in self._config["page_urls"]: - cp_id_list.append(self._extract_page_id_from_url(page_url)) + for entry in self._config["page_urls"]: + entries.append({ + "page_id": self._extract_page_id_from_url(entry["url"]), + "url": entry["url"], + "tag": entry.get("tag", self._default_tag), + }) + + if not entries: + self.logger.warning("No pages to export — both 'page_ids' and 'page_urls' are empty or missing.") - return cp_id_list + return entries def run(self, input: Optional[List[Document]] = None) -> List[Document]: self.logger.info("Running ScrollWorldExporter") doc_list: List[Document] = [] - page_id_to_export = self._extract_confluence_page_ids() - for page_id in page_id_to_export: - self.logger.debug(f"Exporting confluence page: {page_id}") + page_entries = self._extract_confluence_page_entries() + + for entry in page_entries: + page_id = entry["page_id"] + tag = entry["tag"] + self.logger.debug(f"Exporting confluence page: {page_id} (tag={tag})") export_job_id: str = self._start_export( page_id, self._api_url, self._auth_header ) @@ -123,10 +153,9 @@ def run(self, input: Optional[List[Document]] = None) -> List[Document]: source_url = ( f"{self._confluence_prefix}/pages/viewpage.action?pageId={page_id}" ) - doc_list.append( - Document( - filename=self._download_file(download_url), source_url=source_url - ) - ) + filename = self._download_file(download_url) + + doc = Document(filename=filename, source_url=source_url, tag=tag) + doc_list.append(doc) return doc_list diff --git a/src/docs2vecs/subcommands/indexer/skills/teams_qna_loader_skill.py b/src/docs2vecs/subcommands/indexer/skills/teams_qna_loader_skill.py index b522506..c8468b9 100644 --- a/src/docs2vecs/subcommands/indexer/skills/teams_qna_loader_skill.py +++ b/src/docs2vecs/subcommands/indexer/skills/teams_qna_loader_skill.py @@ -88,7 +88,7 @@ def run(self, documents: Optional[List[Document]]) -> List[Document]: chunk = Chunk() chunk.document_id = document_id chunk.document_name = f"{topic} - FAQ" - chunk.tag = self.tag + chunk.tag = qna.get("tag", self.tag) chunk.content = content chunk.chunk_id = f"{document_id}_chunk_0" chunk.source_link = source_url From 223ec0f2bc085ce8e248d09843c8ac4723b68567 Mon Sep 17 00:00:00 2001 From: martipath Date: Tue, 3 Mar 2026 23:13:38 +0530 Subject: [PATCH 2/2] refactor: per-chunk change detection and stable document IDs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - json-writer: checksum per chunk (document_id → sha256(content)), not whole pipeline - confluence-faq-splitter: document_id from question only, stable across answer edits - indexer-skills.md: updated to reflect above points --- docs/readme/indexer-skills.md | 14 +- .../skills/confluence_faq_splitter_skill.py | 6 +- .../indexer/skills/json_writer_skill.py | 152 ++++++++++-------- .../indexer/skills/teams_qna_loader_skill.py | 20 +-- 4 files changed, 106 insertions(+), 86 deletions(-) diff --git a/docs/readme/indexer-skills.md b/docs/readme/indexer-skills.md index c497490..4d53c56 100644 --- a/docs/readme/indexer-skills.md +++ b/docs/readme/indexer-skills.md @@ -45,7 +45,7 @@ This document describes all available skills that can be used in the indexer pip 1. A `file-scanner` (or `exporter`) to locate/export your source documents. 2. A `file-reader` to read their content. 3. A `splitter` to split the documents into chunks. - 4. A `writer` (`json-writer`) with `checksum_path` set — it will compare a SHA-256 checksum of the chunked content against the previous run and, if unchanged, strip chunks so downstream skills are skipped automatically. + 4. A `writer` (`json-writer`) with `checksum_path` set — it computes a SHA-256 checksum of each **chunk's content** individually (keyed by `document_id`); only chunks whose content has changed (or are new) pass downstream, so unchanged chunks are stripped and their embedding and indexing are skipped automatically. 5. An `embedding` to generate embeddings (skipped when content is unchanged). 6. A `vector-store` to store the embeddings (skipped when content is unchanged). @@ -198,6 +198,8 @@ Splits text by grouping semantically equivalent chunks together. A bit more adva ### Confluence FAQ Splitter Extracts Q&A pairs directly from FAQ `.docx` files exported from Confluence. Each heading that contains a `?` or starts with a problem/question pattern (e.g. "How do I", "I cannot") is treated as a question, and the body content below it becomes the answer. Each Q&A pair is produced as a single atomic chunk. No `file-reader` is needed — this skill reads `.docx` files directly via `python-docx`. +Each chunk's `document_id` is a SHA-256 hash of the **question text only**, so the ID stays stable even when the answer is updated. This makes it a reliable unique key for Azure AI Search upserts — changed Q&A pairs are re-indexed in place without creating duplicates and pairs whose answers haven't changed are skipped by the `json-writer` change gate. + All parameters are optional with sensible defaults. ```yaml @@ -223,18 +225,18 @@ All parameters are optional with sensible defaults. Capture and optionally gate intermediate pipeline state to a file. ### JSON Writer -Extracts text content from all chunks and writes it as a sorted JSON array to a file. Useful for inspecting intermediate pipeline state (e.g. after splitting) and as a **change-detection gate**: when `checksum_path` is configured, the skill computes a SHA-256 checksum of the output and compares it to the previous run. If the content is unchanged and `skip_downstream_if_unchanged` is `true`, all chunks are stripped from the documents so downstream embedding and indexing skills are automatically skipped. +Extracts text content from all chunks and writes it as a sorted JSON array to a file. Useful for inspecting intermediate pipeline state (e.g. after splitting) and as a **per-chunk change-detection gate**: when `checksum_path` is configured, the skill computes a SHA-256 checksum of each **chunk's content** individually and stores the results in a JSON map keyed by `document_id`. On subsequent runs, only chunks whose content has changed (or are new) are passed downstream — unchanged chunks are stripped from their documents, so embedding and indexing are skipped for those chunks only. -Documents and their chunks are always passed through for downstream skills — unless the change gate fires. +This works well with Azure AI Search's key-based upsert — changed documents are re-indexed in place without creating duplicates. ```yaml - skill: &JSONWriter type: writer name: json-writer params: - output_path: data/pipeline_output.json # Path to the output JSON file (default: "data/pipeline_output.json") - checksum_path: data/pipeline_output.sha256 # Optional: path to store/read a SHA-256 checksum. Enables change detection. - skip_downstream_if_unchanged: true # Optional: if true (default) and checksum_path is set, strips chunks when content unchanged, skipping downstream embedding/indexing + output_path: data/pipeline_output.json # Path to the combined output JSON file (default: "data/pipeline_output.json") + checksum_path: data/checksums.json # Optional: path to a JSON file storing per-chunk SHA-256 checksums keyed by document_id. Enables per-chunk change detection. + skip_downstream_if_unchanged: true # Optional: if true (default) and checksum_path is set, strips unchanged chunks from their documents, skipping their embedding/indexing ```
diff --git a/src/docs2vecs/subcommands/indexer/skills/confluence_faq_splitter_skill.py b/src/docs2vecs/subcommands/indexer/skills/confluence_faq_splitter_skill.py index a1623d9..4f84be3 100644 --- a/src/docs2vecs/subcommands/indexer/skills/confluence_faq_splitter_skill.py +++ b/src/docs2vecs/subcommands/indexer/skills/confluence_faq_splitter_skill.py @@ -122,7 +122,11 @@ def run(self, input: Optional[List[Document]] = None) -> List[Document]: combined_text = f"Q: {question}\n\nA: {answer}{links_text}" chunk = Chunk() - chunk.document_id = hashlib.sha256(combined_text.encode()).hexdigest() + # Hash document_id from question only — the question is the + # stable identity of a Q&A pair, so the ID stays the same + # even when the answer is updated. This makes it a reliable + # unique key for Azure AI Search upserts. + chunk.document_id = hashlib.sha256(question.encode()).hexdigest() chunk.document_name = Path(doc.filename).name chunk.tag = doc.tag chunk.content = combined_text # Full Q&A for retrieval diff --git a/src/docs2vecs/subcommands/indexer/skills/json_writer_skill.py b/src/docs2vecs/subcommands/indexer/skills/json_writer_skill.py index 1088d96..ae4af80 100644 --- a/src/docs2vecs/subcommands/indexer/skills/json_writer_skill.py +++ b/src/docs2vecs/subcommands/indexer/skills/json_writer_skill.py @@ -1,18 +1,9 @@ -"""Skill that extracts chunk content from Documents and writes it to a JSON file. +"""Writes chunk content to a JSON file with optional per-document change detection. -Use this skill at any point in a pipeline to capture intermediate state, -e.g. after a splitter, so the output can be checksummed for change detection -without running expensive downstream skills like embedding and indexing. - -Only the chunk text content is written as a sorted JSON array of strings — -volatile metadata like filenames, document IDs, and timestamps are excluded -so the checksum remains stable when the underlying text hasn't changed. - -When ``checksum_path`` is configured, the skill compares the current content -hash against a previously stored one. If unchanged and -``skip_downstream_if_unchanged`` is true, all chunks are removed from the -documents so downstream skills (embedding, indexing) naturally skip -processing — enabling a single-config pipeline with a built-in change gate. +Outputs a sorted JSON array of chunk text strings (metadata excluded). +When ``checksum_path`` is set, per-chunk SHA-256 checksums (keyed by +``document_id``) gate downstream processing — only changed or new chunks +are kept; unchanged chunks are stripped from their documents. """ import hashlib @@ -26,28 +17,14 @@ class JSONWriterSkill(IndexerSkill): - """Extract text content from all chunks and write it as a sorted JSON array. - - The output is a flat list of strings (one per non-empty chunk), sorted - alphabetically for deterministic checksumming. Documents are passed - through unchanged for downstream skills — unless ``checksum_path`` is - set and the content hasn't changed, in which case chunks are stripped - so downstream embedding/indexing skills skip processing. + """Write chunk text as a sorted JSON array with per-chunk change gating. Config params: - output_path (str): Path to the output JSON file (default: - ``data/pipeline_output.json``). Parent - directories are created automatically. - checksum_path (str, optional): Path to store/read a SHA-256 - checksum of the JSON output. When set, the - skill compares the current checksum against the - stored one to detect content changes. - skip_downstream_if_unchanged (bool, optional): If true (default) - and ``checksum_path`` is set, remove all chunks - from documents when content is unchanged. This - causes downstream skills (embedding, indexing) - to skip processing. Set to false to always pass - chunks through regardless of change detection. + output_path (str): Output JSON path (default: ``data/pipeline_output.json``). + checksum_path (str, optional): JSON file for per-chunk SHA-256 checksums + keyed by ``document_id``. + skip_downstream_if_unchanged (bool, optional): Strip unchanged chunks + so downstream skills skip them (default: true). """ def __init__(self, skill_config: dict, global_config: Config) -> None: @@ -59,35 +36,47 @@ def __init__(self, skill_config: dict, global_config: Config) -> None: def _compute_checksum(self, content_bytes: bytes) -> str: return hashlib.sha256(content_bytes).hexdigest() - def _read_stored_checksum(self) -> Optional[str]: + def _read_stored_checksums(self) -> dict: + """Return stored {document_id: checksum} map, or empty dict.""" if self._checksum_path and os.path.isfile(self._checksum_path): try: with open(self._checksum_path, "r", encoding="utf-8") as f: - return f.read().strip() + data = json.load(f) + if isinstance(data, dict): + return data + # Legacy format — cannot migrate, start fresh. + self.logger.warning( + "Checksum file contains legacy format — starting fresh." + ) except Exception as e: - self.logger.warning(f"Failed to read stored checksum: {e}") - return None + self.logger.warning(f"Failed to read stored checksums: {e}") + return {} - def _write_checksum(self, checksum: str) -> None: + def _write_checksums(self, checksums: dict) -> None: + """Save per-document checksums to disk.""" if self._checksum_path: os.makedirs(os.path.dirname(self._checksum_path) or ".", exist_ok=True) with open(self._checksum_path, "w", encoding="utf-8") as f: - f.write(checksum) + json.dump(checksums, f, indent=2, ensure_ascii=False) + + def _compute_chunk_checksum(self, chunk) -> str: + """SHA-256 checksum of a single chunk's content.""" + payload = (chunk.content or "").encode("utf-8") + return self._compute_checksum(payload) def run(self, input: Optional[List[Document]] = None) -> List[Document]: if not input: self.logger.warning("JSONWriterSkill received no input — nothing to write.") return input or [] - # Collect only the content from every chunk across all documents + # Collect chunk content across all documents contents = [] for doc in input: for chunk in doc.chunks: if chunk.content: contents.append(chunk.content) - # Sort for deterministic output (stable checksums) - contents.sort() + contents.sort() # deterministic order for stable checksums os.makedirs(os.path.dirname(self._output_path) or ".", exist_ok=True) @@ -102,32 +91,57 @@ def run(self, input: Optional[List[Document]] = None) -> List[Document]: self._output_path, ) - # ── Checksum-based change gate ────────────────────────── + # ── Per-chunk checksum-based change gate ──────────────── + # Each chunk is keyed by its document_id (e.g. question hash). + # Only chunks whose content has changed (or are new) are kept; + # unchanged chunks are removed so downstream skills skip them. if self._checksum_path: - new_checksum = self._compute_checksum(json_bytes) - old_checksum = self._read_stored_checksum() - - if old_checksum and new_checksum == old_checksum and self._skip_if_unchanged: - self.logger.info( - "Content unchanged (checksum: %s) — stripping chunks to skip downstream processing.", - new_checksum[:12], - ) - for doc in input: - doc.chunks = set() - else: - if old_checksum: - self.logger.info( - "Content changed (old: %s, new: %s) — passing chunks to downstream skills.", - old_checksum[:12], - new_checksum[:12], - ) - else: - self.logger.info( - "No previous checksum found — passing chunks to downstream skills (first run).", - ) - - # Always save the new checksum - self._write_checksum(new_checksum) + old_checksums = self._read_stored_checksums() + new_checksums: dict = {} + + changed_count = 0 + unchanged_count = 0 + + for doc in input: + unchanged_chunks = set() + + for chunk in doc.chunks: + doc_id = chunk.document_id or chunk.chunk_id or "unknown" + chunk_checksum = self._compute_chunk_checksum(chunk) + new_checksums[doc_id] = chunk_checksum + + old_checksum = old_checksums.get(doc_id) + + if old_checksum and chunk_checksum == old_checksum and self._skip_if_unchanged: + unchanged_chunks.add(chunk) + unchanged_count += 1 + self.logger.debug( + "Chunk %s unchanged — will be stripped.", + doc_id[:12], + ) + else: + changed_count += 1 + if old_checksum: + self.logger.debug( + "Chunk %s changed (old: %s, new: %s).", + doc_id[:12], + old_checksum[:12], + chunk_checksum[:12], + ) + else: + self.logger.debug("Chunk %s is new.", doc_id[:12]) + + # Remove unchanged chunks from this document + if unchanged_chunks: + doc.chunks -= unchanged_chunks + + self.logger.info( + "Change detection: %d changed/new, %d unchanged out of %d chunks.", + changed_count, + unchanged_count, + changed_count + unchanged_count, + ) + + self._write_checksums(new_checksums) - # Pass-through: downstream skills can still consume the documents return input diff --git a/src/docs2vecs/subcommands/indexer/skills/teams_qna_loader_skill.py b/src/docs2vecs/subcommands/indexer/skills/teams_qna_loader_skill.py index c8468b9..ea69803 100644 --- a/src/docs2vecs/subcommands/indexer/skills/teams_qna_loader_skill.py +++ b/src/docs2vecs/subcommands/indexer/skills/teams_qna_loader_skill.py @@ -12,19 +12,19 @@ class TeamsQnALoaderSkill(IndexerSkill): """A skill that loads enriched Q&A pairs from the FAQ pipeline JSON output. - The JSON file should be an array of enriched Q&A objects with: - - thread_id: Unique identifier for the conversation thread - - question: Original question text - - rephrased_question: AI-polished question (used for embedding) - - rephrased_answer: AI-summarized answer (used as content) - - topic: Clustered topic category - - key_phrases: Extracted key phrases - - question_sender: Original question author - - timestamp: Message timestamp - - answers: Array of original answers + The JSON file should be an array of enriched Q&A objects. Fields consumed by this skill: + - thread_id: Unique identifier for the conversation thread (used as document ID) + - question: Original question text (fallback if rephrased_question is absent) + - rephrased_question: AI-polished question (preferred for content) + - rephrased_answer: AI-summarized answer (preferred for content) + - topic: Clustered topic category (used in document name) + - answers: Array of original answer objects (fallback if rephrased_answer is absent) + - source_link: Teams message deep link (optional, used as chunk source URL) + - tag: Per-item tag override (optional, falls back to skill-level tag) Configuration parameters: - file_path (str): Path to the enriched Q&A JSON file + - tag (str): Default tag for chunks (default: "enriched-qna") """ def __init__(self, skill_config: dict, global_config: Config) -> None: