diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java index 56fc9c9f2e32c..aa2d2e6eb6cec 100644 --- a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java +++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java @@ -53,6 +53,7 @@ import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.commons.collections4.CollectionUtils; import org.apache.pulsar.metadata.api.MetadataStoreException; @@ -177,8 +178,6 @@ public void testMainProcess(int batchedWriteMaxRecords, int batchedWriteMaxSize, bkc.failAfter(1, BKException.Code.NotEnoughBookiesException); metadataStore.setAlwaysFail(new MetadataStoreException.BadVersionException("")); } - OrderedExecutor orderedExecutor = OrderedExecutor.newBuilder() - .numThreads(5).name("txn-threads").build(); HashedWheelTimer transactionTimer = new HashedWheelTimer(new DefaultThreadFactory("transaction-timer"), 1, TimeUnit.MILLISECONDS); JsonDataSerializer dataSerializer = new JsonDataSerializer(eachDataBytesLen); @@ -189,7 +188,7 @@ public void testMainProcess(int batchedWriteMaxRecords, int batchedWriteMaxSize, */ // Create TxLogBufferedWriter. TxnLogBufferedWriter txnLogBufferedWriter = new TxnLogBufferedWriter( - managedLedger, orderedExecutor, transactionTimer, + managedLedger, ((ManagedLedgerImpl) managedLedger).getExecutor(), transactionTimer, dataSerializer, batchedWriteMaxRecords, batchedWriteMaxSize, batchedWriteMaxDelayInMillis, batchEnabled, DISABLED_BUFFERED_WRITER_METRICS); // Store the param-context, param-position, param-exception of callback function and complete-count for verify. @@ -344,7 +343,6 @@ public void addFailed(ManagedLedgerException exception, Object ctx) { managedLedger.close(); } transactionTimer.stop(); - orderedExecutor.shutdown(); /** * Assert all Byte Buf generated by DataSerializer has been released. * 1. Because ManagedLedger holds write cache, some data is not actually released until ManagedLedger is