Merged
56 changes: 48 additions & 8 deletions docs/readme/indexer-skills.md
@@ -40,14 +40,23 @@ This document describes all available skills that can be used in the indexer pip
2. An `embedding` to generate embeddings from the Q&A content.
3. A `vector-store` to store the embeddings.

6. You want to avoid re-running expensive embedding and indexing when the content hasn't changed since the last run? Insert a `writer` (`json-writer`) skill as a change gate:

1. A `file-scanner` (or `exporter`) to locate/export your source documents.
2. A `file-reader` to read their content.
3. A `splitter` to split the documents into chunks.
4. A `writer` (`json-writer`) with `checksum_path` set — it computes a SHA-256 checksum of each **chunk's content** individually (keyed by `document_id`); only chunks whose content has changed (or are new) pass downstream, so unchanged chunks are stripped and their embedding and indexing are skipped automatically.
5. An `embedding` to generate embeddings (skipped when content is unchanged).
6. A `vector-store` to store the embeddings (skipped when content is unchanged).


# Available Skills

<details><summary>Exporter Skills</summary>
Export data from one source to another. For example, export a Confluence page to a Markdown file.

### Scroll Word Exporter
Exports a confluence page to Microsoft Word document
Exports Confluence pages to Microsoft Word documents. Each entry in `page_urls` and `page_ids` supports an optional inline `tag`. Entries without a tag fall back to the top-level `tag` param.

```yaml
- skill: &Exporter
@@ -58,12 +67,19 @@ Exports a confluence page to Microsoft Word document
auth_token: env.SWE_AUTH_TOKEN # Scroll Word API token - can be obtained in Confluence
poll_interval: 20 # Interval in seconds to check the status of the export
export_folder: ~/Downloads/sw_export_temp # Path where the exported file(s) should be saved
scope: current # Possible values: [current | descendants]. `current` exports just the current page, where `descendants` include all the descendants of the current page
page_ids: # List all page IDs that you'd like to export
- 1774209540
page_urls: # List all page URLs that you'd like to export
- https://your/corporate/confluence/prefix/wiki/spaces/your/confluence/space
confluence_prefix: https://your/corporate/confluence/prefix # Your corporate Confluence URL
scope: current # Possible values: [current | descendants]
confluence_prefix: https://your/corporate/confluence/prefix
tag: generic # Optional: default tag for all pages (fallback)
page_urls:
- url: https://your/confluence/spaces/SPACE/pages/123/Page+Title
tag: my-tag # Optional: overrides top-level tag for this page
- url: https://your/confluence/spaces/SPACE/pages/456/Another+Page
# no tag — falls back to top-level tag
page_ids:
- id: 1774209540
tag: my-tag # Optional
- id: 1234567890
# no tag — falls back to top-level tag
```
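
The tag fallback described above can be sketched in a few lines of Python. This is only an illustration of the documented behaviour, not the exporter's actual code: `resolve_tags` is a hypothetical helper, and the entry shape (`id` or `url` plus an optional `tag`) is taken from the YAML example.

```python
from typing import Optional


def resolve_tags(entries: list[dict], key: str, default_tag: Optional[str]) -> list[tuple]:
    """Pair each entry's `key` value with its inline tag, or the top-level fallback."""
    return [(entry[key], entry.get("tag", default_tag)) for entry in entries]


# Params shaped like the YAML above (hypothetical in-memory form).
params = {
    "tag": "generic",
    "page_ids": [{"id": 1774209540, "tag": "my-tag"}, {"id": 1234567890}],
}

resolved = resolve_tags(params["page_ids"], "id", params.get("tag"))
print(resolved)  # [(1774209540, 'my-tag'), (1234567890, 'generic')]
```

The same helper would work for `page_urls` by passing `"url"` as the key.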
</details>

@@ -136,13 +152,15 @@ Loads data from Jira issues
### Teams Q&A Loader
Loads enriched Q&A pairs from a JSON file produced by the FAQ enrichment pipeline. Each Q&A pair becomes a single document with one chunk. The skill prefers rephrased questions/answers when available, falling back to originals.

Each Q&A object in the JSON can optionally include a `tag` field that overrides the skill-level `tag` for that specific chunk, allowing fine-grained tagging within a single file.

```yaml
- skill: &TeamsQnALoader
type: loader
name: teams-qna-loader
params:
file_path: data/processed_output/enriched_qna.json # Required: path to enriched Q&A JSON file
tag: teams-faq # Optional: tag for chunks (default: "enriched-qna")
tag: teams-faq # Optional: default tag for chunks (default: "enriched-qna"); can be overridden per Q&A object via a "tag" field in the JSON
```
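
A rough sketch of the two fallbacks described above: rephrased text over original text, and a per-object `tag` over the skill-level `tag`. The JSON field names `question`, `answer`, `rephrased_question` and `rephrased_answer`, the `to_chunks` helper, and the `Q:`/`A:` content layout are all assumptions made for illustration. The real file format is not shown here.

```python
import json

DEFAULT_TAG = "enriched-qna"  # default documented for the skill-level tag


def to_chunks(raw_json: str, skill_tag: str = DEFAULT_TAG) -> list[dict]:
    """Turn Q&A objects into chunk dicts, applying both fallbacks."""
    chunks = []
    for item in json.loads(raw_json):
        # Hypothetical field names: prefer the rephrased text, else the original.
        question = item.get("rephrased_question") or item["question"]
        answer = item.get("rephrased_answer") or item["answer"]
        chunks.append({
            "content": f"Q: {question}\n\nA: {answer}",
            "tag": item.get("tag", skill_tag),  # a per-object tag wins over the skill tag
        })
    return chunks


sample = json.dumps([
    {"question": "How do I log in?", "answer": "Use SSO.", "tag": "auth"},
    {"question": "Where are the logs?", "rephrased_question": "Where can I find logs?", "answer": "In /var/log."},
])
chunks = to_chunks(sample, skill_tag="teams-faq")
print([c["tag"] for c in chunks])
```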
</details>

@@ -180,6 +198,8 @@ Splits text by grouping semantically equivalent chunks together. A bit more adva
### Confluence FAQ Splitter
Extracts Q&A pairs directly from FAQ `.docx` files exported from Confluence. Each heading that contains a `?` or starts with a problem/question pattern (e.g. "How do I", "I cannot") is treated as a question, and the body content below it becomes the answer. Each Q&A pair is produced as a single atomic chunk. No `file-reader` is needed — this skill reads `.docx` files directly via `python-docx`.

Each chunk's `document_id` is a SHA-256 hash of the **question text only**, so the ID stays stable even when the answer is updated. This makes it a reliable unique key for Azure AI Search upserts: changed Q&A pairs are re-indexed in place without creating duplicates, and pairs whose answers haven't changed are skipped by the `json-writer` change gate.
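
To see why the two hashes play different roles, here is a small sketch. The helper names `question_id` and `content_checksum` are hypothetical, and the content string is simplified (the real chunk content may also include link text).

```python
import hashlib


def question_id(question: str) -> str:
    """SHA-256 of the question text only: the stable key for a Q&A pair."""
    return hashlib.sha256(question.encode()).hexdigest()


def content_checksum(question: str, answer: str) -> str:
    """SHA-256 of the full chunk content: what a change gate would compare."""
    return hashlib.sha256(f"Q: {question}\n\nA: {answer}".encode("utf-8")).hexdigest()


q = "How do I reset my password?"
old = (question_id(q), content_checksum(q, "Open the portal."))
new = (question_id(q), content_checksum(q, "Open the portal and click Reset."))

print(old[0] == new[0])  # True: same key, so the index entry is updated in place
print(old[1] == new[1])  # False: content changed, so the chunk is re-embedded
```

One consequence of keying on the question: rewording a question yields a new ID, so the old entry would not be overwritten by the upsert.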

All parameters are optional with sensible defaults.

```yaml
@@ -201,6 +221,25 @@ All parameters are optional with sensible defaults.
```
</details>

<details><summary>Writer Skills</summary>
Capture and optionally gate intermediate pipeline state to a file.

### JSON Writer
Extracts text content from all chunks and writes it as a sorted JSON array to a file. Useful for inspecting intermediate pipeline state (e.g. after splitting) and as a **per-chunk change-detection gate**: when `checksum_path` is configured, the skill computes a SHA-256 checksum of each **chunk's content** individually and stores the results in a JSON map keyed by `document_id`. On subsequent runs, only chunks whose content has changed (or are new) are passed downstream — unchanged chunks are stripped from their documents, so embedding and indexing are skipped for those chunks only.

This works well with Azure AI Search's key-based upsert — changed documents are re-indexed in place without creating duplicates.

```yaml
- skill: &JSONWriter
type: writer
name: json-writer
params:
output_path: data/pipeline_output.json # Path to the combined output JSON file (default: "data/pipeline_output.json")
checksum_path: data/checksums.json # Optional: path to a JSON file storing per-chunk SHA-256 checksums keyed by document_id. Enables per-chunk change detection.
skip_downstream_if_unchanged: true # Optional: if true (default) and checksum_path is set, strips unchanged chunks from their documents, skipping their embedding/indexing
```
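
The gate's behaviour across runs can be sketched as follows. This is a simplified stand-in rather than the skill itself: `gate` is a hypothetical function, and chunks are modelled as a plain `{document_id: content}` dict instead of pipeline objects.

```python
import hashlib
import json
import os
import tempfile


def gate(chunks: dict[str, str], checksum_path: str, skip_unchanged: bool = True) -> list[str]:
    """Return the document_ids that should continue downstream."""
    stored = {}
    if os.path.isfile(checksum_path):
        with open(checksum_path, encoding="utf-8") as f:
            stored = json.load(f)

    fresh = {doc_id: hashlib.sha256(text.encode("utf-8")).hexdigest() for doc_id, text in chunks.items()}
    keep = [doc_id for doc_id in chunks if not (skip_unchanged and stored.get(doc_id) == fresh[doc_id])]

    # The map is rewritten on every run, so chunks that disappeared drop out of it.
    with open(checksum_path, "w", encoding="utf-8") as f:
        json.dump(fresh, f, indent=2)
    return keep


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "checksums.json")
    first = gate({"q1": "answer one", "q2": "answer two"}, path)
    second = gate({"q1": "answer one", "q2": "answer two, revised"}, path)
    ungated = gate({"q1": "answer one", "q2": "answer two, revised"}, path, skip_unchanged=False)

print(first, second, ungated)
```

On the first run everything passes because no checksums exist yet. On the second, only the revised chunk passes. With the skip flag off, checksums are still recorded but nothing is stripped.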
</details>

<details><summary>Embedding</summary>
Generate embeddings from text. An embedding is a vector representation of your text data.

@@ -250,6 +289,7 @@ Stores embeddings in an Azure AI Search index.
document_name: document_name
embedding: embedding
overwrite_index: true # true - before storing data, it will remove all the documents from your index. false - will append documents to your index
batch_size: 50 # Optional: number of documents uploaded per API call (default: 50, max: 50)
```
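
As an illustration of what `batch_size` controls, the sketch below splits a document list into upload batches. `batches` is a hypothetical helper. Clamping values above 50 is an assumption of this sketch, since the diff shows only a schema minimum of 1 and a documented maximum of 50.

```python
from typing import Iterator

MAX_BATCH_SIZE = 50  # documented maximum for batch_size


def batches(documents: list, batch_size: int = 50) -> Iterator[list]:
    """Yield consecutive slices of at most `batch_size` documents."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    size = min(batch_size, MAX_BATCH_SIZE)  # clamping is an assumption, not confirmed behaviour
    for start in range(0, len(documents), size):
        yield documents[start:start + size]


sizes = [len(batch) for batch in batches(list(range(120)), batch_size=50)]
print(sizes)  # [50, 50, 20]
```

Each yielded batch would correspond to one upload call.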

### Chroma
32 changes: 29 additions & 3 deletions src/docs2vecs/subcommands/indexer/config/config_schema.yaml
@@ -36,13 +36,29 @@ definitions:
required: False
page_ids:
type: list
required: False
schema:
type: ['string', 'integer']
type: dict
schema:
id:
type: ['string', 'integer']
required: True
tag:
type: string
required: False
page_urls:
type: list
required: False
schema:
type: string
regex: '^http.*'
type: dict
schema:
url:
type: string
regex: '^http.*'
required: True
tag:
type: string
required: False
confluence_prefix:
type: string
regex: '^http.*'
@@ -109,6 +125,12 @@ definitions:
output_path:
type: string
required: False
checksum_path:
type: string
required: False
skip_downstream_if_unchanged:
type: boolean
required: False
# ConfluenceFAQSplitter params
min_heading_level:
type: integer
@@ -183,6 +205,10 @@ definitions:
required: False
overwrite_index:
type: boolean
batch_size:
type: integer
required: False
min: 1
jql_query:
type: string
required: False
@@ -122,7 +122,11 @@ def run(self, input: Optional[List[Document]] = None) -> List[Document]:
combined_text = f"Q: {question}\n\nA: {answer}{links_text}"

chunk = Chunk()
chunk.document_id = hashlib.sha256(combined_text.encode()).hexdigest()
# Hash document_id from question only — the question is the
# stable identity of a Q&A pair, so the ID stays the same
# even when the answer is updated. This makes it a reliable
# unique key for Azure AI Search upserts.
chunk.document_id = hashlib.sha256(question.encode()).hexdigest()
chunk.document_name = Path(doc.filename).name
chunk.tag = doc.tag
chunk.content = combined_text # Full Q&A for retrieval
126 changes: 104 additions & 22 deletions src/docs2vecs/subcommands/indexer/skills/json_writer_skill.py
@@ -1,14 +1,12 @@
"""Skill that extracts chunk content from Documents and writes it to a JSON file.
"""Writes chunk content to a JSON file with optional per-chunk change detection.

Use this skill at any point in a pipeline to capture intermediate state,
e.g. after a splitter, so the output can be checksummed for change detection
without running expensive downstream skills like embedding and indexing.

Only the chunk text content is written as a sorted JSON array of strings —
volatile metadata like filenames, document IDs, and timestamps are excluded
so the checksum remains stable when the underlying text hasn't changed.
Outputs a sorted JSON array of chunk text strings (metadata excluded).
When ``checksum_path`` is set, per-chunk SHA-256 checksums (keyed by
``document_id``) gate downstream processing — only changed or new chunks
are kept; unchanged chunks are stripped from their documents.
"""

import hashlib
import json
import os
from typing import List, Optional
@@ -19,47 +17,131 @@


class JSONWriterSkill(IndexerSkill):
"""Extract text content from all chunks and write it as a sorted JSON array.

The output is a flat list of strings (one per non-empty chunk), sorted
alphabetically for deterministic checksumming. Documents are passed
through unchanged for downstream skills.
"""Write chunk text as a sorted JSON array with per-chunk change gating.

Config params:
output_path (str): Path to the output JSON file (default:
``data/pipeline_output.json``). Parent
directories are created automatically.
output_path (str): Output JSON path (default: ``data/pipeline_output.json``).
checksum_path (str, optional): JSON file for per-chunk SHA-256 checksums
keyed by ``document_id``.
skip_downstream_if_unchanged (bool, optional): Strip unchanged chunks
so downstream skills skip them (default: true).
"""

def __init__(self, skill_config: dict, global_config: Config) -> None:
super().__init__(skill_config, global_config)
self._output_path = self._config.get("output_path", "data/pipeline_output.json")
self._checksum_path = self._config.get("checksum_path", None)
self._skip_if_unchanged = self._config.get("skip_downstream_if_unchanged", True)

def _compute_checksum(self, content_bytes: bytes) -> str:
return hashlib.sha256(content_bytes).hexdigest()

def _read_stored_checksums(self) -> dict:
"""Return stored {document_id: checksum} map, or empty dict."""
if self._checksum_path and os.path.isfile(self._checksum_path):
try:
with open(self._checksum_path, "r", encoding="utf-8") as f:
data = json.load(f)
if isinstance(data, dict):
return data
# Legacy format — cannot migrate, start fresh.
self.logger.warning(
"Checksum file contains legacy format — starting fresh."
)
except Exception as e:
self.logger.warning(f"Failed to read stored checksums: {e}")
return {}

def _write_checksums(self, checksums: dict) -> None:
"""Save per-chunk checksums to disk."""
if self._checksum_path:
os.makedirs(os.path.dirname(self._checksum_path) or ".", exist_ok=True)
with open(self._checksum_path, "w", encoding="utf-8") as f:
json.dump(checksums, f, indent=2, ensure_ascii=False)

def _compute_chunk_checksum(self, chunk) -> str:
"""SHA-256 checksum of a single chunk's content."""
payload = (chunk.content or "").encode("utf-8")
return self._compute_checksum(payload)

def run(self, input: Optional[List[Document]] = None) -> List[Document]:
if not input:
self.logger.warning("JSONWriterSkill received no input — nothing to write.")
return input or []

# Collect only the content from every chunk across all documents
# Collect chunk content across all documents
contents = []
for doc in input:
for chunk in doc.chunks:
if chunk.content:
contents.append(chunk.content)

# Sort for deterministic output (stable checksums)
contents.sort()
contents.sort() # deterministic order for stable checksums

os.makedirs(os.path.dirname(self._output_path) or ".", exist_ok=True)

with open(self._output_path, "w", encoding="utf-8") as f:
json.dump(contents, f, indent=2, ensure_ascii=False)
json_bytes = json.dumps(contents, indent=2, ensure_ascii=False).encode("utf-8")

with open(self._output_path, "wb") as f:
f.write(json_bytes)

self.logger.info(
"Wrote %d chunk content entries to %s",
len(contents),
self._output_path,
)

# Pass-through: downstream skills can still consume the documents
# ── Per-chunk checksum-based change gate ────────────────
# Each chunk is keyed by its document_id (e.g. question hash).
# Only chunks whose content has changed (or are new) are kept;
# unchanged chunks are removed so downstream skills skip them.
if self._checksum_path:
old_checksums = self._read_stored_checksums()
new_checksums: dict = {}

changed_count = 0
unchanged_count = 0

for doc in input:
unchanged_chunks = set()

for chunk in doc.chunks:
doc_id = chunk.document_id or chunk.chunk_id or "unknown"
chunk_checksum = self._compute_chunk_checksum(chunk)
new_checksums[doc_id] = chunk_checksum

old_checksum = old_checksums.get(doc_id)

if old_checksum and chunk_checksum == old_checksum and self._skip_if_unchanged:
unchanged_chunks.add(chunk)
unchanged_count += 1
self.logger.debug(
"Chunk %s unchanged — will be stripped.",
doc_id[:12],
)
else:
changed_count += 1
if old_checksum:
self.logger.debug(
"Chunk %s changed (old: %s, new: %s).",
doc_id[:12],
old_checksum[:12],
chunk_checksum[:12],
)
else:
self.logger.debug("Chunk %s is new.", doc_id[:12])

# Remove unchanged chunks from this document
if unchanged_chunks:
Review comment (Contributor) on lines +134 to +135:

There's no need for the `if` statement. You can write `doc.chunks -= unchanged_chunks`; if `unchanged_chunks` is empty, it is a no-op.

doc.chunks -= unchanged_chunks

self.logger.info(
"Change detection: %d changed/new, %d unchanged out of %d chunks.",
changed_count,
unchanged_count,
changed_count + unchanged_count,
)

self._write_checksums(new_checksums)

return input