@@ -33,7 +33,7 @@ internal class ManagedResourceController<TEntity> : IManagedResourceController
33
33
private readonly OperatorSettings _settings ;
34
34
private readonly IFinalizerManager < TEntity > _finalizerManager ;
35
35
36
- private readonly Subject < ( TEntity Resource , TimeSpan Delay ) >
36
+ private readonly Subject < RequeuedEvent >
37
37
_requeuedEvents = new ( ) ;
38
38
39
39
private readonly Subject < QueuedEvent >
@@ -85,9 +85,17 @@ public ManagedResourceController(
85
85
private IObservable < Unit > RequeuedEvents => _requeuedEvents
86
86
. Do ( _ => _metrics . RequeuedEvents . Inc ( ) )
87
87
. Select (
88
- data => Observable . Return ( data . Resource ) . Delay ( data . Delay ) )
88
+ data => Observable . Return ( data ) . Delay ( data . Delay ) )
89
89
. Switch ( )
90
- . Select ( data => Observable . FromAsync ( ( ) => UpdateResourceData ( data ) ) )
90
+ . Select ( data =>
91
+ Observable . FromAsync ( async ( ) =>
92
+ {
93
+ var queuedEvent = await UpdateResourceData ( data . Resource ) ;
94
+
95
+ return data . ResourceEvent . HasValue && queuedEvent != null
96
+ ? queuedEvent with { ResourceEvent = data . ResourceEvent . Value }
97
+ : queuedEvent ;
98
+ } ) )
91
99
. Switch ( )
92
100
. Where ( data => data != null )
93
101
. Do (
@@ -269,13 +277,32 @@ protected async Task HandleResourceEvent(QueuedEvent? data)
269
277
resource . Name ( ) ) ;
270
278
return ;
271
279
case RequeueEventResult requeue :
272
- _logger . LogInformation (
273
- @"Event type ""{eventType}"" on resource ""{kind}/{name}"" successfully reconciled. Requeue requested with delay ""{requeue}""." ,
274
- @event ,
275
- resource . Kind ,
276
- resource . Name ( ) ,
277
- requeue . RequeueIn ) ;
278
- _requeuedEvents . OnNext ( ( resource , requeue . RequeueIn ) ) ;
280
+ if ( _settings . DefaultRequeueAsSameType )
281
+ {
282
+ requeue = new RequeueEventResult ( requeue . RequeueIn , @event ) ;
283
+ }
284
+
285
+ if ( requeue . EventType . HasValue )
286
+ {
287
+ _logger . LogInformation (
288
+ @"Event type ""{eventType}"" on resource ""{kind}/{name}"" successfully reconciled. Requeue requested as type ""{requeueType}"" with delay ""{requeue}""." ,
289
+ @event ,
290
+ resource . Kind ,
291
+ resource . Name ( ) ,
292
+ requeue . EventType ,
293
+ requeue . RequeueIn ) ;
294
+ }
295
+ else
296
+ {
297
+ _logger . LogInformation (
298
+ @"Event type ""{eventType}"" on resource ""{kind}/{name}"" successfully reconciled. Requeue requested with delay ""{requeue}""." ,
299
+ @event ,
300
+ resource . Kind ,
301
+ resource . Name ( ) ,
302
+ requeue . RequeueIn ) ;
303
+ }
304
+
305
+ _requeuedEvents . OnNext ( new RequeuedEvent ( requeue . EventType , resource , requeue . RequeueIn ) ) ;
279
306
break ;
280
307
}
281
308
}
@@ -417,5 +444,7 @@ private TimeSpan ExponentialBackoff(int retryCount) => TimeSpan
417
444
. Add ( TimeSpan . FromMilliseconds ( _rnd . Next ( 0 , 1000 ) ) ) ;
418
445
419
446
internal record QueuedEvent ( ResourceEventType ResourceEvent , TEntity Resource , int RetryCount = 0 ) ;
447
+
448
+ private record RequeuedEvent ( ResourceEventType ? ResourceEvent , TEntity Resource , TimeSpan Delay ) ;
420
449
}
421
450
}
0 commit comments