105 changes: 92 additions & 13 deletions docling_haystack/converter.py
@@ -4,16 +4,26 @@
#

"""Docling Haystack converter module."""

import logging
import mimetypes
from abc import ABC, abstractmethod
from enum import Enum
from io import BytesIO
from pathlib import Path
from typing import Any, Iterable, Optional, Union
from typing import Any, Dict, List, Optional, Union

from docling.chunking import BaseChunk, BaseChunker, HybridChunker
from docling.datamodel.base_models import DocumentStream
from docling.datamodel.document import DoclingDocument
from docling.document_converter import DocumentConverter
from haystack import Document, component
from haystack.components.converters.utils import (
get_bytestream_from_source,
normalize_metadata,
)
from haystack.dataclasses import ByteStream

logger = logging.getLogger(__name__)


class ExportType(str, Enum):
@@ -100,40 +110,109 @@ def __init__(
)
self._meta_extractor = meta_extractor or MetaExtractor()

def _extract_filename_with_extension(
self, source: Union[str, Path, ByteStream], bytestream: ByteStream
) -> str:
"""Extract filename with appropriate extension from source or bytestream.

Args:
source: The source object (str, Path, or ByteStream)
bytestream: The bytestream created from the source

Returns:
str: The extracted filename with appropriate extension
"""
# Default filename
filename = "unknown.pdf"

# Extract filename from source
if isinstance(source, str) or isinstance(source, Path):
filename = Path(source).name
elif isinstance(source, ByteStream) and hasattr(source, "meta"):
# Try to get filename from ByteStream metadata
if "filename" in source.meta:
filename = source.meta["filename"]
elif "name" in source.meta:
filename = source.meta["name"]
# Try to infer extension from mime_type if available
if "mime_type" in source.meta and not Path(filename).suffix:
extension = mimetypes.guess_extension(source.meta["mime_type"])
if extension:
filename = f"{Path(filename).stem}{extension}"

# Check if bytestream metadata contains filename
if hasattr(bytestream, "meta"):
if "filename" in bytestream.meta:
filename = bytestream.meta["filename"]
elif "name" in bytestream.meta:
filename = bytestream.meta["name"]
# Try to infer extension from mime_type if available
if "mime_type" in bytestream.meta and not Path(filename).suffix:
extension = mimetypes.guess_extension(bytestream.meta["mime_type"])
if extension:
filename = f"{Path(filename).stem}{extension}"

return filename
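
The helper falls back to the MIME type only when the chosen name has no suffix. A minimal sketch of that resolution using just the standard library (the meta values are illustrative, not taken from this PR):

import mimetypes
from pathlib import Path

meta = {"name": "report", "mime_type": "application/pdf"}
filename = meta["name"]                          # "report" -- no suffix yet
if not Path(filename).suffix:
    extension = mimetypes.guess_extension(meta["mime_type"])  # ".pdf"
    if extension:
        filename = f"{Path(filename).stem}{extension}"
print(filename)  # report.pdf
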

@component.output_types(documents=list[Document])
def run(
self,
paths: Iterable[Union[Path, str]],
sources: List[Union[str, Path, ByteStream]],
meta: Optional[Union[Dict[str, Any], List[Dict[str, Any]]]] = None,
):
"""Run the DoclingConverter.

Args:
paths: The input document locations, either as local paths or URLs.
sources: The input sources to convert, given as local file paths (str or Path) or Haystack ByteStream objects.
meta: Optional metadata to attach to the Documents.
This value can be a single dictionary or a list of dictionaries,
matching the number of sources.

Returns:
list[Document]: The output Haystack Documents.
"""
documents: list[Document] = []
for filepath in paths:
meta_list = normalize_metadata(meta, len(sources))

for source, metadata in zip(sources, meta_list):
try:
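# Resolve the source (str path, Path, or existing ByteStream) into a Haystack ByteStream.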
bytestream = get_bytestream_from_source(source=source)
except Exception as e:
logger.warning(f"Could not read {source}. Skipping it. Error: {str(e)}")
continue

# Extract filename with appropriate extension
filename = self._extract_filename_with_extension(source, bytestream)
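# Hand Docling the raw bytes under the resolved filename; the extension helps format detection.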

source_docling = DocumentStream(
name=filename, stream=BytesIO(bytestream.data)
)

hs_docs = []
dl_doc = self._converter.convert(
source=filepath,
source=source_docling,
**self._convert_kwargs,
).document

if self._export_type == ExportType.DOC_CHUNKS:
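# DOC_CHUNKS mode: split the converted DoclingDocument with the configured chunker and emit one Haystack Document per chunk.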
chunk_iter = self._chunker.chunk(dl_doc=dl_doc)
hs_docs = [
Document(
content=self._chunker.serialize(chunk=chunk),
meta=self._meta_extractor.extract_chunk_meta(chunk=chunk),
for chunk in chunk_iter:
docling_meta = self._meta_extractor.extract_chunk_meta(chunk=chunk)
merged_metadata = {**bytestream.meta, **docling_meta, **metadata}
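# Later entries win: user-supplied metadata overrides Docling-derived metadata, which overrides the bytestream's own metadata.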
hs_docs.append(
Document(
content=self._chunker.serialize(chunk=chunk),
meta=merged_metadata,
)
)
for chunk in chunk_iter
]
documents.extend(hs_docs)
elif self._export_type == ExportType.MARKDOWN:
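# MARKDOWN mode: export the whole document as one Markdown-formatted Haystack Document, with the same metadata precedence as above.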
docling_meta = self._meta_extractor.extract_dl_doc_meta(dl_doc=dl_doc)
merged_metadata = {**bytestream.meta, **docling_meta, **metadata}
hs_doc = Document(
content=dl_doc.export_to_markdown(**self._md_export_kwargs),
meta=self._meta_extractor.extract_dl_doc_meta(dl_doc=dl_doc),
meta=merged_metadata,
)
documents.append(hs_doc)
else:
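
For orientation, a minimal usage sketch of the updated run() signature. It assumes the constructor exposes an export_type argument (suggested by self._export_type above), that run() returns the usual Haystack component mapping {"documents": [...]}, and that sample.pdf is an illustrative local file:

from pathlib import Path

from haystack.dataclasses import ByteStream

from docling_haystack.converter import DoclingConverter, ExportType

converter = DoclingConverter(export_type=ExportType.MARKDOWN)

# One local path and one in-memory stream; the meta list lines up with sources
# and gets merged into each output Document's meta.
stream = ByteStream(
    data=Path("sample.pdf").read_bytes(),
    meta={"name": "sample", "mime_type": "application/pdf"},
)

result = converter.run(
    sources=["sample.pdf", stream],
    meta=[{"origin": "disk"}, {"origin": "memory"}],
)
for doc in result["documents"]:
    print(doc.meta.get("origin"), doc.content[:80])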