@@ -65,28 +65,30 @@ def __init__(
         file_path: str,
         staging_credentials: Optional[CredentialsConfiguration] = None,
         staging_iam_role: str = None,
+        s3_region: str = "us-east-1",  # Add region as a parameter
     ) -> None:
         super().__init__(file_path, staging_credentials)
         self._staging_iam_role = staging_iam_role
+        self._s3_region = s3_region  # Store region
         self._job_client: "RedshiftClient" = None
 
     def run(self) -> None:
         self._sql_client = self._job_client.sql_client
-        # we assume s3 credentials where provided for the staging
+        # Assume S3 credentials were provided for the staging
         credentials = ""
         if self._staging_iam_role:
-            credentials = f"IAM_ROLE '{self._staging_iam_role}'"
+            credentials = f"IAM_ROLE '{self._staging_iam_role}' REGION '{self._s3_region}'"
         elif self._staging_credentials and isinstance(
             self._staging_credentials, AwsCredentialsWithoutDefaults
         ):
             aws_access_key = self._staging_credentials.aws_access_key_id
             aws_secret_key = self._staging_credentials.aws_secret_access_key
             credentials = (
                 "CREDENTIALS"
-                f" 'aws_access_key_id={aws_access_key};aws_secret_access_key={aws_secret_key}'"
+                f" 'aws_access_key_id={aws_access_key};aws_secret_access_key={aws_secret_key}' REGION '{self._s3_region}'"
             )
 
-        # get format
+        # Get format
         ext = os.path.splitext(self._bucket_path)[1][1:]
         file_type = ""
         dateformat = ""
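Taken on its own, the clause-building logic above reduces to a small helper. The sketch below is illustrative only (build_credentials_clause is a hypothetical name, not part of the codebase); it also reflects that REGION is a standalone Redshift COPY parameter, so it belongs outside the quoted credentials string:

# Illustrative sketch only; build_credentials_clause is a hypothetical helper.
def build_credentials_clause(
    staging_iam_role: str = None,
    aws_access_key: str = None,
    aws_secret_key: str = None,
    s3_region: str = "us-east-1",
) -> str:
    # An IAM role takes precedence; REGION is a separate COPY parameter,
    # appended after IAM_ROLE or CREDENTIALS rather than embedded in them.
    if staging_iam_role:
        return f"IAM_ROLE '{staging_iam_role}' REGION '{s3_region}'"
    if aws_access_key and aws_secret_key:
        return (
            "CREDENTIALS"
            f" 'aws_access_key_id={aws_access_key};aws_secret_access_key={aws_secret_key}'"
            f" REGION '{s3_region}'"
        )
    return ""


print(build_credentials_clause(staging_iam_role="arn:aws:iam::123456789012:role/redshift-copy"))
# IAM_ROLE 'arn:aws:iam::123456789012:role/redshift-copy' REGION 'us-east-1'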
@@ -97,15 +99,12 @@ def run(self) -> None:
             compression = "" if is_compression_disabled() else "GZIP"
         elif ext == "parquet":
             file_type = "PARQUET"
-            # if table contains json types then SUPER field will be used.
-            # https://docs.aws.amazon.com/redshift/latest/dg/ingest-super.html
             if table_schema_has_type(self._load_table, "json"):
                 file_type += " SERIALIZETOJSON"
         else:
             raise ValueError(f"Unsupported file type {ext} for Redshift.")
 
         with self._sql_client.begin_transaction():
-            # TODO: if we ever support csv here remember to add column names to COPY
             self._sql_client.execute_sql(f"""
                 COPY {self._sql_client.make_qualified_table_name(self.load_table_name)}
                 FROM '{self._bucket_path}'
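For reference, this is roughly the statement run() now emits for a parquet file staged on S3. The diff cuts off before the end of the COPY template, so the clause order after FROM is an assumption, and all names below are made up:

# Hypothetical rendering of the generated COPY statement; the real template
# is truncated in the diff, so everything after FROM is assumed.
table = "my_dataset.my_table"
bucket_path = "s3://my-bucket/my_table/1690000000.123.parquet"
file_type = "PARQUET SERIALIZETOJSON"  # SERIALIZETOJSON only when the table has json columns
credentials = "IAM_ROLE 'arn:aws:iam::123456789012:role/redshift-copy' REGION 'us-east-1'"

copy_sql = f"""
    COPY {table}
    FROM '{bucket_path}'
    {file_type}
    {credentials}
"""
print(copy_sql)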