@@ -638,18 +638,18 @@ def _handle_staging_operation(
638
638
639
639
# Handle __input_stream__ token for PUT operations
640
640
if (
641
- row .operation == "PUT" and
642
- getattr (row , "localFile" , None ) == "__input_stream__"
641
+ row .operation == "PUT"
642
+ and getattr (row , "localFile" , None ) == "__input_stream__"
643
643
):
644
644
if not self ._input_stream_data :
645
645
raise ProgrammingError (
646
646
"No input stream provided for streaming operation" ,
647
- session_id_hex = self .connection .get_session_id_hex ()
647
+ session_id_hex = self .connection .get_session_id_hex (),
648
648
)
649
649
return self ._handle_staging_put_stream (
650
650
presigned_url = row .presignedUrl ,
651
651
stream = self ._input_stream_data ,
652
- headers = headers
652
+ headers = headers ,
653
653
)
654
654
655
655
# For non-streaming operations, validate staging_allowed_local_path
@@ -723,50 +723,50 @@ def _handle_staging_put_stream(
723
723
headers : Optional [dict ] = None ,
724
724
) -> None :
725
725
"""Handle PUT operation with streaming data.
726
-
726
+
727
727
Args:
728
728
presigned_url: The presigned URL for upload
729
729
stream: Binary stream to upload
730
730
headers: Optional HTTP headers
731
-
731
+
732
732
Raises:
733
733
OperationalError: If the upload fails
734
734
"""
735
-
735
+
736
736
# Prepare headers
737
737
http_headers = dict (headers ) if headers else {}
738
-
738
+
739
739
try :
740
740
# Stream directly to presigned URL
741
741
response = requests .put (
742
742
url = presigned_url ,
743
743
data = stream ,
744
744
headers = http_headers ,
745
- timeout = 300 # 5 minute timeout
745
+ timeout = 300 , # 5 minute timeout
746
746
)
747
-
747
+
748
748
# Check response codes
749
- OK = requests .codes .ok # 200
750
- CREATED = requests .codes .created # 201
751
- ACCEPTED = requests .codes .accepted # 202
752
- NO_CONTENT = requests .codes .no_content # 204
753
-
749
+ OK = requests .codes .ok # 200
750
+ CREATED = requests .codes .created # 201
751
+ ACCEPTED = requests .codes .accepted # 202
752
+ NO_CONTENT = requests .codes .no_content # 204
753
+
754
754
if response .status_code not in [OK , CREATED , NO_CONTENT , ACCEPTED ]:
755
755
raise OperationalError (
756
756
f"Staging operation over HTTP was unsuccessful: { response .status_code } -{ response .text } " ,
757
- session_id_hex = self .connection .get_session_id_hex ()
757
+ session_id_hex = self .connection .get_session_id_hex (),
758
758
)
759
-
759
+
760
760
if response .status_code == ACCEPTED :
761
761
logger .debug (
762
762
f"Response code { ACCEPTED } from server indicates upload was accepted "
763
763
"but not yet applied on the server. It's possible this command may fail later."
764
764
)
765
-
765
+
766
766
except requests .exceptions .RequestException as e :
767
767
raise OperationalError (
768
768
f"HTTP request failed during stream upload: { str (e )} " ,
769
- session_id_hex = self .connection .get_session_id_hex ()
769
+ session_id_hex = self .connection .get_session_id_hex (),
770
770
) from e
771
771
772
772
@log_latency (StatementType .SQL )
@@ -899,7 +899,7 @@ def execute(
899
899
self ._input_stream_data = None
900
900
if input_stream is not None :
901
901
# Validate stream has required methods
902
- if not hasattr (input_stream , ' read' ):
902
+ if not hasattr (input_stream , " read" ):
903
903
raise TypeError (
904
904
"input_stream must be a binary stream with read() method"
905
905
)
@@ -916,7 +916,9 @@ def execute(
916
916
)
917
917
elif param_approach == ParameterApproach .NATIVE :
918
918
normalized_parameters = self ._normalize_tparametercollection (parameters )
919
- param_structure = self ._determine_parameter_structure (normalized_parameters )
919
+ param_structure = self ._determine_parameter_structure (
920
+ normalized_parameters
921
+ )
920
922
transformed_operation = transform_paramstyle (
921
923
operation , normalized_parameters , param_structure
922
924
)
0 commit comments