@@ -3682,8 +3682,41 @@ int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, l
36823682 }
36833683
36843684 @ Test
3685- public void testRLMOpsWhenMetadataIsNotReady () throws InterruptedException {
3686- CountDownLatch latch = new CountDownLatch (2 );
3685+ public void testRLMOpsWhenMetadataIsNotReady () throws InterruptedException , IOException {
3686+ // Recreate a remoteLogManager with default REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP (default value is 30000).
3687+ // The value in setup function is 100 which is too small. If the case can't run two verifyNoMoreInteractions in
3688+ // 100ms, the test will fail.
3689+ remoteLogManager .close ();
3690+ clearInvocations (remoteLogMetadataManager , remoteStorageManager );
3691+ Properties props = brokerConfig ;
3692+ props .setProperty (RemoteLogManagerConfig .REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP , "true" );
3693+ props .setProperty (RemoteLogManagerConfig .REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP , "30000" );
3694+ appendRLMConfig (props );
3695+ config = KafkaConfig .fromProps (props );
3696+
3697+ remoteLogManager = new RemoteLogManager (config .remoteLogManagerConfig (), brokerId , logDir , clusterId , time ,
3698+ tp -> Optional .of (mockLog ),
3699+ (topicPartition , offset ) -> currentLogStartOffset .set (offset ),
3700+ brokerTopicStats , metrics ) {
3701+ public RemoteStorageManager createRemoteStorageManager () {
3702+ return remoteStorageManager ;
3703+ }
3704+ public RemoteLogMetadataManager createRemoteLogMetadataManager () {
3705+ return remoteLogMetadataManager ;
3706+ }
3707+ public RLMQuotaManager createRLMCopyQuotaManager () {
3708+ return rlmCopyQuotaManager ;
3709+ }
3710+ public Duration quotaTimeout () {
3711+ return Duration .ofMillis (100 );
3712+ }
3713+ @ Override
3714+ long findLogStartOffset (TopicIdPartition topicIdPartition , UnifiedLog log ) {
3715+ return 0L ;
3716+ }
3717+ };
3718+
3719+ CountDownLatch latch = new CountDownLatch (3 ); // there are 3 RLMTasks, so setting the count to 3
36873720 when (remoteLogMetadataManager .isReady (any (TopicIdPartition .class )))
36883721 .thenAnswer (ans -> {
36893722 latch .countDown ();
0 commit comments