Skip to content

Commit 3e527f6

Browse files
authored
Prepare tests for Sinks (#16393)
1 parent f44c111 commit 3e527f6

File tree

8 files changed

+186
-152
lines changed

8 files changed

+186
-152
lines changed

ydb/core/kqp/opt/kqp_query_plan.cpp

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -839,15 +839,17 @@ class TxPlanSerializer {
839839
op.Properties["Table"] = tableData.RelativePath ? *tableData.RelativePath : tablePath;
840840
op.Properties["Path"] = tablePath;
841841

842-
const auto& tupleType = stage.Ref().GetTypeAnn()->Cast<TTupleExprType>();
843-
YQL_ENSURE(tupleType);
844-
YQL_ENSURE(tupleType->GetSize() == 1);
845-
const auto& listType = tupleType->GetItems()[0]->Cast<TListExprType>();
846-
YQL_ENSURE(listType);
847-
const auto& structType = listType->GetItemType()->Cast<TStructExprType>();
848-
YQL_ENSURE(structType);
849-
for (const auto& item : structType->GetItems()) {
850-
writeInfo.Columns.push_back(TString(item->GetName()));
842+
if (writeInfo.Type != EPlanTableWriteType::MultiErase) {
843+
const auto& tupleType = stage.Ref().GetTypeAnn()->Cast<TTupleExprType>();
844+
YQL_ENSURE(tupleType);
845+
YQL_ENSURE(tupleType->GetSize() == 1);
846+
const auto& listType = tupleType->GetItems()[0]->Cast<TListExprType>();
847+
YQL_ENSURE(listType);
848+
const auto& structType = listType->GetItemType()->Cast<TStructExprType>();
849+
YQL_ENSURE(structType);
850+
for (const auto& item : structType->GetItems()) {
851+
writeInfo.Columns.push_back(TString(item->GetName()));
852+
}
851853
}
852854

853855
SerializerCtx.Tables[tablePath].Writes.push_back(writeInfo);

ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10056,9 +10056,10 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
1005610056
);
1005710057
}
1005810058

10059-
Y_UNIT_TEST(DoubleCreateResourcePoolClassifier) {
10059+
Y_UNIT_TEST_TWIN(DoubleCreateResourcePoolClassifier, UseSink) {
1006010060
NKikimrConfig::TAppConfig config;
1006110061
config.MutableFeatureFlags()->SetEnableResourcePools(true);
10062+
config.MutableTableServiceConfig()->SetEnableOltpSink(UseSink);
1006210063

1006310064
TKikimrRunner kikimr(NKqp::TKikimrSettings()
1006410065
.SetAppConfig(config)
@@ -10085,7 +10086,11 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
1008510086
);)";
1008610087
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
1008710088
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::GENERIC_ERROR);
10088-
UNIT_ASSERT_STRING_CONTAINS_C(result.GetIssues().ToString(), "Conflict with existing key", result.GetIssues().ToString());
10089+
if (UseSink) {
10090+
UNIT_ASSERT_STRING_CONTAINS_C(result.GetIssues().ToString(), "Duplicate keys have been found", result.GetIssues().ToString());
10091+
} else {
10092+
UNIT_ASSERT_STRING_CONTAINS_C(result.GetIssues().ToString(), "Conflict with existing key", result.GetIssues().ToString());
10093+
}
1008910094
}
1009010095
}
1009110096

ydb/core/tx/datashard/check_write_unit.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ EExecutionStatus TCheckWriteUnit::Execute(TOperation::TPtr op,
115115
if (!Pipeline.AssignPlanInterval(op)) {
116116
TString err = TStringBuilder() << "Can't propose tx " << op->GetTxId() << " at blocked shard " << DataShard.TabletID();
117117

118-
writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, err);
118+
writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED, err);
119119
op->Abort(EExecutionUnitKind::FinishProposeWrite);
120120

121121
LOG_NOTICE_S(ctx, NKikimrServices::TX_DATASHARD, err);

ydb/services/persqueue_v1/ut/topic_service_ut.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -412,7 +412,11 @@ Y_UNIT_TEST_F(MultiplePartitionsAndNoGapsInTheOffsets, TUpdateOffsetsInTransacti
412412
auto result = tx->Commit().ExtractValueSync();
413413
Cerr << ">>> CommitTx >>>" << Endl;
414414
UNIT_ASSERT_EQUAL(result.IsTransportError(), false);
415-
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NYdb::EStatus::ABORTED);
415+
if (server->ServerSettings.AppConfig->GetTableServiceConfig().GetEnableOltpSink()) {
416+
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NYdb::EStatus::BAD_REQUEST);
417+
} else {
418+
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NYdb::EStatus::ABORTED);
419+
}
416420
}
417421

418422
}

ydb/services/ydb/ydb_table_ut.cpp

Lines changed: 148 additions & 135 deletions
Large diffs are not rendered by default.

ydb/tests/functional/api/test_insert.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# -*- coding: utf-8 -*-
22
import logging
33

4-
from hamcrest import assert_that, raises, equal_to
4+
from hamcrest import assert_that, raises, equal_to, any_of
55

66
from ydb.tests.library.harness.kikimr_runner import KiKiMR
77
from ydb.tests.oss.ydb_sdk_import import ydb
@@ -189,9 +189,15 @@ def callee():
189189
if first_query_kind in row_adding_operations and second_query_kind == 'insert':
190190
assert_that(
191191
callee,
192-
raises(
193-
ydb.PreconditionFailed,
194-
"Conflict with existing key."
192+
any_of(
193+
raises(
194+
ydb.PreconditionFailed,
195+
"Conflict with existing key."
196+
),
197+
raises(
198+
ydb.PreconditionFailed,
199+
"Duplicate keys have been found."
200+
)
195201
)
196202
)
197203

ydb/tests/functional/api/test_isolation.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -651,6 +651,9 @@ def test_anti_dependency_cycles_g2_two_edges(self):
651651
def callee():
652652
t1.execute('{} update {} set value = 0 where id = 1;'.format(prefix, table_name))
653653

654+
# Sinks allow UPDATE statement to be delayed, so we do commit here to execute update.
655+
t1.commit()
656+
654657
assert_that(
655658
callee,
656659
raises(

ydb/tests/functional/api/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ FORK_TEST_FILES()
44
SIZE(MEDIUM)
55

66
ENV(YDB_DRIVER_BINARY="ydb/apps/ydbd/ydbd")
7+
ENV(YDB_HARD_MEMORY_LIMIT_BYTES="8000000000")
78

89
TEST_SRCS(
910
test_session_pool.py

0 commit comments

Comments
 (0)