Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update aryn-sdk's async DocParse interface to raise Exceptions rather than returning error strings #1164

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions lib/aryn-sdk/aryn_sdk/partition/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
table_elem_to_dataframe,
convert_image_element,
PartitionError,
PartitionTaskError,
PartitionTaskNotFoundError,
)
from .art import draw_with_boxes

Expand All @@ -18,6 +20,8 @@
"draw_with_boxes",
"convert_image_element",
"PartitionError",
"PartitionTaskError",
"PartitionTaskNotFoundError",
"partition_file_async_submit",
"partition_file_async_result",
"partition_file_async_cancel",
Expand Down
65 changes: 43 additions & 22 deletions lib/aryn-sdk/aryn_sdk/partition/partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,18 @@ def __init__(self, message: str, status_code: int) -> None:
self.status_code = status_code


class PartitionTaskError(Exception):
def __init__(self, message: str, status_code: int) -> None:
super().__init__(message)
self.status_code = status_code


class PartitionTaskNotFoundError(Exception):
def __init__(self, message: str, status_code: int) -> None:
super().__init__(message)
self.status_code = status_code


def partition_file(
file: Union[BinaryIO, str, PathLike],
*,
Expand Down Expand Up @@ -212,10 +224,7 @@ def _partition_file_inner(
headers = _generate_headers(aryn_config.api_key(), webhook_url)
resp = requests.post(docparse_url, files=files, headers=headers, stream=_should_stream(), verify=ssl_verify)

if resp.status_code < 200 or resp.status_code > 299:
raise requests.exceptions.HTTPError(
f"Error: status_code: {resp.status_code}, reason: {resp.text}", response=resp
)
raise_error_on_non_2xx(resp)

content = []
partial_line = []
Expand Down Expand Up @@ -261,6 +270,13 @@ def _partition_file_inner(
return data


def raise_error_on_non_2xx(resp: requests.Response) -> None:
if resp.status_code < 200 or resp.status_code > 299:
raise requests.exceptions.HTTPError(
f"Error: status_code: {resp.status_code}, reason: {resp.text}", response=resp
)


def _process_config(aryn_api_key: Optional[str] = None, aryn_config: Optional[ArynConfig] = None) -> ArynConfig:
if aryn_api_key is not None:
if aryn_config is not None:
Expand Down Expand Up @@ -434,10 +450,12 @@ def partition_file_async_result(

For examples of usage see README.md

Raises a `PartitionTaskNotFoundError` if the not task with the task_id can be found.

Returns:
A dict containing "status" and "status_code". When "status" is "done", the returned dict also contains "result"
which contains what would have been returned had `partition_file` been called directly. "status" can be "done",
"pending", "error", or "no_such_task".
which contains what would have been returned had `partition_file` been called directly. "status" can be "done"
or "pending".

Unlike `partition_file`, this function does not raise an Exception if the partitioning failed.
"""
Expand All @@ -451,15 +469,15 @@ def partition_file_async_result(
response = requests.get(
specific_task_url, params=g_parameters, headers=headers, stream=_should_stream(), verify=ssl_verify
)

if response.status_code == 200:
return {"status": "done", "status_code": response.status_code, "result": response.json()}
elif response.status_code == 202:
return {"status": "pending", "status_code": response.status_code}
elif response.status_code == 404:
return {"status": "no_such_task", "status_code": response.status_code}
raise PartitionTaskNotFoundError("No such task", response.status_code)
else:
return {"status": "error", "status_code": response.status_code}
raise_error_on_non_2xx(response)
raise PartitionTaskError("Unexpected response code", response.status_code)


def partition_file_async_cancel(
Expand All @@ -469,17 +487,14 @@ def partition_file_async_cancel(
aryn_config: Optional[ArynConfig] = None,
ssl_verify: bool = True,
async_cancel_url: Optional[str] = None,
) -> bool:
) -> None:
"""
Cancel an asynchronous partitioning task by task_id. Meant to be used with `partition_file_async_submit`.

Returns:
A bool indicating whether the task was successfully cancelled by this request.

A task can only be successfully cancelled once. A return value of false can mean the task was already cancelled,
the task is already done, or there was no task with the given task_id.
Raises an exception if there is no cancellable task found with the given task_id. A task can only be successfully
cancelled once.

For an example of usage see README.md
For an example of usage see README.md
"""
if not async_cancel_url:
async_cancel_url = _convert_sync_to_async_url(ARYN_DOCPARSE_URL, "/cancel", truncate=True)
Expand All @@ -492,11 +507,12 @@ def partition_file_async_cancel(
specific_task_url, params=g_parameters, headers=headers, stream=_should_stream(), verify=ssl_verify
)
if response.status_code == 200:
return True
return
elif response.status_code == 404:
return False
raise PartitionTaskNotFoundError("No such task", response.status_code)
else:
raise Exception("Unexpected response code.")
raise_error_on_non_2xx(response)
raise PartitionTaskError("Unexpected response code.", response.status_code)


def partition_file_async_list(
Expand Down Expand Up @@ -530,11 +546,16 @@ def partition_file_async_list(
response = requests.get(
async_list_url, params=g_parameters, headers=headers, stream=_should_stream(), verify=ssl_verify
)
raise_error_on_non_2xx(response)
if response.status_code != 200:
raise PartitionTaskError("Unexpected response code", response.status_code)

result = response.json()

result = response.json()["tasks"]
for v in result.values():
tasks = result["tasks"]
for v in tasks.values():
del v["path"]
return result
return tasks


# Heavily adapted from lib/sycamore/data/table.py::Table.to_csv()
Expand Down
25 changes: 14 additions & 11 deletions lib/aryn-sdk/aryn_sdk/test/test_partition.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from os import PathLike
from typing import BinaryIO, Union
from aryn_sdk.partition.partition import convert_image_element, tables_to_pandas, ARYN_DOCPARSE_URL
from aryn_sdk.partition.partition import ARYN_DOCPARSE_URL
import pytest
import json
import time
Expand All @@ -14,6 +14,9 @@
partition_file_async_cancel,
partition_file_async_list,
PartitionError,
PartitionTaskNotFoundError,
convert_image_element,
tables_to_pandas,
)
from requests.exceptions import HTTPError

Expand Down Expand Up @@ -182,8 +185,8 @@ def test_convert_img():


def test_invalid_task_id():
response = partition_file_async_result("INVALID_JOB_ID")
assert response["status"] == "no_such_task"
with pytest.raises(PartitionTaskNotFoundError):
partition_file_async_result("INVALID_JOB_ID")


def test_partition_file_async_submit(mocker):
Expand Down Expand Up @@ -292,16 +295,16 @@ def test_partition_file_async_cancel():

before_cancel_result = partition_file_async_result(task_id)
assert before_cancel_result["status"] == "pending"
assert partition_file_async_cancel(task_id)
partition_file_async_cancel(task_id)

# Cancellation is not reflected in the result immediately
for _ in range(10):
time.sleep(0.1)
after_cancel_result = partition_file_async_result(task_id)
if after_cancel_result["status"] != "pending":
break
assert after_cancel_result["status"] == "pending"
assert after_cancel_result["status"] == "no_such_task"
with pytest.raises(PartitionTaskNotFoundError):
for _ in range(10):
time.sleep(0.1)
after_cancel_result = partition_file_async_result(task_id)
if after_cancel_result["status"] != "pending":
break
assert after_cancel_result["status"] == "pending"


def test_smoke_webhook(mocker):
Expand Down
Loading