Skip to content

Commit 27e31a4

Browse files
nikvas0blinkov
authored andcommitted
Flush sink on INSERT (#15003)
1 parent 6c5b41e commit 27e31a4

File tree

7 files changed

+17
-9
lines changed

7 files changed

+17
-9
lines changed

ydb/core/kqp/common/kqp_tx.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,7 @@ bool HasOltpTableWriteInTx(const NKqpProto::TKqpPhyQuery& physicalQuery) {
335335
return false;
336336
}
337337

338-
bool HasUncommittedChangesRead(THashSet<NKikimr::TTableId>& modifiedTables, const NKqpProto::TKqpPhyQuery& physicalQuery) {
338+
bool HasUncommittedChangesRead(THashSet<NKikimr::TTableId>& modifiedTables, const NKqpProto::TKqpPhyQuery& physicalQuery, const bool commit) {
339339
auto getTable = [](const NKqpProto::TKqpPhyTableId& table) {
340340
return NKikimr::TTableId(table.GetOwnerId(), table.GetTableId());
341341
};
@@ -402,6 +402,10 @@ bool HasUncommittedChangesRead(THashSet<NKikimr::TTableId>& modifiedTables, cons
402402
NKikimrKqp::TKqpTableSinkSettings settings;
403403
YQL_ENSURE(sink.GetInternalSink().GetSettings().UnpackTo(&settings), "Failed to unpack settings");
404404
modifiedTables.insert(getTable(settings.GetTable()));
405+
if (settings.GetType() == NKikimrKqp::TKqpTableSinkSettings::MODE_INSERT && !commit) {
406+
// INSERT with sink should be executed immediately, because it returns an error in case of duplicate rows.
407+
return true;
408+
}
405409
} else {
406410
return true;
407411
}

ydb/core/kqp/common/kqp_tx.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ class TShardIdToTableInfo {
160160
};
161161
using TShardIdToTableInfoPtr = std::shared_ptr<TShardIdToTableInfo>;
162162

163-
bool HasUncommittedChangesRead(THashSet<NKikimr::TTableId>& modifiedTables, const NKqpProto::TKqpPhyQuery& physicalQuery);
163+
bool HasUncommittedChangesRead(THashSet<NKikimr::TTableId>& modifiedTables, const NKqpProto::TKqpPhyQuery& physicalQuery, const bool commit);
164164

165165
class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase {
166166
public:
@@ -316,8 +316,8 @@ class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase {
316316
return true;
317317
}
318318

319-
void ApplyPhysicalQuery(const NKqpProto::TKqpPhyQuery& phyQuery) {
320-
NeedUncommittedChangesFlush = HasUncommittedChangesRead(ModifiedTablesSinceLastFlush, phyQuery);
319+
void ApplyPhysicalQuery(const NKqpProto::TKqpPhyQuery& phyQuery, const bool commit) {
320+
NeedUncommittedChangesFlush = HasUncommittedChangesRead(ModifiedTablesSinceLastFlush, phyQuery, commit);
321321
if (NeedUncommittedChangesFlush) {
322322
ModifiedTablesSinceLastFlush.clear();
323323
}

ydb/core/kqp/common/kqp_tx_manager.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -489,7 +489,8 @@ class TKqpTransactionManager : public IKqpTransactionManager {
489489

490490
void MakeLocksIssue(const TShardInfo& shardInfo) {
491491
TStringBuilder message;
492-
message << "Transaction locks invalidated. Tables: ";
492+
message << "Transaction locks invalidated. ";
493+
message << (shardInfo.Pathes.size() == 1 ? "Table: " : "Tables: ");
493494
bool first = true;
494495
// TODO: add error by pathid
495496
for (const auto& path : shardInfo.Pathes) {

ydb/core/kqp/runtime/kqp_write_actor.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2460,7 +2460,10 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
24602460
ReplyErrorAndDie(
24612461
NYql::NDqProto::StatusIds::ABORTED,
24622462
NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED,
2463-
TStringBuilder() << "Transaction locks invalidated. Tables: " << getPathes() << ".",
2463+
TStringBuilder()
2464+
<< "Transaction locks invalidated. "
2465+
<< (TxManager->GetShardTableInfo(ev->Get()->Record.GetOrigin()).Pathes.size() == 1 ? "Table: " : "Tables: ")
2466+
<< getPathes() << ".",
24642467
getIssues());
24652468
return;
24662469
}

ydb/core/kqp/session_actor/kqp_session_actor.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -944,7 +944,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
944944
}
945945

946946
QueryState->TxCtx->SetTempTables(QueryState->TempTablesState);
947-
QueryState->TxCtx->ApplyPhysicalQuery(phyQuery);
947+
QueryState->TxCtx->ApplyPhysicalQuery(phyQuery, QueryState->Commit);
948948
auto [success, issues] = QueryState->TxCtx->ApplyTableOperations(phyQuery.GetTableOps(), phyQuery.GetTableInfos(),
949949
EKikimrQueryType::Dml);
950950
if (!success) {

ydb/core/kqp/ut/tx/kqp_locks_ut.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -241,8 +241,9 @@ Y_UNIT_TEST_SUITE(KqpLocks) {
241241
}), commitResult.GetIssues().ToString());
242242
}
243243

244-
Y_UNIT_TEST(MixedTxFail) {
244+
Y_UNIT_TEST_TWIN(MixedTxFail, useSink) {
245245
NKikimrConfig::TAppConfig appConfig;
246+
appConfig.MutableTableServiceConfig()->SetEnableOltpSink(useSink);
246247
appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true);
247248
appConfig.MutableTableServiceConfig()->SetEnableHtapTx(true);
248249
auto settings = TKikimrSettings().SetAppConfig(appConfig).SetWithSampleTables(false);

ydb/core/kqp/ut/tx/kqp_sink_tx_ut.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,6 @@ Y_UNIT_TEST_SUITE(KqpSinkTx) {
177177

178178
auto result = session.ExecuteQuery(Q_(R"(
179179
INSERT INTO `/Root/KV` (Key, Value) VALUES (1u, "New");
180-
SELECT COUNT(*) FROM `/Root/KV`;
181180
)"), TTxControl::Tx(tx.GetId())).ExtractValueSync();
182181
result.GetIssues().PrintTo(Cerr);
183182
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::PRECONDITION_FAILED, result.GetIssues().ToString());

0 commit comments

Comments
 (0)