@@ -11,6 +11,9 @@ namespace NKqpRun {
11
11
12
12
class TKqpRunner ::TImpl {
13
13
using EVerbose = TYdbSetupSettings::EVerbose;
14
+ using IRetryPolicy = IRetryPolicy<Ydb::StatusIds::StatusCode>;
15
+
16
+ static constexpr TDuration RETRY_PERIOD = TDuration::MilliSeconds(100 );
14
17
15
18
public:
16
19
enum class EQueryType {
@@ -28,7 +31,28 @@ class TKqpRunner::TImpl {
28
31
, CoutColors_(NColorizer::AutoColors(Cout))
29
32
{}
30
33
31
- bool ExecuteSchemeQuery (const TRequestOptions& query) const {
34
+ bool ExecuteWithRetries (std::function<Ydb::StatusIds::StatusCode()> queryRunner) {
35
+ RetryState_ = nullptr ;
36
+ while (true ) {
37
+ const auto status = queryRunner ();
38
+ if (status == Ydb::StatusIds::SUCCESS) {
39
+ return true ;
40
+ }
41
+
42
+ if (!RetryState_) {
43
+ SetupRetryState ();
44
+ }
45
+
46
+ if (const auto delay = RetryState_->GetNextRetryDelay (status)) {
47
+ Cout << CoutColors_.Yellow () << TInstant::Now ().ToIsoStringLocal () << " Retrying query execution in " << *delay << " ..." << CoutColors_.Default () << Endl;
48
+ Sleep (*delay);
49
+ } else {
50
+ return false ;
51
+ }
52
+ }
53
+ }
54
+
55
+ Ydb::StatusIds::StatusCode ExecuteSchemeQuery (const TRequestOptions& query) const {
32
56
StartSchemeTraceOpt ();
33
57
34
58
if (VerboseLevel_ >= EVerbose::QueriesText) {
@@ -43,13 +67,13 @@ class TKqpRunner::TImpl {
43
67
44
68
if (!status.IsSuccess ()) {
45
69
Cerr << CerrColors_.Red () << " Failed to execute scheme query, reason:" << CerrColors_.Default () << Endl << status.ToString () << Endl;
46
- return false ;
70
+ return status. Status ;
47
71
}
48
72
49
- return true ;
73
+ return Ydb::StatusIds::SUCCESS ;
50
74
}
51
75
52
- bool ExecuteScript (const TRequestOptions& script) {
76
+ Ydb::StatusIds::StatusCode ExecuteScript (const TRequestOptions& script) {
53
77
StartScriptTraceOpt (script.QueryId );
54
78
55
79
if (VerboseLevel_ >= EVerbose::QueriesText) {
@@ -60,7 +84,7 @@ class TKqpRunner::TImpl {
60
84
61
85
if (!status.IsSuccess ()) {
62
86
Cerr << CerrColors_.Red () << " Failed to start script execution, reason:" << CerrColors_.Default () << Endl << status.ToString () << Endl;
63
- return false ;
87
+ return status. Status ;
64
88
}
65
89
66
90
ExecutionMeta_ = TExecutionMeta ();
@@ -69,7 +93,7 @@ class TKqpRunner::TImpl {
69
93
return WaitScriptExecutionOperation (script.QueryId );
70
94
}
71
95
72
- bool ExecuteQuery (const TRequestOptions& query, EQueryType queryType) {
96
+ Ydb::StatusIds::StatusCode ExecuteQuery (const TRequestOptions& query, EQueryType queryType) {
73
97
StartScriptTraceOpt (query.QueryId );
74
98
StartTime_ = TInstant::Now ();
75
99
@@ -93,7 +117,7 @@ class TKqpRunner::TImpl {
93
117
94
118
case EQueryType::AsyncQuery:
95
119
YdbSetup_.QueryRequestAsync (query);
96
- return true ;
120
+ return Ydb::StatusIds::SUCCESS ;
97
121
}
98
122
99
123
TYdbSetup::StopTraceOpt ();
@@ -109,14 +133,14 @@ class TKqpRunner::TImpl {
109
133
110
134
if (!status.IsSuccess ()) {
111
135
Cerr << CerrColors_.Red () << " Failed to execute query, reason:" << CerrColors_.Default () << Endl << status.ToString () << Endl;
112
- return false ;
136
+ return status. Status ;
113
137
}
114
138
115
139
if (!status.Issues .Empty ()) {
116
140
Cerr << CerrColors_.Red () << " Request finished with issues:" << CerrColors_.Default () << Endl << status.Issues .ToString () << Endl;
117
141
}
118
142
119
- return true ;
143
+ return Ydb::StatusIds::SUCCESS ;
120
144
}
121
145
122
146
void FinalizeRunner () const {
@@ -171,7 +195,7 @@ class TKqpRunner::TImpl {
171
195
}
172
196
173
197
private:
174
- bool WaitScriptExecutionOperation (ui64 queryId) {
198
+ Ydb::StatusIds::StatusCode WaitScriptExecutionOperation (ui64 queryId) {
175
199
StartTime_ = TInstant::Now ();
176
200
Y_DEFER {
177
201
TYdbSetup::StopTraceOpt ();
@@ -193,15 +217,15 @@ class TKqpRunner::TImpl {
193
217
194
218
if (!status.IsSuccess ()) {
195
219
Cerr << CerrColors_.Red () << " Failed to get script execution operation, reason:" << CerrColors_.Default () << Endl << status.ToString () << Endl;
196
- return false ;
220
+ return status. Status ;
197
221
}
198
222
199
223
if (Options_.ScriptCancelAfter && TInstant::Now () - StartTime_ > Options_.ScriptCancelAfter ) {
200
224
Cout << CoutColors_.Yellow () << TInstant::Now ().ToIsoStringLocal () << " Cancelling script execution..." << CoutColors_.Default () << Endl;
201
225
TRequestResult cancelStatus = YdbSetup_.CancelScriptExecutionOperationRequest (ExecutionMeta_.Database , ExecutionOperation_);
202
226
if (!cancelStatus.IsSuccess ()) {
203
227
Cerr << CerrColors_.Red () << " Failed to cancel script execution operation, reason:" << CerrColors_.Default () << Endl << cancelStatus.ToString () << Endl;
204
- return false ;
228
+ return cancelStatus. Status ;
205
229
}
206
230
}
207
231
@@ -215,14 +239,14 @@ class TKqpRunner::TImpl {
215
239
216
240
if (!status.IsSuccess () || ExecutionMeta_.ExecutionStatus != NYdb::NQuery::EExecStatus::Completed) {
217
241
Cerr << CerrColors_.Red () << " Failed to execute script, invalid final status, reason:" << CerrColors_.Default () << Endl << status.ToString () << Endl;
218
- return false ;
242
+ return status. Status ;
219
243
}
220
244
221
245
if (!status.Issues .Empty ()) {
222
246
Cerr << CerrColors_.Red () << " Request finished with issues:" << CerrColors_.Default () << Endl << status.Issues .ToString () << Endl;
223
247
}
224
248
225
- return true ;
249
+ return Ydb::StatusIds::SUCCESS ;
226
250
}
227
251
228
252
void StartSchemeTraceOpt () const {
@@ -304,9 +328,29 @@ class TKqpRunner::TImpl {
304
328
Cout << CoutColors_.Default () << Endl;
305
329
}
306
330
331
+ void SetupRetryState () {
332
+ if (!RetryPolicy_) {
333
+ const auto retryFunc = [this ](Ydb::StatusIds::StatusCode status) {
334
+ if (Options_.RetryableStatuses .contains (status)) {
335
+ return ERetryErrorClass::ShortRetry;
336
+ }
337
+ return ERetryErrorClass::NoRetry;
338
+ };
339
+ RetryPolicy_ = IRetryPolicy::GetExponentialBackoffPolicy (
340
+ retryFunc,
341
+ RETRY_PERIOD,
342
+ RETRY_PERIOD,
343
+ TDuration::Seconds (1 )
344
+ );
345
+ }
346
+ RetryState_ = RetryPolicy_->CreateRetryState ();
347
+ }
348
+
307
349
private:
308
350
TRunnerOptions Options_;
309
351
EVerbose VerboseLevel_;
352
+ IRetryPolicy::TPtr RetryPolicy_;
353
+ IRetryPolicy::IRetryState::TPtr RetryState_;
310
354
311
355
TYdbSetup YdbSetup_;
312
356
TStatsPrinter StatsPrinter_;
@@ -327,19 +371,27 @@ TKqpRunner::TKqpRunner(const TRunnerOptions& options)
327
371
{}
328
372
329
373
bool TKqpRunner::ExecuteSchemeQuery (const TRequestOptions& query) const {
330
- return Impl_->ExecuteSchemeQuery (query);
374
+ return Impl_->ExecuteWithRetries ([this , query]() {
375
+ return Impl_->ExecuteSchemeQuery (query);
376
+ });
331
377
}
332
378
333
379
bool TKqpRunner::ExecuteScript (const TRequestOptions& script) const {
334
- return Impl_->ExecuteScript (script);
380
+ return Impl_->ExecuteWithRetries ([this , script]() {
381
+ return Impl_->ExecuteScript (script);
382
+ });
335
383
}
336
384
337
385
bool TKqpRunner::ExecuteQuery (const TRequestOptions& query) const {
338
- return Impl_->ExecuteQuery (query, TImpl::EQueryType::ScriptQuery);
386
+ return Impl_->ExecuteWithRetries ([this , query]() {
387
+ return Impl_->ExecuteQuery (query, TImpl::EQueryType::ScriptQuery);
388
+ });
339
389
}
340
390
341
391
bool TKqpRunner::ExecuteYqlScript (const TRequestOptions& query) const {
342
- return Impl_->ExecuteQuery (query, TImpl::EQueryType::YqlScriptQuery);
392
+ return Impl_->ExecuteWithRetries ([this , query]() {
393
+ return Impl_->ExecuteQuery (query, TImpl::EQueryType::YqlScriptQuery);
394
+ });
343
395
}
344
396
345
397
void TKqpRunner::ExecuteQueryAsync (const TRequestOptions& query) const {
0 commit comments