@@ -140,28 +140,29 @@ struct MainTestCase {
140
140
{
141
141
}
142
142
143
- void CreateTable (const TString& tableDDL) {
144
- auto ddl = Sprintf (tableDDL.data (), TableName.data ());
143
+ void ExecuteDDL (const TString& ddl) {
145
144
auto res = Session.ExecuteQuery (ddl, TTxControl::NoTx ()).GetValueSync ();
146
145
UNIT_ASSERT_C (res.IsSuccess (), res.GetIssues ().ToString ());
147
146
}
148
147
148
+ void CreateTable (const TString& tableDDL) {
149
+ ExecuteDDL (Sprintf (tableDDL.data (), TableName.data ()));
150
+ }
151
+
149
152
void CreateTopic (size_t partitionCount = 10 ) {
150
- auto res = Session. ExecuteQuery (Sprintf (R"(
153
+ ExecuteDDL (Sprintf (R"(
151
154
CREATE TOPIC `%s`
152
155
WITH (
153
156
min_active_partitions = %d
154
157
);
155
- )" , TopicName.data (), partitionCount), TTxControl::NoTx ()).GetValueSync ();
156
- UNIT_ASSERT_C (res.IsSuccess (), res.GetIssues ().ToString ());
158
+ )" , TopicName.data (), partitionCount));
157
159
}
158
160
159
161
void CreateConsumer (const TString& consumerName) {
160
- auto res = Session. ExecuteQuery (Sprintf (R"(
162
+ ExecuteDDL (Sprintf (R"(
161
163
ALTER TOPIC `%s`
162
164
ADD CONSUMER `%s`;
163
- )" , TopicName.data (), consumerName.data ()), TTxControl::NoTx ()).GetValueSync ();
164
- UNIT_ASSERT_C (res.IsSuccess (), res.GetIssues ().ToString ());
165
+ )" , TopicName.data (), consumerName.data ()));
165
166
}
166
167
167
168
struct CreateTransferSettings {
@@ -200,7 +201,7 @@ struct MainTestCase {
200
201
sb << " , BATCH_SIZE_BYTES = " << *settings.BatchSizeBytes << Endl;
201
202
}
202
203
203
- auto res = Session. ExecuteQuery ( Sprintf (R"(
204
+ auto ddl = Sprintf (R"(
204
205
%s;
205
206
206
207
CREATE TRANSFER `%s`
@@ -209,9 +210,9 @@ struct MainTestCase {
209
210
CONNECTION_STRING = 'grpc://%s'
210
211
%s
211
212
);
212
- )" , lambda.data (), TransferName.data (), TopicName.data (), TableName.data (), ConnectionString.data (), sb.data ()),
213
- TTxControl::NoTx ()). GetValueSync ();
214
- UNIT_ASSERT_C (res. IsSuccess (), res. GetIssues (). ToString () );
213
+ )" , lambda.data (), TransferName.data (), TopicName.data (), TableName.data (), ConnectionString.data (), sb.data ());
214
+
215
+ ExecuteDDL (ddl );
215
216
}
216
217
217
218
struct AlterTransferSettings {
@@ -360,7 +361,8 @@ struct MainTestCase {
360
361
for (size_t i = 20 ; i--;) {
361
362
auto result = DescribeTransfer ().GetReplicationDescription ();
362
363
if (TReplicationDescription::EState::Error == result.GetState ()) {
363
- Cerr << " >>>>> " << result.GetErrorState ().GetIssues ().ToOneLineString () << Endl << Flush;
364
+ Cerr << " >>>>> ACTUAL: " << result.GetErrorState ().GetIssues ().ToOneLineString () << Endl << Flush;
365
+ Cerr << " >>>>> EXPECTED: " << expectedMessage << Endl << Flush;
364
366
UNIT_ASSERT (result.GetErrorState ().GetIssues ().ToOneLineString ().contains (expectedMessage));
365
367
break ;
366
368
}
@@ -1242,7 +1244,34 @@ Y_UNIT_TEST_SUITE(Transfer)
1242
1244
}});
1243
1245
}
1244
1246
1245
- Y_UNIT_TEST (CreateTransferTopicNotExists)
1247
+ Y_UNIT_TEST (CreateTransferSourceNotExists)
1248
+ {
1249
+ MainTestCase testCase;
1250
+ testCase.CreateTable (R"(
1251
+ CREATE TABLE `%s` (
1252
+ Key Uint64 NOT NULL,
1253
+ Message Utf8 NOT NULL,
1254
+ PRIMARY KEY (Key)
1255
+ ) WITH (
1256
+ STORE = COLUMN
1257
+ );
1258
+ )" );
1259
+
1260
+ testCase.CreateTransfer (R"(
1261
+ $l = ($x) -> {
1262
+ return [
1263
+ <|
1264
+ Key:CAST($x._offset AS Uint64),
1265
+ Message:CAST($x._data AS Utf8)
1266
+ |>
1267
+ ];
1268
+ };
1269
+ )" );
1270
+
1271
+ testCase.CheckTransferStateError (" Discovery error: local/Topic_" );
1272
+ }
1273
+
1274
+ Y_UNIT_TEST (CreateTransferSourceIsNotTopic)
1246
1275
{
1247
1276
MainTestCase testCase;
1248
1277
testCase.CreateTable (R"(
@@ -1255,6 +1284,13 @@ Y_UNIT_TEST_SUITE(Transfer)
1255
1284
);
1256
1285
)" );
1257
1286
1287
+ testCase.ExecuteDDL (Sprintf (R"(
1288
+ CREATE TABLE `%s` (
1289
+ Key Uint64 NOT NULL,
1290
+ PRIMARY KEY (Key)
1291
+ );
1292
+ )" , testCase.TopicName .data ()));
1293
+
1258
1294
testCase.CreateTransfer (R"(
1259
1295
$l = ($x) -> {
1260
1296
return [
@@ -1292,7 +1328,48 @@ Y_UNIT_TEST_SUITE(Transfer)
1292
1328
};
1293
1329
)" );
1294
1330
1295
- testCase.CheckTransferStateError (" Unexpected entry kind at 'writer'" );
1331
+ testCase.CheckTransferStateError (" Only column tables are supported as transfer targets" );
1332
+ }
1333
+
1334
+ Y_UNIT_TEST (CreateTransferTargetIsNotTable)
1335
+ {
1336
+ MainTestCase testCase;
1337
+ testCase.CreateTable (R"(
1338
+ CREATE TOPIC `%s`;
1339
+ )" );
1340
+ testCase.CreateTopic ();
1341
+
1342
+ testCase.CreateTransfer (R"(
1343
+ $l = ($x) -> {
1344
+ return [
1345
+ <|
1346
+ Key:CAST($x._offset AS Uint64),
1347
+ Message:CAST($x._data AS Utf8)
1348
+ |>
1349
+ ];
1350
+ };
1351
+ )" );
1352
+
1353
+ testCase.CheckTransferStateError (" Only column tables are supported as transfer targets" );
1354
+ }
1355
+
1356
+ Y_UNIT_TEST (CreateTransferTargetNotExists)
1357
+ {
1358
+ MainTestCase testCase;
1359
+ testCase.CreateTopic ();
1360
+
1361
+ testCase.CreateTransfer (R"(
1362
+ $l = ($x) -> {
1363
+ return [
1364
+ <|
1365
+ Key:CAST($x._offset AS Uint64),
1366
+ Message:CAST($x._data AS Utf8)
1367
+ |>
1368
+ ];
1369
+ };
1370
+ )" );
1371
+
1372
+ testCase.CheckTransferStateError (TStringBuilder () << " The target table `/local/" << testCase.TableName << " ` does not exist" );
1296
1373
}
1297
1374
}
1298
1375
0 commit comments