Skip to content

Commit 0d00345

Browse files
committed
parameters pushdown support prototype
1 parent 749b883 commit 0d00345

File tree

5 files changed

+124
-15
lines changed

5 files changed

+124
-15
lines changed

Diff for: ydb/core/kqp/expr_nodes/kqp_expr_nodes.json

+2-1
Original file line numberDiff line numberDiff line change
@@ -679,7 +679,8 @@
679679
"Children": [
680680
{"Index": 0, "Name": "Type", "Type": "TExprBase"},
681681
{"Index": 1, "Name": "Columns", "Type": "TCoAtomList"},
682-
{"Index": 2, "Name": "Lambda", "Type": "TCoLambda"}
682+
{"Index": 2, "Name": "Parameters", "Type": "TExprList"},
683+
{"Index": 3, "Name": "Lambda", "Type": "TCoLambda"}
683684
]
684685
},
685686
{

Diff for: ydb/core/kqp/host/kqp_type_ann.cpp

+19-2
Original file line numberDiff line numberDiff line change
@@ -1077,7 +1077,7 @@ TStatus AnnotateOlapFilter(const TExprNode::TPtr& node, TExprContext& ctx) {
10771077
}
10781078

10791079
TStatus AnnotateOlapApply(const TExprNode::TPtr& node, TExprContext& ctx) {
1080-
if (!EnsureArgsCount(*node, 3U, ctx)) {
1080+
if (!EnsureArgsCount(*node, 4U, ctx)) {
10811081
return TStatus::Error;
10821082
}
10831083

@@ -1097,7 +1097,8 @@ TStatus AnnotateOlapApply(const TExprNode::TPtr& node, TExprContext& ctx) {
10971097
}
10981098

10991099
const auto structType = argsType->Cast<TStructExprType>();
1100-
TTypeAnnotationNode::TListType argsTypes(columns->ChildrenSize());
1100+
std::vector<const NYql::TTypeAnnotationNode*> argsTypes(columns->ChildrenSize());
1101+
11011102
for (auto i = 0U; i < argsTypes.size(); ++i) {
11021103
if (const auto argType = structType->FindItemType(columns->Child(i)->Content()))
11031104
argsTypes[i] = argType;
@@ -1109,6 +1110,22 @@ TStatus AnnotateOlapApply(const TExprNode::TPtr& node, TExprContext& ctx) {
11091110
}
11101111
}
11111112

1113+
TExprList parameters = TExprList(node->Child(TKqpOlapApply::idx_Parameters));
1114+
1115+
for(auto expr: parameters) {
1116+
if (!EnsureArgsCount(*expr.Ptr(), 2U, ctx)) {
1117+
return TStatus::Error;
1118+
}
1119+
1120+
TCoParameter param = TMaybeNode<TCoParameter>(expr.Ptr()).Cast();
1121+
const auto& paramType = expr.Ptr()->Child(TCoParameter::idx_Type);
1122+
if (!EnsureType(*paramType, ctx)) {
1123+
return TStatus::Error;
1124+
}
1125+
1126+
argsTypes.push_back(paramType->GetTypeAnn()->Cast<TTypeExprType>()->GetType());
1127+
}
1128+
11121129
if (!EnsureLambda(node->Tail(), ctx)) {
11131130
return TStatus::Error;
11141131
}

Diff for: ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter.cpp

+13-9
Original file line numberDiff line numberDiff line change
@@ -214,11 +214,6 @@ TMaybeNode<TExprBase> YqlApplyPushdown(const TExprBase& apply, const TExprNode&
214214
return false;
215215
});
216216

217-
// Temporary fix for https://st.yandex-team.ru/KIKIMR-22216
218-
if (parameters.size()!=0) {
219-
return nullptr;
220-
}
221-
222217
const auto members = FindNodes(apply.Ptr(), [&argument] (const TExprNode::TPtr& node) {
223218
if (const auto maybeMember = TMaybeNode<TCoMember>(node))
224219
return maybeMember.Cast().Struct().Raw() == &argument;
@@ -231,10 +226,18 @@ TMaybeNode<TExprBase> YqlApplyPushdown(const TExprBase& apply, const TExprNode&
231226
arguments.reserve(members.size());
232227
for (const auto& member : members) {
233228
columns.emplace_back(member->TailPtr());
234-
arguments.emplace_back(ctx.NewArgument(member->Pos(), columns.back()->Content()));
229+
TString argumentName = "members_" + TString(columns.back()->Content());
230+
arguments.emplace_back(ctx.NewArgument(member->Pos(), TStringBuf(argumentName)));
235231
replacements.emplace(member.Get(), arguments.back());
236232
}
237233

234+
for(const auto& pptr : parameters) {
235+
TCoParameter parameter = TMaybeNode<TCoParameter>(pptr).Cast();
236+
TString argumentName = "parameter_" + TString(parameter.Name().StringValue());
237+
arguments.emplace_back(ctx.NewArgument(pptr->Pos(), TStringBuf(argumentName)));
238+
replacements.emplace(pptr.Get(), arguments.back());
239+
}
240+
238241
// Temporary fix for https://st.yandex-team.ru/KIKIMR-22560
239242
if (!columns.size()) {
240243
return nullptr;
@@ -243,6 +246,7 @@ TMaybeNode<TExprBase> YqlApplyPushdown(const TExprBase& apply, const TExprNode&
243246
return Build<TKqpOlapApply>(ctx, apply.Pos())
244247
.Type(ExpandType(argument.Pos(), *argument.GetTypeAnn(), ctx))
245248
.Columns().Add(std::move(columns)).Build()
249+
.Parameters().Add(std::move(parameters)).Build()
246250
.Lambda(ctx.NewLambda(apply.Pos(), ctx.NewArguments(argument.Pos(), std::move(arguments)), ctx.ReplaceNodes(apply.Ptr(), replacements)))
247251
.Done();
248252
}
@@ -401,7 +405,7 @@ std::vector<TExprBase> ConvertComparisonNode(const TExprBase& nodeIn, const TExp
401405
return SafeCastPredicatePushdown(maybeFlatmap.Cast(), argument, ctx, pos);
402406
} else if (auto maybePredicate = node.Maybe<TCoCompare>()) {
403407
return SimplePredicatePushdown(maybePredicate.Cast(), argument, ctx, pos);
404-
}
408+
}
405409

406410
if constexpr (NKikimr::NSsa::RuntimeVersion >= 5U) {
407411
return YqlApplyPushdown(node, argument, ctx);
@@ -800,7 +804,7 @@ TExprBase KqpPushOlapFilter(TExprBase node, TExprContext& ctx, const TKqpOptimiz
800804
}
801805
remaining = std::move(remainingAfterApply);
802806
}
803-
807+
804808
if (pushedPredicates.empty()) {
805809
return node;
806810
}
@@ -810,7 +814,7 @@ TExprBase KqpPushOlapFilter(TExprBase node, TExprContext& ctx, const TKqpOptimiz
810814
const auto remainingFilter = CombinePredicatesWithAnd(remaining, ctx, node.Pos(), false, true);
811815

812816
TMaybeNode<TExprBase> olapFilter;
813-
if (pushedFilter.FirstLevelOps.IsValid()) {
817+
if (pushedFilter.FirstLevelOps.IsValid()) {
814818
olapFilter = Build<TKqpOlapFilter>(ctx, node.Pos())
815819
.Input(read.Process().Body())
816820
.Condition(pushedFilter.FirstLevelOps.Cast())

Diff for: ydb/core/kqp/query_compiler/kqp_olap_compiler.cpp

+6
Original file line numberDiff line numberDiff line change
@@ -600,6 +600,12 @@ TTypedColumn CompileYqlKernelScalarApply(const TKqpOlapApply& apply, TKqpOlapCom
600600
argTypes.emplace_back(arg.Type);
601601
}
602602

603+
for(const auto& param: apply.Parameters()) {
604+
const auto& arg = GetOrCreateColumnIdAndType(param, ctx);
605+
ids.emplace_back(arg.Id);
606+
argTypes.emplace_back(arg.Type);
607+
}
608+
603609
auto *const command = ctx.CreateAssignCmd();
604610
auto *const function = command->MutableFunction();
605611
const auto idx = ctx.GetKernelRequestBuilder().AddScalarApply(apply.Lambda().Ref(), argTypes, ctx.ExprCtx());

Diff for: ydb/core/kqp/ut/olap/kqp_olap_ut.cpp

+84-3
Original file line numberDiff line numberDiff line change
@@ -1458,7 +1458,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
14581458
//This check is based on an assumpltion, that for pushed down predicates column names are preserved in AST
14591459
//But for non-pushed down predicates column names are (usually) replaced with a label, started with $. It' not a rule, but a heuristic
14601460
//So this check may require a correction when some ast optimization rules are changed
1461-
UNIT_ASSERT_C(ast.find(R"((Unwrap (/ $)") != std::string::npos,
1461+
UNIT_ASSERT_C(ast.find(R"((Unwrap (/ $)") != std::string::npos,
14621462
TStringBuilder() << "Unsafe subpredicate is pushed down. Query: " << query);
14631463

14641464
UNIT_ASSERT_C(ast.find("NarrowMap") != std::string::npos,
@@ -1776,6 +1776,87 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
17761776
UNIT_ASSERT_VALUES_EQUAL(result, insertRows);
17771777
}
17781778

1779+
Y_UNIT_TEST(PredicatePushdownWithParametersILike) {
1780+
constexpr bool logQueries = true;
1781+
auto settings = TKikimrSettings()
1782+
.SetWithSampleTables(false);
1783+
TKikimrRunner kikimr(settings);
1784+
1785+
TStreamExecScanQuerySettings scanSettings;
1786+
scanSettings.Explain(true);
1787+
1788+
TLocalHelper(kikimr.GetTestServer()).CreateTestOlapTable();
1789+
WriteTestData(kikimr, "/Root/olapStore/olapTable", 10000, 3000000, 1000);
1790+
1791+
auto tableClient = kikimr.GetTableClient();
1792+
1793+
auto buildQuery = [](bool pushEnabled) {
1794+
TStringBuilder builder;
1795+
1796+
builder << "--!syntax_v1" << Endl;
1797+
1798+
if (!pushEnabled) {
1799+
builder << "PRAGMA Kikimr.OptEnableOlapPushdown = \"false\";" << Endl;
1800+
}
1801+
1802+
builder << R"(
1803+
DECLARE $in_uid AS Utf8;
1804+
DECLARE $in_level AS Int32;
1805+
1806+
SELECT `timestamp` FROM `/Root/olapStore/olapTable` WHERE
1807+
uid ILIKE "uid_%" || $in_uid || "%" AND level > $in_level
1808+
ORDER BY `timestamp`;
1809+
)" << Endl;
1810+
1811+
return builder;
1812+
};
1813+
1814+
auto normalQuery = buildQuery(false);
1815+
auto pushQuery = buildQuery(true);
1816+
1817+
auto params = tableClient.GetParamsBuilder()
1818+
.AddParam("$in_uid")
1819+
.Utf8("3000")
1820+
.Build()
1821+
.AddParam("$in_level")
1822+
.Int32(2)
1823+
.Build()
1824+
.Build();
1825+
1826+
auto it = tableClient.StreamExecuteScanQuery(normalQuery, params).GetValueSync();
1827+
UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
1828+
auto goodResult = CollectStreamResult(it);
1829+
1830+
it = tableClient.StreamExecuteScanQuery(pushQuery, params).GetValueSync();
1831+
UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
1832+
auto pushResult = CollectStreamResult(it);
1833+
1834+
if (logQueries) {
1835+
Cerr << "Query: " << normalQuery << Endl;
1836+
Cerr << "Expected: " << goodResult.ResultSetYson << Endl;
1837+
Cerr << "Received: " << pushResult.ResultSetYson << Endl;
1838+
}
1839+
1840+
CompareYson(goodResult.ResultSetYson, pushResult.ResultSetYson);
1841+
1842+
it = tableClient.StreamExecuteScanQuery(pushQuery, scanSettings).GetValueSync();
1843+
UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
1844+
1845+
auto result = CollectStreamResult(it);
1846+
auto ast = result.QueryStats->Getquery_ast();
1847+
1848+
UNIT_ASSERT_C(ast.find("KqpOlapFilter") != std::string::npos,
1849+
TStringBuilder() << "Predicate not pushed down. Query: " << pushQuery);
1850+
1851+
NJson::TJsonValue plan, readRange;
1852+
NJson::ReadJsonTree(*result.PlanJson, &plan, true);
1853+
1854+
Cerr << result.PlanJson << Endl;
1855+
1856+
readRange = FindPlanNodeByKv(plan, "Name", "TableFullScan");
1857+
UNIT_ASSERT(readRange.IsDefined());
1858+
}
1859+
17791860
Y_UNIT_TEST(PredicatePushdownWithParameters) {
17801861
constexpr bool logQueries = true;
17811862
auto settings = TKikimrSettings()
@@ -3277,9 +3358,9 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
32773358
32783359
)", noTx).GetValueSync();
32793360
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
3280-
3361+
32813362
result = queryClient.ExecuteQuery(R"(
3282-
UPSERT INTO Test (Id, Name, Comment) VALUES
3363+
UPSERT INTO Test (Id, Name, Comment) VALUES
32833364
(10, "n1", "aa"),
32843365
(20, "n2", "bb"),
32853366
(30, "n3", "cc"),

0 commit comments

Comments
 (0)