-
Notifications
You must be signed in to change notification settings - Fork 12
Expand file tree
/
Copy pathexternal_object_replicator.py
More file actions
206 lines (186 loc) · 7.78 KB
/
external_object_replicator.py
File metadata and controls
206 lines (186 loc) · 7.78 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
import dateutil
import yaml
import threading
import logging
import asyncio
from collections import OrderedDict
import common.util
import common.config
from tools.ExternalObjectReplicator.util.glue_util import clone_glue_catalog
from common.aws_service import redshift_execute_query
import tools.ExternalObjectReplicator.util.copy_util as copy_util
from tools.ExternalObjectReplicator.util.copy_util import (
clone_objects_to_s3,
get_s3_folder_size,
check_file_existence,
)
from common.log import init_logging, log_version
# Module logger. main() passes the same logger name to init_logging().
logger = logging.getLogger("ExternalObjectReplicatorLogger")
# NOTE(review): the three names below are not referenced by any code visible
# in this file; presumably they are shared progress-bar/locking state used
# elsewhere. Confirm before relying on or removing them.
g_disable_progress_bar = None
global_lock = threading.Lock()
# Presumably a progress-bar format template (description, percent, bar); verify.
g_bar_format = "{desc}: {percentage:3.0f}%|{bar}"
def execute_svl_query(cluster_object, end_time, file_config, redshift_user, start_time):
    """Find the external tables and Spectrum files used by the source cluster.

    Two SQL templates are read from ``tools/ExternalObjectReplicator/sql/``,
    formatted with ``start``, ``end`` and ``db``, and executed through
    ``redshift_execute_query``:

    * ``external_table_query.sql`` - Glue databases and tables.
    * ``svl_s3_list.sql`` - Spectrum files recorded in SVL_S3LIST.

    When the SVL_S3LIST query reports at least one row, the files are passed
    to ``check_file_existence`` (via ``async_check_file_existence``) and the
    number and total size of the replicable files are logged.

    Args:
        cluster_object: Mapping providing at least the ``"id"`` and
            ``"database"`` keys.
        end_time: Upper bound substituted into both SQL templates.
        file_config: Configuration mapping; only ``"region"`` is read here.
        redshift_user: User passed to ``redshift_execute_query``.
        start_time: Lower bound substituted into both SQL templates.

    Returns:
        A 4-tuple ``(SVL_S3LIST_result, spectrum_source_location,
        external_table_response, spectrum_obj_not_found)``. The two lists are
        empty when the SVL_S3LIST query reports no rows.
    """
    with open(
        "tools/ExternalObjectReplicator/sql/external_table_query.sql", "r"
    ) as svv_external_table:
        external_table_query = svv_external_table.read().format(
            start=start_time, end=end_time, db=cluster_object["database"]
        )

    logger.info("Scanning system tables to find Glue databases and tables")
    external_table_response = redshift_execute_query(
        cluster_object["id"],
        redshift_user,
        cluster_object["database"],
        file_config["region"],
        external_table_query,
    )

    # Defaults returned unchanged when no Spectrum files are detected.
    spectrum_source_location = []
    spectrum_obj_not_found = []

    # Query Spectrum files
    logger.info("Scanning system tables to find Spectrum files queried by source cluster")
    with open("tools/ExternalObjectReplicator/sql/svl_s3_list.sql", "r") as svl_s3_list:
        svl_s3list_query = svl_s3_list.read().format(
            start=start_time, end=end_time, db=cluster_object["database"]
        )

    svl_s3list_result = redshift_execute_query(
        cluster_object["id"],
        redshift_user,
        cluster_object["database"],
        file_config["region"],
        svl_s3list_query,
    )

    if svl_s3list_result["TotalNumRows"] > 0:
        logger.info(
            f"{len(svl_s3list_result['Records'])} files detected across Spectrum queries from the SVL_S3LIST system "
            f"table between {start_time} and {end_time}"
        )
        # check_file_existence is awaited, so drive it to completion here.
        spectrum_source_location, spectrum_obj_not_found = asyncio.run(
            async_check_file_existence(svl_s3list_result, "spectrumfiles")
        )
        logger.info(
            f"Number of Spectrum files that can be replicated: "
            f"{(len(spectrum_source_location))} files"
        )
        logger.info(
            f"Total size of Spectrum files that can be replicated: "
            f"{get_s3_folder_size(copy_file_list=spectrum_source_location)}"
        )
    else:
        logger.info("No Spectrum files found.")

    return (
        svl_s3list_result,
        spectrum_source_location,
        external_table_response,
        spectrum_obj_not_found,
    )
