Commit b2e4183
YQ-4144 added manual test on s3 olap import / export (#17969)
1 parent b7d9010 commit b2e4183

File tree: 3 files changed, +327 −0 lines changed
test_large_import.py (new file)
Lines changed: 306 additions & 0 deletions
import boto3
import datetime
import logging
import os
import traceback
import time
import ydb
from typing import Optional

from ydb.tests.olap.lib.results_processor import ResultsProcessor
from ydb.tests.olap.lib.utils import get_external_param
from ydb.tests.olap.lib.ydb_cluster import YdbCluster

logger = logging.getLogger("TestLargeS3Import")

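# End-to-end scenario: import TPC-H lineitem parquet files from a public S3
# bucket into a YDB column-store table, export the table back to a private
# bucket, and verify both hops by comparing row counts and aggregate row hashes.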
class TestLargeS3Import:
    class Results:
        def __init__(self, suite: str):
            self.suite = suite
            self.stats = dict()
            self.start_time: Optional[float] = None
            self.test_name: Optional[str] = None
            self.stage_starts: dict[str, float] = dict()
            self.finished = False

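        # Called after every stage transition, so partial progress is
        # uploaded even if a later stage kills the test run.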
        def __report_results(self, is_successful: bool = False):
            assert self.start_time is not None, "Results are not set up"
            assert not self.finished, "Results already reported"
            logger.info(f"reporting result stats:\n{self.stats}\nis_successful: {is_successful}")
            ResultsProcessor.upload_results(
                kind="Import/Export",
                suite=self.suite,
                test=self.test_name,
                timestamp=self.start_time,
                is_successful=is_successful,
                statistics=self.stats,
                duration=(time.time() - self.start_time) / 1000000
            )

        def setup(self, test_name: str):
            self.start_time = time.time()
            self.test_name = test_name
            self.stats = {
                "stage": "setup",
                "stage_duration_seconds": dict()
            }
            self.stage_starts = dict()
            self.finished = False
            self.__report_results()

        def on_stage_start(self, stage: str):
            self.stats["stage"] = f"{stage}-RUNNING"
            self.stage_starts[stage] = time.time()
            self.__report_results()

        def on_stage_finish(self, stage: str):
            self.stats["stage"] = f"{stage}-FINISHED"
            self.stats["stage_duration_seconds"][stage] = time.time() - self.stage_starts[stage]
            self.__report_results()

        def report_finish(self):
            if not self.finished:
                self.__report_results(is_successful=True)
                self.finished = True

        def report_fail(self, error: str):
            if not self.finished:
                self.stats["error"] = error
                self.__report_results()
                self.finished = True

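    # Context manager for a named stage: reports the start on entry and, on
    # exit, either the stage duration or the failure with its traceback.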
    class ReportTime:
        def __init__(self, results, stage: str):
            self.results = results
            self.stage = stage

        def __enter__(self):
            logger.info(f"starting {self.stage}...")
            self.results.on_stage_start(self.stage)

        def __exit__(self, exc_type, exc_val, exc_tb):
            if exc_type is not None:
                error = f"exception[{exc_type}]: {exc_val}, traceback:\n{traceback.format_exc()}"
                logger.error(f"{self.stage} failed, {error}")
                self.results.report_fail(error)
                return

            logger.info(f"{self.stage} finished")
            self.results.on_stage_finish(self.stage)

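    # One-time setup: validates sink credentials and the requested TPC-H
    # scale, derives all table paths, and checks that the cluster is alive.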
    @classmethod
    def setup_class(cls):
        cls.sink_access_key_id = os.getenv("ACCESS_KEY_ID", None)
        assert cls.sink_access_key_id is not None, "ACCESS_KEY_ID is not set for sink bucket"

        cls.sink_access_key_secret = os.getenv("ACCESS_KEY_SECRET", None)
        assert cls.sink_access_key_secret is not None, "ACCESS_KEY_SECRET is not set for sink bucket"

        cls.scale = get_external_param("scale", "1000")
        assert cls.scale in ["1", "10", "100", "1000"], f"Invalid scale: {cls.scale}"

        cls.s3_url = "https://storage.yandexcloud.net"
        cls.s3_sink_bucket = "olap-exp-private"
        cls.external_source_path = f"{YdbCluster.tables_path}/tpc_h_s3_parquet_import"
        cls.external_sink_path = f"{YdbCluster.tables_path}/tpc_h_s3_parquet_export"
        cls.external_table_path = f"{YdbCluster.tables_path}/s{cls.scale}/tpc_h_lineitem_s3_parquet_import"
        cls.external_sink_table_path = f"{YdbCluster.tables_path}/s{cls.scale}/tpc_h_lineitem_s3_parquet_export"
        cls.olap_table_path = f"{YdbCluster.tables_path}/s{cls.scale}/tpc_h_lineitem_olap"
        cls.table_size = {
            "1": 6001215,
            "10": 59986052,
            "100": 600037902,
            "1000": 5999989709,
        }[cls.scale]

        logger.info(f"test configuration, scale: {cls.scale}, "
                    f"external source: {cls.external_source_path}, "
                    f"external table: {cls.external_table_path}, "
                    f"olap table: {cls.olap_table_path}, "
                    f"external sink: {cls.external_sink_path}, "
                    f"external sink table: {cls.external_sink_table_path}")
        logger.info(f"target cluster info, endpoint: {YdbCluster.ydb_endpoint}, "
                    f"database: {YdbCluster.ydb_database}, "
                    f"tables path: {YdbCluster.tables_path}, "
                    f"has key {'YES' if os.getenv('OLAP_YDB_OAUTH', None) else 'NO'}")
        logger.info(f"results info, send-results: {ResultsProcessor.send_results}, "
                    f"endpoints: {get_external_param('results-endpoint', '-')}, "
                    f"dbs: {get_external_param('results-db', '-')}, "
                    f"tables: {get_external_param('results-table', '-')}, "
                    f"has key {'YES' if os.getenv('RESULT_YDB_OAUTH', None) else 'NO'}")

        health_errors, health_warnings = YdbCluster.check_if_ydb_alive()
        logger.info(f"ydb health warnings: {health_warnings}")
        assert health_errors is None, f"ydb is not alive: {health_errors}"

        cls.session_pool = ydb.QuerySessionPool(YdbCluster.get_ydb_driver())
        cls.results = cls.Results(suite="TestLargeS3Import")

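    # Thin wrapper over the session pool; the 6000-second timeout leaves
    # room for the long-running import/export statements below.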
    def query(self, statement, log_query=True, max_retries=10):
        if log_query:
            logger.info(f"running query:\n{statement}")

        return self.session_pool.execute_with_retries(
            statement,
            settings=ydb.BaseRequestSettings().with_timeout(6000),
            retry_settings=ydb.RetrySettings(max_retries=max_retries)
        )

    def cleanup_tables(self):
        logger.info(f"cleaning up table `{self.olap_table_path}`...")
        self.query(f"DROP TABLE IF EXISTS `{self.olap_table_path}`;")

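    # Declares the read side: an anonymous-access external data source over
    # the public TPC bucket plus an external table describing the parquet
    # lineitem layout.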
    def setup_datasource(self):
        logger.info(f"setting up datasource at path `{YdbCluster.tables_path}/`...")
        self.query(f"""
            CREATE OR REPLACE EXTERNAL DATA SOURCE `{self.external_source_path}` WITH (
                SOURCE_TYPE="ObjectStorage",
                LOCATION="{self.s3_url}/tpc/",
                AUTH_METHOD="NONE"
            );

            CREATE OR REPLACE EXTERNAL TABLE `{self.external_table_path}` (
                l_orderkey Int64 NOT NULL,
                l_partkey Int64,
                l_suppkey Int64,
                l_linenumber Int64 NOT NULL,
                l_quantity Double,
                l_extendedprice Double,
                l_discount Double,
                l_tax Double,
                l_returnflag String,
                l_linestatus String,
                l_shipdate Date,
                l_commitdate Date,
                l_receiptdate Date,
                l_shipinstruct String,
                l_shipmode String,
                l_comment String
            ) WITH (
                DATA_SOURCE="{self.external_source_path}",
                LOCATION="/h/s{self.scale}/parquet/lineitem/",
                FORMAT="parquet"
            );
        """)

    def setup_datasink(self, output_path):
        logger.info(f"setting up datasink to `{output_path}`...")

        access_key_id_name = "test_olap_s3_import_aws_access_key_id"
        access_key_secret_name = "test_olap_s3_import_aws_access_key_secret"
        self.query(f"""
            UPSERT OBJECT {access_key_id_name} (TYPE SECRET) WITH (value = "{self.sink_access_key_id}");
            UPSERT OBJECT {access_key_secret_name} (TYPE SECRET) WITH (value = "{self.sink_access_key_secret}");
        """, log_query=False)

        self.query(f"""
            CREATE OR REPLACE EXTERNAL DATA SOURCE `{self.external_sink_path}` WITH (
                SOURCE_TYPE="ObjectStorage",
                LOCATION="{self.s3_url}/{self.s3_sink_bucket}/",
                AUTH_METHOD="AWS",
                AWS_ACCESS_KEY_ID_SECRET_NAME="{access_key_id_name}",
                AWS_SECRET_ACCESS_KEY_SECRET_NAME="{access_key_secret_name}",
                AWS_REGION="ru-central-1"
            );

            CREATE OR REPLACE EXTERNAL TABLE `{self.external_sink_table_path}` (
                l_orderkey Int64 NOT NULL,
                l_partkey Int64,
                l_suppkey Int64,
                l_linenumber Int64 NOT NULL,
                l_quantity Double,
                l_extendedprice Double,
                l_discount Double,
                l_tax Double,
                l_returnflag String,
                l_linestatus String,
                l_shipdate Date,
                l_commitdate Date,
                l_receiptdate Date,
                l_shipinstruct String,
                l_shipmode String,
                l_comment String
            ) WITH (
                DATA_SOURCE="{self.external_sink_path}",
                LOCATION="{output_path}",
                FORMAT="parquet"
            );
        """)

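    # Cross-checks two tables by count and by an order-independent
    # fingerprint: the sum of MurMurHash32 over each pickled row.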
    def validate_tables(self, first_table, second_table, stage_name):
        with self.ReportTime(self.results, stage_name):
            logger.info(f"validating tables {first_table} and {second_table}...")
            result_sets = self.query(f"""
                SELECT
                    String::Hex(Sum(Digest::MurMurHash32(Pickle(TableRow())))) AS first_hash,
                    COUNT(*) AS first_size
                FROM `{first_table}`;

                SELECT
                    String::Hex(Sum(Digest::MurMurHash32(Pickle(TableRow())))) AS second_hash,
                    COUNT(*) AS second_size
                FROM `{second_table}`;
            """)

            assert len(result_sets) == 2

            first_result = result_sets[0].rows
            assert len(first_result) == 1
            first_result = first_result[0]

            second_result = result_sets[1].rows
            assert len(second_result) == 1
            second_result = second_result[0]

            assert first_result.first_size == self.table_size
            assert first_result.first_size == second_result.second_size
            assert first_result.first_hash == second_result.second_hash

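    # Import is a single CTAS into a column-store table; retries are
    # disabled, presumably because a partially applied CTAS is not
    # safely re-runnable.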
    def run_import_from_s3(self):
        with self.ReportTime(self.results, "import"):
            self.cleanup_tables()
            self.query(f"""
                CREATE TABLE `{self.olap_table_path}` (
                    PRIMARY KEY (l_orderkey, l_linenumber)
                ) WITH (
                    STORE = COLUMN
                ) AS SELECT * FROM `{self.external_table_path}`;
            """, max_retries=0)

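    # Export is a plain INSERT from the column-store table into the external
    # sink table; rows land as parquet files under output_path.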
    def run_export_to_s3(self):
        with self.ReportTime(self.results, "export"):
            self.query(f"""
                INSERT INTO `{self.external_sink_table_path}`
                SELECT * FROM `{self.olap_table_path}`;
            """, max_retries=0)

    def clear_bucket(self, output_path):
        logger.info(f"cleaning up s3 by path `{output_path}`...")
        resource = boto3.resource(
            "s3",
            endpoint_url=self.s3_url,
            aws_access_key_id=self.sink_access_key_id,
            aws_secret_access_key=self.sink_access_key_secret
        )
        bucket = resource.Bucket(self.s3_sink_bucket)
        bucket.objects.filter(Prefix=output_path).delete()

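    # Single test case: all stages run under one enclosing "global" stage so
    # total wall-clock time is reported alongside per-stage durations.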
    def test_import_and_export(self):
        output_path = f"test_import/s{self.scale}/{datetime.datetime.now()}/"
        self.results.setup(f"test_import_and_export[scale={self.scale}]")

        with self.ReportTime(self.results, "global"):
            self.setup_datasource()
            self.setup_datasink(output_path)

            self.run_import_from_s3()
            self.validate_tables(self.external_table_path, self.olap_table_path, "validate_import")

            self.run_export_to_s3()
            self.validate_tables(self.olap_table_path, self.external_sink_table_path, "validate_export")

            self.cleanup_tables()
            self.clear_bucket(output_path)

        self.results.report_finish()
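
The hash check in validate_tables is self-contained and works for any two tables with the same schema. A minimal standalone sketch of the same pattern, assuming only a reachable YDB cluster (the endpoint, database, and table paths below are placeholders, not values from this commit):

import ydb


def tables_match(pool: ydb.QuerySessionPool, first: str, second: str) -> bool:
    # One result set per SELECT; each holds a single row with an
    # order-independent fingerprint (sum of per-row MurMurHash32) and a count.
    result_sets = pool.execute_with_retries(f"""
        SELECT String::Hex(Sum(Digest::MurMurHash32(Pickle(TableRow())))) AS h, COUNT(*) AS n FROM `{first}`;
        SELECT String::Hex(Sum(Digest::MurMurHash32(Pickle(TableRow())))) AS h, COUNT(*) AS n FROM `{second}`;
    """)
    first_row = result_sets[0].rows[0]
    second_row = result_sets[1].rows[0]
    return first_row.n == second_row.n and first_row.h == second_row.h


driver = ydb.Driver(endpoint="grpc://localhost:2136", database="/local")  # placeholder connection
driver.wait(timeout=5)
print(tables_match(ydb.QuerySessionPool(driver), "path/to/first", "path/to/second"))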
Lines changed: 17 additions & 0 deletions

PY3TEST()

TAG(ya:manual)

SIZE(LARGE)

TEST_SRCS(
    test_large_import.py
)

PEERDIR(
    contrib/python/boto3
    ydb/public/sdk/python
    ydb/tests/olap/lib
)

END()
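
A usage note on this new ya.make (presumably ydb/tests/olap/s3_import/large/ya.make, given the RECURSE(large) hookup below): TAG(ya:manual) keeps the suite out of automatic CI runs, so it executes only when launched explicitly, which matches the "manual test" wording in the commit title.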

ydb/tests/olap/s3_import/ya.make
Lines changed: 4 additions & 0 deletions

@@ -31,3 +31,7 @@ DEPENDS(
 )

 END()
+
+RECURSE(
+    large
+)
