Skip to content

Commit 6f687c3

Browse files
authored
Sync EvaluteExpr execution (#11801) (#14453)
1 parent 7088281 commit 6f687c3

File tree

5 files changed

+135
-103
lines changed

5 files changed

+135
-103
lines changed

ydb/core/kqp/gateway/kqp_gateway.h

+3
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,9 @@ class IKqpGateway : public NYql::IKikimrGateway {
200200
using NYql::IKikimrGateway::ExecuteLiteral;
201201
virtual NThreading::TFuture<TExecPhysicalResult> ExecuteLiteral(TExecPhysicalRequest&& request,
202202
TQueryData::TPtr params, ui32 txIndex) = 0;
203+
using NYql::IKikimrGateway::ExecuteLiteralInstant;
204+
virtual TExecPhysicalResult ExecuteLiteralInstant(TExecPhysicalRequest&& request,
205+
TQueryData::TPtr params, ui32 txIndex) = 0;
203206

204207
/* Scripting */
205208
virtual NThreading::TFuture<TQueryResult> ExplainDataQueryAst(const TString& cluster, const TString& query) = 0;

ydb/core/kqp/gateway/kqp_ic_gateway.cpp

+105-75
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,79 @@ struct TAppConfigResult : public IKqpGateway::TGenericResult {
7777
std::shared_ptr<const NKikimrConfig::TAppConfig> Config;
7878
};
7979

80+
bool ContainOnlyLiteralStages(NKikimr::NKqp::IKqpGateway::TExecPhysicalRequest& request) {
81+
for (const auto& tx : request.Transactions) {
82+
if (tx.Body->GetType() != NKqpProto::TKqpPhyTx::TYPE_COMPUTE) {
83+
return false;
84+
}
85+
86+
for (const auto& stage : tx.Body->GetStages()) {
87+
if (stage.InputsSize() != 0) {
88+
return false;
89+
}
90+
}
91+
}
92+
93+
return true;
94+
}
95+
96+
void PrepareLiteralRequest(IKqpGateway::TExecPhysicalRequest& literalRequest, NKqpProto::TKqpPhyQuery& phyQuery, const TString& program, const NKikimrMiniKQL::TType& resultType) {
97+
literalRequest.NeedTxId = false;
98+
literalRequest.MaxAffectedShards = 0;
99+
literalRequest.TotalReadSizeLimitBytes = 0;
100+
literalRequest.MkqlMemoryLimit = 100_MB;
101+
102+
auto& transaction = *phyQuery.AddTransactions();
103+
transaction.SetType(NKqpProto::TKqpPhyTx::TYPE_COMPUTE);
104+
105+
auto& stage = *transaction.AddStages();
106+
auto& stageProgram = *stage.MutableProgram();
107+
stageProgram.SetRuntimeVersion(NYql::NDqProto::RUNTIME_VERSION_YQL_1_0);
108+
stageProgram.SetRaw(program);
109+
stage.SetOutputsCount(1);
110+
111+
auto& taskResult = *transaction.AddResults();
112+
*taskResult.MutableItemType() = resultType;
113+
auto& taskConnection = *taskResult.MutableConnection();
114+
taskConnection.SetStageIndex(0);
115+
}
116+
117+
void FillLiteralResult(const IKqpGateway::TExecPhysicalResult& result, IKqpGateway::TExecuteLiteralResult& literalResult) {
118+
if (result.Success()) {
119+
YQL_ENSURE(result.Results.size() == 1);
120+
literalResult.SetSuccess();
121+
literalResult.Result = result.Results[0];
122+
} else {
123+
literalResult.SetStatus(result.Status());
124+
literalResult.AddIssues(result.Issues());
125+
}
126+
}
127+
128+
void FillPhysicalResult(std::unique_ptr<TEvKqpExecuter::TEvTxResponse>& ev, IKqpGateway::TExecPhysicalResult& result, TQueryData::TPtr params, ui32 txIndex) {
129+
auto& response = *ev->Record.MutableResponse();
130+
if (response.GetStatus() == Ydb::StatusIds::SUCCESS) {
131+
result.SetSuccess();
132+
result.ExecuterResult.Swap(response.MutableResult());
133+
{
134+
auto g = params->TypeEnv().BindAllocator();
135+
136+
auto& txResults = ev->GetTxResults();
137+
result.Results.reserve(txResults.size());
138+
for(auto& tx : txResults) {
139+
result.Results.emplace_back(tx.GetMkql());
140+
}
141+
params->AddTxHolders(std::move(ev->GetTxHolders()));
142+
143+
if (!txResults.empty()) {
144+
params->AddTxResults(txIndex, std::move(txResults));
145+
}
146+
}
147+
} else {
148+
for (auto& issue : response.GetIssues()) {
149+
result.AddIssue(NYql::IssueFromMessage(issue));
150+
}
151+
}
152+
}
80153

81154
template<typename TRequest, typename TResponse, typename TResult>
82155
class TProxyRequestHandler: public TRequestHandlerBase<
@@ -621,32 +694,8 @@ class TKqpExecLiteralRequestHandler: public TActorBootstrapped<TKqpExecLiteralRe
621694
}
622695

623696
void ProcessPureExecution(std::unique_ptr<TEvKqpExecuter::TEvTxResponse>& ev) {
624-
auto* response = ev->Record.MutableResponse();
625-
626697
TResult result;
627-
if (response->GetStatus() == Ydb::StatusIds::SUCCESS) {
628-
result.SetSuccess();
629-
result.ExecuterResult.Swap(response->MutableResult());
630-
{
631-
auto g = Parameters->TypeEnv().BindAllocator();
632-
633-
auto& txResults = ev->GetTxResults();
634-
result.Results.reserve(txResults.size());
635-
for(auto& tx : txResults) {
636-
result.Results.emplace_back(tx.GetMkql());
637-
}
638-
Parameters->AddTxHolders(std::move(ev->GetTxHolders()));
639-
640-
if (!txResults.empty()) {
641-
Parameters->AddTxResults(TxIndex, std::move(txResults));
642-
}
643-
}
644-
} else {
645-
for (auto& issue : response->GetIssues()) {
646-
result.AddIssue(NYql::IssueFromMessage(issue));
647-
}
648-
}
649-
698+
FillPhysicalResult(ev, result, Parameters, TxIndex);
650699
Promise.SetValue(std::move(result));
651700
this->PassAway();
652701
}
@@ -1798,79 +1847,60 @@ class TKikimrIcGateway : public IKqpGateway {
17981847
auto preparedQuery = std::make_unique<NKikimrKqp::TPreparedQuery>();
17991848
auto& phyQuery = *preparedQuery->MutablePhysicalQuery();
18001849
NKikimr::NKqp::IKqpGateway::TExecPhysicalRequest literalRequest(txAlloc);
1801-
1802-
literalRequest.NeedTxId = false;
1803-
literalRequest.MaxAffectedShards = 0;
1804-
literalRequest.TotalReadSizeLimitBytes = 0;
1805-
literalRequest.MkqlMemoryLimit = 100_MB;
1806-
1807-
auto& transaction = *phyQuery.AddTransactions();
1808-
transaction.SetType(NKqpProto::TKqpPhyTx::TYPE_COMPUTE);
1809-
1810-
auto& stage = *transaction.AddStages();
1811-
auto& stageProgram = *stage.MutableProgram();
1812-
stageProgram.SetRuntimeVersion(NYql::NDqProto::RUNTIME_VERSION_YQL_1_0);
1813-
stageProgram.SetRaw(program);
1814-
stage.SetOutputsCount(1);
1815-
1816-
auto& taskResult = *transaction.AddResults();
1817-
*taskResult.MutableItemType() = resultType;
1818-
auto& taskConnection = *taskResult.MutableConnection();
1819-
taskConnection.SetStageIndex(0);
1850+
PrepareLiteralRequest(literalRequest, phyQuery, program, resultType);
18201851

18211852
NKikimr::NKqp::TPreparedQueryHolder queryHolder(preparedQuery.release(), txAlloc->HolderFactory.GetFunctionRegistry());
1822-
18231853
NKikimr::NKqp::TQueryData::TPtr params = std::make_shared<NKikimr::NKqp::TQueryData>(txAlloc);
1824-
18251854
literalRequest.Transactions.emplace_back(queryHolder.GetPhyTx(0), params);
18261855

18271856
return ExecuteLiteral(std::move(literalRequest), params, 0).Apply([](const auto& future) {
18281857
const auto& result = future.GetValue();
1829-
18301858
TExecuteLiteralResult literalResult;
1831-
1832-
if (result.Success()) {
1833-
YQL_ENSURE(result.Results.size() == 1);
1834-
literalResult.SetSuccess();
1835-
literalResult.Result = result.Results[0];
1836-
} else {
1837-
literalResult.SetStatus(result.Status());
1838-
literalResult.AddIssues(result.Issues());
1839-
}
1840-
1859+
FillLiteralResult(result, literalResult);
18411860
return literalResult;
18421861
});
18431862
}
18441863

1864+
TExecuteLiteralResult ExecuteLiteralInstant(const TString& program, const NKikimrMiniKQL::TType& resultType, NKikimr::NKqp::TTxAllocatorState::TPtr txAlloc) override {
1865+
auto preparedQuery = std::make_unique<NKikimrKqp::TPreparedQuery>();
1866+
auto& phyQuery = *preparedQuery->MutablePhysicalQuery();
1867+
NKikimr::NKqp::IKqpGateway::TExecPhysicalRequest literalRequest(txAlloc);
1868+
PrepareLiteralRequest(literalRequest, phyQuery, program, resultType);
1869+
1870+
NKikimr::NKqp::TPreparedQueryHolder queryHolder(preparedQuery.release(), txAlloc->HolderFactory.GetFunctionRegistry());
1871+
NKikimr::NKqp::TQueryData::TPtr params = std::make_shared<NKikimr::NKqp::TQueryData>(txAlloc);
1872+
literalRequest.Transactions.emplace_back(queryHolder.GetPhyTx(0), params);
1873+
1874+
auto result = ExecuteLiteralInstant(std::move(literalRequest), params, 0);
1875+
1876+
TExecuteLiteralResult literalResult;
1877+
FillLiteralResult(result, literalResult);
1878+
return literalResult;
1879+
}
18451880

18461881
TFuture<TExecPhysicalResult> ExecuteLiteral(TExecPhysicalRequest&& request, TQueryData::TPtr params, ui32 txIndex) override {
18471882
YQL_ENSURE(!request.Transactions.empty());
18481883
YQL_ENSURE(request.DataShardLocks.empty());
18491884
YQL_ENSURE(!request.NeedTxId);
1850-
1851-
auto containOnlyLiteralStages = [](const auto& request) {
1852-
for (const auto& tx : request.Transactions) {
1853-
if (tx.Body->GetType() != NKqpProto::TKqpPhyTx::TYPE_COMPUTE) {
1854-
return false;
1855-
}
1856-
1857-
for (const auto& stage : tx.Body->GetStages()) {
1858-
if (stage.InputsSize() != 0) {
1859-
return false;
1860-
}
1861-
}
1862-
}
1863-
1864-
return true;
1865-
};
1866-
1867-
YQL_ENSURE(containOnlyLiteralStages(request));
1885+
YQL_ENSURE(ContainOnlyLiteralStages(request));
18681886
auto promise = NewPromise<TExecPhysicalResult>();
18691887
IActor* requestHandler = new TKqpExecLiteralRequestHandler(std::move(request), Counters, promise, params, txIndex);
18701888
RegisterActor(requestHandler);
18711889
return promise.GetFuture();
18721890
}
18731891

1892+
TExecPhysicalResult ExecuteLiteralInstant(TExecPhysicalRequest&& request, TQueryData::TPtr params, ui32 txIndex) override {
1893+
YQL_ENSURE(!request.Transactions.empty());
1894+
YQL_ENSURE(request.DataShardLocks.empty());
1895+
YQL_ENSURE(!request.NeedTxId);
1896+
YQL_ENSURE(ContainOnlyLiteralStages(request));
1897+
1898+
auto ev = ::NKikimr::NKqp::ExecuteLiteral(std::move(request), Counters, TActorId{}, MakeIntrusive<TUserRequestContext>());
1899+
TExecPhysicalResult result;
1900+
FillPhysicalResult(ev, result, params, txIndex);
1901+
return result;
1902+
}
1903+
18741904
TFuture<TQueryResult> ExecScanQueryAst(const TString& cluster, const TString& query,
18751905
TQueryData::TPtr params, const TAstQuerySettings& settings, ui64 rowsLimit) override
18761906
{

ydb/core/kqp/host/kqp_gateway_proxy.cpp

+6
Original file line numberDiff line numberDiff line change
@@ -2219,6 +2219,12 @@ class TKqpGatewayProxy : public IKikimrGateway {
22192219
return Gateway->ExecuteLiteral(program, resultType, txAlloc);
22202220
}
22212221

2222+
TExecuteLiteralResult ExecuteLiteralInstant(const TString& program,
2223+
const NKikimrMiniKQL::TType& resultType, NKikimr::NKqp::TTxAllocatorState::TPtr txAlloc) override
2224+
{
2225+
return Gateway->ExecuteLiteralInstant(program, resultType, txAlloc);
2226+
}
2227+
22222228
private:
22232229
bool IsPrepare() const {
22242230
if (!SessionCtx) {

ydb/core/kqp/provider/yql_kikimr_exec.cpp

+19-28
Original file line numberDiff line numberDiff line change
@@ -892,39 +892,30 @@ class TKiSourceCallableExecutionTransformer : public TAsyncCallbackTransformer<T
892892
if (status.Level != TStatus::Ok) {
893893
return SyncStatus(status);
894894
}
895-
auto asyncResult = Gateway->ExecuteLiteral(program, resultType, SessionCtx->Query().QueryData->GetAllocState());
896895

897-
return std::make_pair(IGraphTransformer::TStatus::Async, asyncResult.Apply(
898-
[this](const NThreading::TFuture<IKikimrGateway::TExecuteLiteralResult>& future) {
899-
return TAsyncTransformCallback(
900-
[future, this](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
896+
auto literalResult = Gateway->ExecuteLiteralInstant(program, resultType, SessionCtx->Query().QueryData->GetAllocState());
901897

902-
const auto& literalResult = future.GetValueSync();
903-
904-
if (!literalResult.Success()) {
905-
for (const auto& issue : literalResult.Issues()) {
906-
ctx.AddError(issue);
907-
}
908-
input->SetState(TExprNode::EState::Error);
909-
return IGraphTransformer::TStatus::Error;
910-
}
898+
if (!literalResult.Success()) {
899+
for (const auto& issue : literalResult.Issues()) {
900+
ctx.AddError(issue);
901+
}
902+
input->SetState(TExprNode::EState::Error);
903+
return SyncError();
904+
}
911905

912-
bool truncated = false;
913-
auto yson = this->EncodeResultToYson(literalResult.Result, truncated);
914-
if (truncated) {
915-
input->SetState(TExprNode::EState::Error);
916-
ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), "EvaluteExpr result is too big and was truncated"));
917-
return IGraphTransformer::TStatus::Error;
918-
}
906+
bool truncated = false;
907+
auto yson = EncodeResultToYson(literalResult.Result, truncated);
908+
if (truncated) {
909+
ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), "EvaluteExpr result is too big and was truncated"));
910+
input->SetState(TExprNode::EState::Error);
911+
return SyncError();
912+
}
919913

920-
output = input;
921-
input->SetState(TExprNode::EState::ExecutionComplete);
922-
input->SetResult(ctx.NewAtom(input->Pos(), yson));
923-
return IGraphTransformer::TStatus::Ok;
924-
});
925-
}));
914+
output = input;
915+
input->SetState(TExprNode::EState::ExecutionComplete);
916+
input->SetResult(ctx.NewAtom(input->Pos(), yson));
917+
return SyncOk();
926918
}
927-
928919
if (input->Content() == ConfigureName) {
929920
auto requireStatus = RequireChild(*input, 0);
930921
if (requireStatus.Level != TStatus::Ok) {

ydb/core/kqp/provider/yql_kikimr_gateway.h

+2
Original file line numberDiff line numberDiff line change
@@ -1028,6 +1028,8 @@ class IKikimrGateway : public TThrRefBase {
10281028

10291029
virtual NThreading::TFuture<TExecuteLiteralResult> ExecuteLiteral(const TString& program, const NKikimrMiniKQL::TType& resultType, NKikimr::NKqp::TTxAllocatorState::TPtr txAlloc) = 0;
10301030

1031+
virtual TExecuteLiteralResult ExecuteLiteralInstant(const TString& program, const NKikimrMiniKQL::TType& resultType, NKikimr::NKqp::TTxAllocatorState::TPtr txAlloc) = 0;
1032+
10311033
public:
10321034
using TCreateDirFunc = std::function<void(const TString&, const TString&, NThreading::TPromise<TGenericResult>)>;
10331035

0 commit comments

Comments
 (0)