async def async_check_file_existence(query_response, obj_type):
    """Await ``check_file_existence`` and return its two results as a tuple.

    Args:
        query_response: Query result forwarded unchanged to
            ``check_file_existence``.
        obj_type: Object-type label forwarded unchanged; callers in this file
            pass ``"copyfiles"`` or ``"spectrumfiles"``.

    Returns:
        ``(files_found, files_not_found)`` exactly as produced by
        ``check_file_existence``.
    """
    located, missing = await check_file_existence(query_response, obj_type)
    return (located, missing)
def execute_stl_load_query(cluster_object, end_time, file_config, redshift_user, start_time):
    """Find the files loaded by COPY commands on the source cluster.

    The ``stl_load_query.sql`` template is formatted with ``start``, ``end``
    and ``db`` and executed through ``redshift_execute_query``. When the
    response reports at least one row, the files are passed to
    ``check_file_existence`` (via ``async_check_file_existence``) and the
    replicable percentage and total size are logged.

    Args:
        cluster_object: Mapping providing at least the ``"id"`` and
            ``"database"`` keys.
        end_time: Upper bound substituted into the SQL template.
        file_config: Configuration mapping; only ``"region"`` is read here.
        redshift_user: User passed to ``redshift_execute_query``.
        start_time: Lower bound substituted into the SQL template.

    Returns:
        A 3-tuple ``(STL_LOAD_response, copy_objects_not_found,
        copy_source_location)``. Both lists are empty when the query reports
        no rows.
    """
    # Query COPY objects
    copy_objects_not_found = []
    copy_source_location = []

    with open("tools/ExternalObjectReplicator/sql/stl_load_query.sql", "r") as stl_load:
        stl_load_query = stl_load.read().format(
            start=start_time, end=end_time, db=cluster_object["database"]
        )

    # Announce the scan before it runs so the log reflects the real order of
    # events (and matches how execute_svl_query reports its scans).
    logger.info("Scanning system tables to find COPY files")
    logger.debug("Executing SQL Query to find COPY files")
    stl_load_response = redshift_execute_query(
        cluster_object["id"],
        redshift_user,
        cluster_object["database"],
        file_config["region"],
        stl_load_query,
    )

    if stl_load_response["TotalNumRows"] > 0:
        logger.info(
            f"{len(stl_load_response['Records'])} files detected across COPY queries from the STL_LOAD_COMMIT system "
            f"table between {start_time} and {end_time}"
        )
        # check_file_existence is awaited, so drive it to completion here.
        copy_source_location, copy_objects_not_found = asyncio.run(
            async_check_file_existence(stl_load_response, "copyfiles")
        )
        # Guard the ratio: an empty "Records" list must not turn a log line
        # into a ZeroDivisionError.
        total_records = len(stl_load_response["Records"])
        if total_records:
            replicable_pct = (len(copy_source_location) / total_records) * 100
        else:
            replicable_pct = 0.0
        logger.info(
            f"Percentage of COPY files that can be replicated: "
            f"{replicable_pct}%"
        )
        logger.info(
            f"Total size of COPY files that can be replicated: "
            f"{get_s3_folder_size(copy_file_list=copy_source_location)}"
        )
    else:
        logger.info("No COPY files found.")

    return stl_load_response, copy_objects_not_found, copy_source_location
