@@ -1,6 +1,7 @@
 """Upload a batch of inferences to the Openlayer platform."""
 
 import time
+import logging
 import tempfile
 from typing import Optional
 
@@ -12,18 +13,18 @@
 from ... import Openlayer
 from ..._utils import maybe_transform
 from ...types.inference_pipelines import data_stream_params
-import asyncio
 
+log: logging.Logger = logging.getLogger(__name__)
 
-async def upload_batch_inferences_async(
+
+def upload_batch_inferences(
     client: Openlayer,
     inference_pipeline_id: str,
     config: data_stream_params.Config,
     dataset_df: Optional[pd.DataFrame] = None,
     dataset_path: Optional[str] = None,
     storage_type: Optional[StorageType] = None,
     merge: bool = False,
-    verbose: bool = False,
 ) -> None:
     """Uploads a batch of inferences to the Openlayer platform."""
     if dataset_df is None and dataset_path is None:
@@ -45,16 +46,9 @@ async def upload_batch_inferences_async(
         # writer
         if dataset_df is not None:
             temp_file_path = f"{tmp_dir}/dataset.arrow"
-            if verbose:
-                print("Converting DataFrame to pyarrow Table...")
             pa_table = pa.Table.from_pandas(dataset_df)
             pa_schema = pa_table.schema
 
-            if verbose:
-                print(
-                    "Writing Arrow Table using RecordBatchStreamWriter to "
-                    f"{temp_file_path}"
-                )
             with pa.ipc.RecordBatchStreamWriter(temp_file_path, pa_schema) as writer:
                 writer.write_table(pa_table, max_chunksize=16384)
         else:
@@ -64,14 +58,15 @@ async def upload_batch_inferences_async(
         # camelCase the config
         config = maybe_transform(config, data_stream_params.Config)
 
-        # Upload tarball to storage
-        if verbose:
-            print("Uploading dataset to storage via presigned URL...")
-        uploader.upload(
+        # Upload file to Openlayer storage
+        log.info("Uploading file to Openlayer")
+        response = uploader.upload(
             file_path=temp_file_path,
             object_name=object_name,
             presigned_url_response=presigned_url_response,
         )
+        if response.status_code != 200:
+            raise ValueError(f"Failed to upload file to storage: {response.text}")
 
         # Notify the backend
         client.post(
@@ -83,30 +78,7 @@ async def upload_batch_inferences_async(
                 "config": config,
             },
         )
-
-
-def upload_batch_inferences(
-    client: Openlayer,
-    inference_pipeline_id: str,
-    config: data_stream_params.Config,
-    dataset_df: Optional[pd.DataFrame] = None,
-    dataset_path: Optional[str] = None,
-    storage_type: Optional[StorageType] = None,
-    merge: bool = False,
-    verbose: bool = False,
-) -> None:
-    asyncio.run(
-        upload_batch_inferences_async(
-            client,
-            inference_pipeline_id,
-            config,
-            dataset_df,
-            dataset_path,
-            storage_type,
-            merge,
-            verbose,
-        )
-    )
+    log.info("Success! Uploaded batch inferences")
 
 
 def update_batch_inferences(
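For reference, below is a minimal usage sketch of the now-synchronous `upload_batch_inferences` entry point. This is an illustration under stated assumptions, not part of the diff: the `openlayer.lib.data` import path, the `Openlayer()` client reading `OPENLAYER_API_KEY` from the environment, and the config keys (`input_variable_names`, `output_column_name`) are assumptions.

import logging

import pandas as pd

from openlayer import Openlayer
from openlayer.lib.data import upload_batch_inferences  # assumed module path

# The verbose prints were replaced with module-level logging, so opt in to
# the progress messages via standard logging configuration.
logging.basicConfig(level=logging.INFO)

client = Openlayer()  # assumes OPENLAYER_API_KEY is set in the environment

df = pd.DataFrame(
    {
        "user_query": ["How do I upload a batch of inferences?"],
        "output": ["Call upload_batch_inferences with a DataFrame."],
    }
)

# No asyncio.run() wrapper is needed anymore: the call blocks until the
# Arrow file is written, uploaded to storage, and the backend is notified,
# and it raises ValueError if the upload returns a non-200 status.
upload_batch_inferences(
    client=client,
    inference_pipeline_id="YOUR_INFERENCE_PIPELINE_ID",
    dataset_df=df,
    config={
        "input_variable_names": ["user_query"],  # assumed config keys
        "output_column_name": "output",
    },
)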