Add Opensearch Writer Reliability #1130

Merged: 7 commits, Feb 6, 2025
Changes from 1 commit

Merge with main
dhruvkaliraman7 committed Feb 5, 2025
commit 2e35fdfe92761b3635822feca3b36478d40ccb31
@@ -180,6 +180,74 @@ def test_ingest_and_read(self, setup_index, os_client, exec_mode):

os_client.indices.delete(setup_index, ignore_unavailable=True)

def test_doc_reconstruct(self, setup_index, os_client):
with tempfile.TemporaryDirectory() as materialized_dir:
self._test_doc_reconstruct(setup_index, os_client, materialized_dir)

def _test_doc_reconstruct(self, setup_index, os_client, materialized_dir):
"""
Validates data is readable from OpenSearch, and that we can rebuild processed Sycamore documents.
"""

print(f"Using materialized dir: {materialized_dir}")

def doc_reconstructor(doc_id: str) -> Document:
import pickle

data = pickle.load(open(f"{materialized_dir}/{setup_index}-{doc_id}", "rb"))
return Document(**data)

def doc_to_name(doc: Document, bin: bytes) -> str:
return f"{setup_index}-{doc.doc_id}"

path = str(TEST_DIR / "resources/data/pdfs/Ray.pdf")
context = sycamore.init(exec_mode=ExecMode.RAY)
llm = OpenAI(OpenAIModels.GPT_4O_MINI)
extractor = OpenAIEntityExtractor("title", llm=llm)
original_docs = (
context.read.binary(path, binary_format="pdf")
.partition(ArynPartitioner(aryn_api_key=ARYN_API_KEY))
.extract_entity(extractor)
.materialize(path={"root": materialized_dir, "name": doc_to_name})
.explode()
.write.opensearch(
os_client_args=TestOpenSearchRead.OS_CLIENT_ARGS,
index_name=setup_index,
index_settings=TestOpenSearchRead.INDEX_SETTINGS,
execute=False,
)
.take_all()
)

os_client.indices.refresh(setup_index)

expected_count = len(original_docs)
actual_count = get_doc_count(os_client, setup_index)
# refresh should have made all ingested docs immediately available for search
assert actual_count == expected_count, f"Expected {expected_count} documents, found {actual_count}"

retrieved_docs_reconstructed = context.read.opensearch(
os_client_args=TestOpenSearchRead.OS_CLIENT_ARGS,
index_name=setup_index,
reconstruct_document=True,
).take_all()

assert len(retrieved_docs_reconstructed) == 1
retrieved_sorted = sorted(retrieved_docs_reconstructed, key=lambda d: d.doc_id)

def remove_reconstruct_doc_property(doc: Document):
for element in doc.data["elements"]:
element["properties"].pop(DocumentPropertyTypes.SOURCE)

for doc in retrieved_sorted:
remove_reconstruct_doc_property(doc)

from_materialized = [doc_reconstructor(doc.doc_id) for doc in retrieved_sorted]
compare_connector_docs(from_materialized, retrieved_sorted)

# Clean up
os_client.indices.delete(setup_index, ignore_unavailable=True)

def test_write_with_reliability(self, setup_index, os_client, exec_mode):
"""
Validates data is readable from OpenSearch, and that we can rebuild processed Sycamore documents.
@@ -191,7 +259,7 @@ def test_write_with_reliability(self, setup_index, os_client, exec_mode):
# 2 docs for ray execution
(
context.read.binary([path, path], binary_format="pdf")
- .partition(partitioner=UnstructuredPdfPartitioner())
+ .partition(ArynPartitioner(aryn_api_key=ARYN_API_KEY))
.explode()
.materialize(path=tmpdir1)
.execute()
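
The round-trip pattern in test_doc_reconstruct above pairs a doc_to_name naming function on the materialize step with a doc_reconstructor that loads the same files back. Below is a minimal sketch of that pairing in isolation, outside the test harness; the index name and document fields are placeholders, and the on-disk pickle format simply mirrors what the test's reconstructor expects (Document(**data) follows the constructor usage shown in the test):

import pickle
import tempfile
from pathlib import Path

from sycamore.data import Document

with tempfile.TemporaryDirectory() as root:
    index = "demo-index"  # placeholder index name

    # Writer side: mirrors doc_to_name, keying each pickle by index and doc_id.
    doc = Document(doc_id="doc-1", text_representation="example body")
    with open(Path(root) / f"{index}-{doc.doc_id}", "wb") as f:
        pickle.dump(doc.data, f)

    # Reader side: mirrors doc_reconstructor, rebuilding the Document from the
    # pickled dict and checking the round trip.
    with open(Path(root) / f"{index}-doc-1", "rb") as f:
        restored = Document(**pickle.load(f))
    assert restored.doc_id == doc.doc_id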
47 changes: 47 additions & 0 deletions lib/sycamore/sycamore/writer.py
@@ -810,6 +810,53 @@ def json(

self._maybe_execute(node, True)

def aryn(
self,
docset_id: Optional[str] = None,
name: Optional[str] = None,
aryn_api_key: Optional[str] = None,
aryn_url: Optional[str] = None,
**kwargs,
) -> Optional["DocSet"]:
"""
Writes all documents of a DocSet to Aryn.

Args:
docset_id: The id of the docset to write to. If not provided, a new docset will be created
and name must be supplied.
name: The name of the new docset to create. Required if docset_id is not provided.
aryn_api_key: The API key to use for authentication. If not provided, the API key from the config
file will be used.
aryn_url: The URL of the Aryn instance to write to. If not provided, the URL from the config file
will be used.
"""

from sycamore.connectors.aryn.ArynWriter import (
ArynWriter,
ArynWriterClientParams,
ArynWriterTargetParams,
)

if aryn_api_key is None:
aryn_api_key = ArynConfig.get_aryn_api_key()
if aryn_url is None:
aryn_url = ArynConfig.get_aryn_url()

if docset_id is None and name is None:
raise ValueError("Either docset_id or name must be provided")

if docset_id is None and name is not None:
headers = {"Authorization": f"Bearer {aryn_api_key}"}
res = requests.post(url=f"{aryn_url}/docsets", data={"name": name}, headers=headers)
docset_id = res.json()["docset_id"]

client_params = ArynWriterClientParams(aryn_url, aryn_api_key)
target_params = ArynWriterTargetParams(docset_id)
ds = ArynWriter(self.plan, client_params=client_params, target_params=target_params, **kwargs)

return self._maybe_execute(ds, True)

def _maybe_execute(
self, node: Node, execute: bool, client: Optional[OpenSearchWriterClient] = None
) -> Optional[DocSet]:
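
For context, here is a hedged usage sketch of the aryn() writer added above; the file path, docset name, and API key are placeholders, and the bare read step stands in for a full pipeline:

import sycamore
from sycamore import ExecMode

ctx = sycamore.init(exec_mode=ExecMode.LOCAL)

# A real pipeline would typically partition and transform before writing.
docs = ctx.read.binary("example.pdf", binary_format="pdf")

# Create a new docset by name: the writer first POSTs to {aryn_url}/docsets,
# then writes every document into the returned docset_id.
docs.write.aryn(name="my-docset", aryn_api_key="YOUR_ARYN_API_KEY")

# Or append to an existing docset by id; omitting both raises ValueError.
docs.write.aryn(docset_id="existing-docset-id")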