def main():
    """Scan the source cluster and, on confirmation, clone its external objects.

    Steps:
        1. Load the configuration via ``common.config.get_config_file_from_args``
           and initialise logging.
        2. Run ``execute_stl_load_query`` (COPY files) and
           ``execute_svl_query`` (Spectrum files and external tables) for the
           configured ``start_time``/``end_time`` window, converted to UTC.
        3. Ask on stdin whether to proceed. Only an answer that parses as the
           integer 1 proceeds; anything else, including non-numeric input, is
           treated as a decline.
        4. When proceeding, clone COPY files, the Glue catalog and Spectrum
           files to ``target_s3_location`` for whichever scans reported rows.
    """
    # Importing the bare package does not guarantee its submodules are loaded
    # on every python-dateutil release; import the two used below explicitly.
    import dateutil.parser
    import dateutil.tz

    # Parse config file
    file_config = common.config.get_config_file_from_args()

    # Setup Logging
    level = logging.getLevelName(file_config.get("log_level", "INFO").upper())
    init_logging(
        "external_replicator.log",
        dir="tools/ExternalObjectReplicator/logs",
        level=level,
        preamble=yaml.dump(file_config),
        backup_count=file_config.get("backup_count", 2),
        script_type="external object replicator",
        logger_name="ExternalObjectReplicatorLogger",
    )
    log_version()

    cluster_object = common.util.cluster_dict(endpoint=file_config["source_cluster_endpoint"])
    start_time = dateutil.parser.parse(file_config["start_time"]).astimezone(dateutil.tz.tzutc())
    end_time = dateutil.parser.parse(file_config["end_time"]).astimezone(dateutil.tz.tzutc())
    redshift_user = file_config["redshift_user"]

    (
        STL_LOAD_response,
        copy_objects_not_found,
        copy_source_location,
    ) = execute_stl_load_query(cluster_object, end_time, file_config, redshift_user, start_time)
    (
        SVL_S3LIST_result,
        spectrum_source_location,
        external_table_response,
        spectrum_obj_not_found,
    ) = execute_svl_query(cluster_object, end_time, file_config, redshift_user, start_time)

    options = ["1. Yes - Proceed with cloning", "2. No - Exit"]
    print("Would you like to proceed with cloning?")
    print(options[0])
    print(options[1])
    choice = input("Enter your choice: ")

    # A non-numeric answer (empty line, "y", "yes", ...) must not crash the
    # tool after the scans have already run; treat it as "No".
    try:
        proceed = int(choice) == 1
    except ValueError:
        logger.warning(f"Unrecognized choice {choice!r}; treating it as 'No'")
        proceed = False

    if not proceed:
        logger.info("Customer decided not to proceed with cloning")
        return

    logger.info("Cloning the copy objects")
    if STL_LOAD_response["TotalNumRows"] > 0:
        logger.info(f"== Begin to clone COPY files to {file_config['target_s3_location']} ==")
        copy_util.clone_objects_to_s3(
            file_config["target_s3_location"],
            obj_type="copyfiles",
            source_location=copy_source_location,
            objects_not_found=copy_objects_not_found,
        )

    # NOTE(review): branch nesting was reconstructed from a flattened source;
    # confirm the "No object found" branch belongs under the proceed path.
    if SVL_S3LIST_result["TotalNumRows"] > 0:
        logger.info(
            f"== Begin to clone Spectrum files to {file_config['target_s3_location']} =="
        )
        logger.info("== Begin to clone Glue databases and tables ==")
        clone_glue_catalog(
            external_table_response["Records"],
            file_config["target_s3_location"],
            file_config["region"],
        )
        copy_util.clone_objects_to_s3(
            file_config["target_s3_location"],
            objects_not_found=spectrum_obj_not_found,
            source_location=spectrum_source_location,
            obj_type="spectrumfiles",
        )
    elif SVL_S3LIST_result["TotalNumRows"] == 0 and STL_LOAD_response["TotalNumRows"] == 0:
        logger.info("No object found to be replicated")
# Run the replicator only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()