@@ -217,7 +217,6 @@ private void createContinuousTransform(String indexName, String transformId, Str
217
217
* Verify the basic stats API, which includes state, health, and optionally progress (if it exists).
218
218
* These are required for Kibana 8.13+.
219
219
*/
220
- @ SuppressWarnings ("unchecked" )
221
220
public void testBasicContinuousTransformStats () throws Exception {
222
221
var transformId = "transform-continuous-basic-stats" ;
223
222
createContinuousTransform ("continuous-basic-stats-reviews" , transformId , "reviews-by-user-business-day" );
@@ -234,6 +233,54 @@ public void testBasicContinuousTransformStats() throws Exception {
234
233
deleteTransform (transformId );
235
234
}
236
235
236
+ public void testEmptySourceIndexClearsErrors () throws Exception {
237
+ var sourceIndexName = "source-empty-reviews" ;
238
+ var destIndexName = "destination-empty-reviews" ;
239
+ var transformId = "transform-empty-source-index" ;
240
+
241
+ createReviewsIndexMappings (sourceIndexName , null );
242
+
243
+ var config = createTransformConfigBuilder (transformId , destIndexName , QueryConfig .matchAll (), sourceIndexName ).setPivotConfig (
244
+ createPivotConfig (groupByUserOnly (), aggregateScoresAndTimes ())
245
+ )
246
+ .setSyncConfig (new TimeSyncConfig ("timestamp" , TimeValue .timeValueSeconds (1 )))
247
+ .setSettings (new SettingsConfig .Builder ().setUnattended (true ).build ())
248
+ .build ();
249
+
250
+ putTransform (transformId , Strings .toString (config ), RequestOptions .DEFAULT );
251
+ startTransform (config .getId (), RequestOptions .DEFAULT );
252
+
253
+ waitUntilCheckpoint (config .getId (), 1L );
254
+ assertEquals ("green" , getTransformHealthStatus (transformId ));
255
+
256
+ // this will cause the transform to fail to search
257
+ assertAcknowledged (adminClient ().performRequest (new Request ("PUT" , sourceIndexName + "/_block/read" )));
258
+ assertBusy (() -> assertThat (getTransformHealthStatus (transformId ), oneOf ("yellow" , "red" )), 30 , TimeUnit .SECONDS );
259
+
260
+ // unblock reads on the search index and the transform should recover
261
+ var request = new Request ("PUT" , sourceIndexName + "/_settings" );
262
+ request .setJsonEntity ("""
263
+ { "blocks.read": false }
264
+ """ );
265
+ assertAcknowledged (adminClient ().performRequest (request ));
266
+ assertBusy (() -> assertEquals ("green" , getTransformHealthStatus (transformId )), 30 , TimeUnit .SECONDS );
267
+
268
+ stopTransform (transformId );
269
+ deleteTransform (transformId );
270
+ deleteIndex (sourceIndexName );
271
+ deleteIndex (destIndexName );
272
+ }
273
+
274
+ private Map <String , SingleGroupSource > groupByUserOnly () {
275
+ return Map .of ("by-user" , new TermsGroupSource ("user_id" , null , false ));
276
+ }
277
+
278
+ private AggregatorFactories .Builder aggregateScoresAndTimes () {
279
+ return AggregatorFactories .builder ()
280
+ .addAggregator (AggregationBuilders .avg ("review_score" ).field ("stars" ))
281
+ .addAggregator (AggregationBuilders .max ("timestamp" ).field ("timestamp" ));
282
+ }
283
+
237
284
public void testDestinationIndexBlocked () throws Exception {
238
285
var transformId = "transform-continuous-blocked-destination" ;
239
286
var sourceIndexName = "source-reviews" ;
@@ -385,12 +432,8 @@ public void testContinuousTransformUpdate() throws Exception {
385
432
String indexName = "continuous-reviews-update" ;
386
433
createReviewsIndex (indexName , 10 , NUM_USERS , TransformIT ::getUserIdForRow , TransformIT ::getDateStringForRow );
387
434
388
- Map <String , SingleGroupSource > groups = new HashMap <>();
389
- groups .put ("by-user" , new TermsGroupSource ("user_id" , null , false ));
390
-
391
- AggregatorFactories .Builder aggs = AggregatorFactories .builder ()
392
- .addAggregator (AggregationBuilders .avg ("review_score" ).field ("stars" ))
393
- .addAggregator (AggregationBuilders .max ("timestamp" ).field ("timestamp" ));
435
+ var groups = groupByUserOnly ();
436
+ var aggs = aggregateScoresAndTimes ();
394
437
395
438
String id = "transform-to-update" ;
396
439
String dest = "reviews-by-user-business-day-to-update" ;
@@ -481,8 +524,7 @@ public void testRetentionPolicyDelete() throws Exception {
481
524
String dest = "retention-policy-dest" ;
482
525
createReviewsIndex (indexName , 10 , NUM_USERS , TransformIT ::getUserIdForRow , TransformIT ::getDateStringForRow );
483
526
484
- Map <String , SingleGroupSource > groups = new HashMap <>();
485
- groups .put ("by-user" , new TermsGroupSource ("user_id" , null , false ));
527
+ var groups = groupByUserOnly ();
486
528
487
529
AggregatorFactories .Builder aggs = AggregatorFactories .builder ()
488
530
.addAggregator (AggregationBuilders .max ("timestamp" ).field ("timestamp" ));
0 commit comments