diff --git a/ydb/core/kqp/provider/yql_kikimr_settings.cpp b/ydb/core/kqp/provider/yql_kikimr_settings.cpp index b4d244033d3b..7d03ed5ea214 100644 --- a/ydb/core/kqp/provider/yql_kikimr_settings.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_settings.cpp @@ -100,6 +100,7 @@ TKikimrConfiguration::TKikimrConfiguration() { REGISTER_SETTING(*this, MaxDPHypDPTableSize); REGISTER_SETTING(*this, MaxTasksPerStage); + REGISTER_SETTING(*this, DataSizePerPartition); REGISTER_SETTING(*this, MaxSequentialReadsInFlight); REGISTER_SETTING(*this, KMeansTreeSearchTopSize); diff --git a/ydb/core/kqp/provider/yql_kikimr_settings.h b/ydb/core/kqp/provider/yql_kikimr_settings.h index 74d0a34720fc..86d7e117c3b3 100644 --- a/ydb/core/kqp/provider/yql_kikimr_settings.h +++ b/ydb/core/kqp/provider/yql_kikimr_settings.h @@ -75,8 +75,8 @@ struct TKikimrSettings { NCommon::TConfSetting MaxDPHypDPTableSize; - NCommon::TConfSetting MaxTasksPerStage; + NCommon::TConfSetting DataSizePerPartition; NCommon::TConfSetting MaxSequentialReadsInFlight; NCommon::TConfSetting KMeansTreeSearchTopSize; diff --git a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp index 2593f3e54471..c387ea317705 100644 --- a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp +++ b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp @@ -1106,7 +1106,7 @@ class TKqpQueryCompiler : public IKqpQueryCompiler { IDqIntegration::TPartitionSettings pSettings; pSettings.MaxPartitions = maxTasksPerStage; pSettings.CanFallback = false; - pSettings.DataSizePerJob = NYql::TDqSettings::TDefault::DataSizePerJob; + pSettings.DataSizePerJob = Config->DataSizePerPartition.Get().GetOrElse(NYql::TDqSettings::TDefault::DataSizePerJob); dqIntegration->Partition(*source, partitionParams, &clusterName, ctx, pSettings); externalSource.SetTaskParamKey(TString(dataSourceCategory)); for (const TString& partitionParam : partitionParams) { diff --git a/ydb/tests/fq/solomon/test.py 
b/ydb/tests/fq/solomon/test.py index e61f1bb4d38a..3290135782bf 100644 --- a/ydb/tests/fq/solomon/test.py +++ b/ydb/tests/fq/solomon/test.py @@ -63,7 +63,7 @@ def test(suite, case, cfg, solomon): kqprun = KqpRun(config_file=os.path.join('ydb/tests/fq/solomon/cfg', 'kqprun_config.conf'), scheme_file=os.path.join('ydb/tests/fq/solomon/cfg', 'kqprun_scheme.sql')) yqlrun_res = kqprun.yql_exec( - program=sql_query, + yql_program=sql_query, var_templates=['SOLOMON_ENDPOINT', 'SOLOMON_PORT'], verbose=True, check_error=not xfail diff --git a/ydb/tests/fq/tools/kqprun.py b/ydb/tests/fq/tools/kqprun.py index a582de81a9a8..18ab7ccadf65 100644 --- a/ydb/tests/fq/tools/kqprun.py +++ b/ydb/tests/fq/tools/kqprun.py @@ -1,4 +1,5 @@ import os +from typing import Optional, List import pytest import yatest.common @@ -7,30 +8,54 @@ class KqpRun(object): - def __init__(self, config_file, scheme_file, udfs_dir=None): - self.kqprun_binary = yql_utils.yql_binary_path('ydb/tests/tools/kqprun/kqprun') + def __init__(self, config_file: str, scheme_file: str, udfs_dir: Optional[str] = None, path_prefix: str = ""): + self.kqprun_binary: str = yql_utils.yql_binary_path('ydb/tests/tools/kqprun/kqprun') - self.config_file = yql_utils.yql_source_path(config_file) - self.scheme_file = yql_utils.yql_source_path(scheme_file) + self.config_file: str = yql_utils.yql_source_path(config_file) + self.scheme_file: str = yql_utils.yql_source_path(scheme_file) - self.res_dir = yql_utils.get_yql_dir(prefix='kqprun_') + self.res_dir: str = yql_utils.get_yql_dir(prefix=f'{path_prefix}kqprun_') if udfs_dir is None: - self.udfs_dir = yql_utils.get_udfs_path() + self.udfs_dir: str = yql_utils.get_udfs_path() else: - self.udfs_dir = udfs_dir + self.udfs_dir: str = udfs_dir - def __res_file_path(self, name): + self.tables: List[str] = [] + self.queries: List[str] = [] + + def __res_file_path(self, name: str) -> str: return os.path.join(self.res_dir, name) - def yql_exec(self, program=None, program_file=None, 
verbose=False, check_error=True, var_templates=None, tables=None): + def add_table(self, name: str, content: List[str], attrs: Optional[str] = None): + table_path = self.__res_file_path(f'table_{len(self.tables)}.yson') + with open(table_path, 'w') as table: + for row in content: + table.write(f'{row}\n') + + if attrs is not None: + with open(f'{table_path}.attr', 'w') as table_attrs: + table_attrs.write(attrs) + + self.tables.append(f'yt./Root/plato.{name}@{table_path}') + + def add_query(self, sql: str): + query_path = self.__res_file_path(f'query_{len(self.queries)}.sql') + with open(query_path, 'w') as query: + query.write(sql) + + self.queries.append(query_path) + + def yql_exec(self, verbose: bool = False, check_error: bool = True, var_templates: Optional[List[str]] = None, + yql_program: Optional[str] = None, yql_tables: Optional[List[yql_utils.Table]] = None) -> yql_utils.YQLExecResult: udfs_dir = self.udfs_dir config_file = self.config_file - program_file = yql_utils.prepare_program(program, program_file, self.res_dir, ext='sql')[1] scheme_file = self.scheme_file results_file = self.__res_file_path('results.txt') + ast_file = self.__res_file_path('ast.txt') + plan_file = self.__res_file_path('plan.json') log_file = self.__res_file_path('log.txt') cmd = self.kqprun_binary + ' ' @@ -39,46 +64,60 @@ def yql_exec(self, program=None, program_file=None, verbose=False, check_error=T '--emulate-yt ' '--exclude-linked-udfs ' '--execution-case query ' - '--app-config=%(config_file)s ' - '--script-query=%(program_file)s ' - '--scheme-query=%(scheme_file)s ' - '--result-file=%(results_file)s ' - '--log-file=%(log_file)s ' - '--udfs-dir=%(udfs_dir)s ' + f'--app-config={config_file} ' + f'--scheme-query={scheme_file} ' + f'--result-file={results_file} ' + f'--script-ast-file={ast_file} ' + f'--script-plan-file={plan_file} ' + f'--log-file={log_file} ' + f'--udfs-dir={udfs_dir} ' '--result-format full-proto ' - '--result-rows-limit 0 ' % locals() + '--plan-format json ' + 
'--result-rows-limit 0 ' ) if var_templates is not None: for var_template in var_templates: - cmd += '--var-template %s ' % var_template + cmd += f'--var-template {var_template} ' + + for query in self.queries: + cmd += f'--script-query={query} ' + + if yql_program is not None: + program_file = yql_utils.prepare_program(yql_program, None, self.res_dir, ext='sql')[1] + cmd += f'--script-query={program_file} ' - if tables is not None: - for table in tables: - if table.format != 'yson': - pytest.skip('skip tests containing tables with a non-yson attribute format') - cmd += '--table=yt./Root/%s@%s ' % (table.full_name, table.yqlrun_file) + for table in self.tables: + cmd += f'--table={table} ' + + for table in yql_tables or []: # None (the default) means no extra tables + if table.format != 'yson': + pytest.skip('skip tests containing tables with a non-yson attribute format') + cmd += f'--table=yt./Root/{table.full_name}@{table.yqlrun_file} ' proc_result = yatest.common.process.execute(cmd.strip().split(), check_exit_code=False, cwd=self.res_dir) if proc_result.exit_code != 0 and check_error: - assert 0, ( - 'Command\n%(command)s\n finished with exit code %(code)d, stderr:\n\n%(stderr)s\n\nlog file:\n%(log_file)s' - % { - 'command': cmd, - 'code': proc_result.exit_code, - 'stderr': proc_result.std_err, - 'log_file': yql_utils.read_res_file(log_file)[1], - } - ) + assert 0, f'Command\n{cmd}\n finished with exit code {proc_result.exit_code}, stderr:\n\n{proc_result.std_err}\n\nlog file:\n{yql_utils.read_res_file(log_file)[1]}' results, log_results = yql_utils.read_res_file(results_file) + ast, log_ast = yql_utils.read_res_file(ast_file) + plan, log_plan = yql_utils.read_res_file(plan_file) err, log_err = yql_utils.read_res_file(log_file) if verbose: - yql_utils.log('PROGRAM:') - yql_utils.log(program) + yql_utils.log('QUERIES:') + if yql_program is not None: + yql_utils.log(yql_program) + + for query in self.queries: + yql_utils.log(query) # path of the queued query file, not yql_program again + yql_utils.log('RESULTS:') yql_utils.log(log_results) + 
yql_utils.log('AST:') + yql_utils.log(log_ast) + yql_utils.log('PLAN:') + yql_utils.log(log_plan) yql_utils.log('ERROR:') yql_utils.log(log_err) @@ -87,11 +126,11 @@ def yql_exec(self, program=None, program_file=None, verbose=False, check_error=T proc_result.std_err, results, results_file, - None, - None, - None, - None, - program, + ast, + ast_file, + plan, + plan_file, + yql_program, proc_result, None, ) diff --git a/ydb/tests/fq/yt/cfg/kqprun_config.conf b/ydb/tests/fq/yt/cfg/kqprun_config.conf index ef5d64eec832..07d5947a3d35 100644 --- a/ydb/tests/fq/yt/cfg/kqprun_config.conf +++ b/ydb/tests/fq/yt/cfg/kqprun_config.conf @@ -5,9 +5,9 @@ FeatureFlags { } QueryServiceConfig { - AvailableExternalDataSources: "YT" - - FileStorage { + AvailableExternalDataSources: "YT" + + FileStorage { MaxFiles: 1000 MaxSizeMb: 512 RetryCount: 3 diff --git a/ydb/tests/fq/yt/kqp_yt_file.py b/ydb/tests/fq/yt/kqp_yt_file.py index 94d8484b085d..a6796502d09f 100644 --- a/ydb/tests/fq/yt/kqp_yt_file.py +++ b/ydb/tests/fq/yt/kqp_yt_file.py @@ -208,7 +208,7 @@ def run_file_kqp_no_cache(suite, case, cfg): scheme_file=os.path.join('ydb/tests/fq/yt/cfg', 'kqprun_scheme.sql'), udfs_dir=yql_binary_path('yql/essentials/tests/common/test_framework/udfs_deps')) - return kqprun.yql_exec(program=sql_query, verbose=True, check_error=True, tables=in_tables) + return kqprun.yql_exec(yql_program=sql_query, verbose=True, check_error=True, yql_tables=in_tables) def run_file_kqp(suite, case, cfg): diff --git a/ydb/tests/fq/yt/kqp_yt_import/conftest.py b/ydb/tests/fq/yt/kqp_yt_import/conftest.py new file mode 100644 index 000000000000..de9b4018dd23 --- /dev/null +++ b/ydb/tests/fq/yt/kqp_yt_import/conftest.py @@ -0,0 +1,13 @@ +import os +import pytest + +from ydb.tests.fq.tools.kqprun import KqpRun + + +@pytest.fixture +def kqp_run(request) -> KqpRun: + return KqpRun( + config_file=os.path.join('ydb/tests/fq/yt/kqp_yt_import', 'kqprun_import_config.conf'), + scheme_file=os.path.join('ydb/tests/fq/yt/cfg', 
'kqprun_scheme.sql'), + path_prefix=f"{request.function.__name__}_" + ) diff --git a/ydb/tests/fq/yt/kqp_yt_import/helpers.py b/ydb/tests/fq/yt/kqp_yt_import/helpers.py new file mode 100644 index 000000000000..60cba83b7f10 --- /dev/null +++ b/ydb/tests/fq/yt/kqp_yt_import/helpers.py @@ -0,0 +1,65 @@ +from typing import Optional + +import google.protobuf.text_format as proto +import ydb.public.api.protos.ydb_value_pb2 as ydb + +from ydb.tests.fq.tools.kqprun import KqpRun + + +ValueByTypeExtractors = { + ydb.Type.PrimitiveTypeId.INT64: lambda x: x.int64_value, + ydb.Type.PrimitiveTypeId.STRING: lambda x: x.bytes_value, +} + + +def add_sample_table(kqp_run: KqpRun, table_name: str = 'input', infer_schema: bool = True): + attrs: Optional[str] = None + if not infer_schema: + attrs = """ + {"_yql_row_spec" = { + "Type" = ["StructType"; [ + ["key"; ["DataType"; "String"]]; + ["subkey"; ["DataType"; "Int64"]]; + ["value"; ["DataType"; "String"]]; + ]] + }} + """ + + kqp_run.add_table(table_name, [ + '{"key"="075";"subkey"=1;"value"="abc"};', + '{"key"="800";"subkey"=2;"value"="ddd"};', + '{"key"="020";"subkey"=3;"value"="q"};', + '{"key"="150";"subkey"=4;"value"="qzz"};' + ], attrs) + + +def validate_sample_result(result: str): + result_set = ydb.ResultSet() + proto.Parse(result, result_set) + + columns = [ + ('key', ydb.Type.PrimitiveTypeId.STRING), + ('subkey', ydb.Type.PrimitiveTypeId.INT64), + ('value', ydb.Type.PrimitiveTypeId.STRING) + ] + + assert len(result_set.columns) == len(columns) + for i, (column_name, column_type_id) in enumerate(columns): + assert result_set.columns[i].name == column_name + + result_column_type = result_set.columns[i].type.type_id + assert result_column_type == column_type_id, f'{result_column_type} != {column_type_id}' + + rows = [ + (b'075', 1, b'abc'), + (b'800', 2, b'ddd'), + (b'020', 3, b'q'), + (b'150', 4, b'qzz') + ] + + assert len(result_set.rows) == len(rows) + for i, row in enumerate(rows): + for j, expected_value in 
enumerate(row): + value_extractor = ValueByTypeExtractors[result_set.columns[j].type.type_id] + result_value = value_extractor(result_set.rows[i].items[j]) + assert result_value == expected_value, f'{result_value} != {expected_value}' diff --git a/ydb/tests/fq/yt/kqp_yt_import/kqprun_import_config.conf b/ydb/tests/fq/yt/kqp_yt_import/kqprun_import_config.conf new file mode 100644 index 000000000000..75f9e3dae42f --- /dev/null +++ b/ydb/tests/fq/yt/kqp_yt_import/kqprun_import_config.conf @@ -0,0 +1,42 @@ +ColumnShardConfig { + DisabledOnSchemeShard: false +} + +FeatureFlags { + EnableExternalDataSources: true + EnableScriptExecutionOperations: true + EnablePgSyntax: true +} + +QueryServiceConfig { + AvailableExternalDataSources: "YT" + + FileStorage { + MaxFiles: 1000 + MaxSizeMb: 512 + RetryCount: 3 + Threads: 2 + } + + Yt { + DefaultSettings { + Name: "InferSchema" + Value: "1" + } + DefaultSettings { + Name: "UseRPCReaderInDQ" + Value: "true" + } + } +} + +TableServiceConfig { + BlockChannelsMode: BLOCK_CHANNELS_FORCE + EnableCreateTableAs: true + EnableOlapSink: true + EnablePerStatementQueryExecution: true + + WriteActorSettings { + MaxWriteAttempts: 1000 + } +} diff --git a/ydb/tests/fq/yt/kqp_yt_import/test_ctas.py b/ydb/tests/fq/yt/kqp_yt_import/test_ctas.py new file mode 100644 index 000000000000..86e252b23478 --- /dev/null +++ b/ydb/tests/fq/yt/kqp_yt_import/test_ctas.py @@ -0,0 +1,26 @@ +from ydb.tests.fq.tools.kqprun import KqpRun +from ydb.tests.fq.yt.kqp_yt_import.helpers import add_sample_table, validate_sample_result + + +class TestYtCtas: + def test_simple_ctast(self, kqp_run: KqpRun): + add_sample_table(kqp_run) + + kqp_run.add_query(""" + CREATE TABLE from_yt ( + PRIMARY KEY (key) + ) WITH ( + STORE = COLUMN + ) + AS SELECT UNWRAP(key) AS key, subkey, value FROM plato.input + """) + + kqp_run.add_query(""" + SELECT + UNWRAP(key) AS key, UNWRAP(subkey) AS subkey, UNWRAP(value) AS value + FROM from_yt + ORDER BY subkey + """) + + result = 
kqp_run.yql_exec(verbose=True) + validate_sample_result(result.results) diff --git a/ydb/tests/fq/yt/kqp_yt_import/test_yt_reading.py b/ydb/tests/fq/yt/kqp_yt_import/test_yt_reading.py new file mode 100644 index 000000000000..8864151a6bfc --- /dev/null +++ b/ydb/tests/fq/yt/kqp_yt_import/test_yt_reading.py @@ -0,0 +1,39 @@ +import json + +from ydb.tests.fq.tools.kqprun import KqpRun +from ydb.tests.fq.yt.kqp_yt_import.helpers import add_sample_table, validate_sample_result + + +class TestYtReading: + def test_partitioned_reading(self, kqp_run: KqpRun): + add_sample_table(kqp_run) + + kqp_run.add_query(""" + PRAGMA ydb.DataSizePerPartition = "1"; + + SELECT * FROM plato.input + """) + + result = kqp_run.yql_exec(verbose=True) + + plan = json.loads(result.plan) + assert plan['Plan']['Plans'][0]['Plans'][0]['Plans'][0]['Plans'][0]['Stats']['Tasks'] == 2 + + def test_block_reading(self, kqp_run: KqpRun): + add_sample_table(kqp_run, infer_schema=False) + + kqp_run.add_query(""" + PRAGMA UseBlocks; + PRAGMA BlockEngine = "force"; + PRAGMA ydb.UseBlockReader = "true"; + + SELECT + key, subkey, value + FROM plato.input + ORDER BY subkey + """) + + result = kqp_run.yql_exec(verbose=True) + validate_sample_result(result.results) + + assert "DqReadBlockWideWrap" in result.opt, result.opt diff --git a/ydb/tests/fq/yt/kqp_yt_import/ya.make b/ydb/tests/fq/yt/kqp_yt_import/ya.make new file mode 100644 index 000000000000..4cd8089b953d --- /dev/null +++ b/ydb/tests/fq/yt/kqp_yt_import/ya.make @@ -0,0 +1,30 @@ +PY3TEST() + +TEST_SRCS( + test_ctas.py + test_yt_reading.py +) + +PY_SRCS( + conftest.py + helpers.py +) + +SIZE(MEDIUM) + +DEPENDS( + ydb/tests/tools/kqprun +) + +DATA( + arcadia/ydb/tests/fq/yt/cfg + arcadia/ydb/tests/fq/yt/kqp_yt_import +) + +PEERDIR( + ydb/public/api/protos + ydb/tests/fq/tools + yql/essentials/tests/common/test_framework +) + +END() diff --git a/ydb/tests/fq/yt/ya.make b/ydb/tests/fq/yt/ya.make index d89c0e0d2779..4c40deaf514a 100644 --- 
a/ydb/tests/fq/yt/ya.make +++ b/ydb/tests/fq/yt/ya.make @@ -1,3 +1,4 @@ RECURSE( kqp_yt_file + kqp_yt_import ) diff --git a/ydb/tests/tools/kqprun/.gitignore b/ydb/tests/tools/kqprun/.gitignore index 86fc65c21b31..d94be3dd52b3 100644 --- a/ydb/tests/tools/kqprun/.gitignore +++ b/ydb/tests/tools/kqprun/.gitignore @@ -3,10 +3,11 @@ sync_dir example udfs -*.log +*.attr +*.bin *.json +*.log +*.old *.sql -*.bin -*.txt *.svg -*.old +*.txt diff --git a/ydb/tests/tools/kqprun/configuration/app_config.conf b/ydb/tests/tools/kqprun/configuration/app_config.conf index b5cfb03681f3..d9c50ceddb45 100644 --- a/ydb/tests/tools/kqprun/configuration/app_config.conf +++ b/ydb/tests/tools/kqprun/configuration/app_config.conf @@ -168,6 +168,10 @@ QueryServiceConfig { Name: "_EnableYtPartitioning" Value: "true" } + DefaultSettings { + Name: "UseRPCReaderInDQ" + Value: "true" + } } Solomon {