Skip to content

Commit f8d2cc4

Browse files
lhotariTechnoboy-
authored andcommitted
[fix][offload] Fix OOM in tiered storage, caused by unbounded offsets cache (apache#22679)
Co-authored-by: Jiwe Guo <[email protected]> (cherry picked from commit 566330c)
1 parent 9006263 commit f8d2cc4

File tree

10 files changed

+185
-31
lines changed

10 files changed

+185
-31
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderFactory.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
*/
3232
@LimitedPrivate
3333
@Evolving
34-
public interface LedgerOffloaderFactory<T extends LedgerOffloader> {
34+
public interface LedgerOffloaderFactory<T extends LedgerOffloader> extends AutoCloseable {
3535

3636
/**
3737
* Check whether the provided driver <tt>driverName</tt> is supported.
@@ -111,4 +111,9 @@ default T create(OffloadPoliciesImpl offloadPolicies,
111111
throws IOException {
112112
return create(offloadPolicies, userMetadata, scheduler, offloaderStats);
113113
}
114+
115+
@Override
116+
default void close() throws Exception {
117+
// no-op
118+
}
114119
}

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/Offloaders.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,12 @@ public LedgerOffloaderFactory getOffloaderFactory(String driverName) throws IOEx
4646
@Override
4747
public void close() throws Exception {
4848
offloaders.forEach(offloader -> {
49+
try {
50+
offloader.getRight().close();
51+
} catch (Exception e) {
52+
log.warn("Failed to close offloader '{}': {}",
53+
offloader.getRight().getClass(), e.getMessage());
54+
}
4955
try {
5056
offloader.getLeft().close();
5157
} catch (IOException e) {

tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/JCloudLedgerOffloaderFactory.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
2626
import org.apache.bookkeeper.mledger.LedgerOffloaderStatsDisable;
2727
import org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreManagedLedgerOffloader;
28+
import org.apache.bookkeeper.mledger.offload.jcloud.impl.OffsetsCache;
2829
import org.apache.bookkeeper.mledger.offload.jcloud.provider.JCloudBlobStoreProvider;
2930
import org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration;
3031
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
@@ -33,12 +34,7 @@
3334
* A jcloud based offloader factory.
3435
*/
3536
public class JCloudLedgerOffloaderFactory implements LedgerOffloaderFactory<BlobStoreManagedLedgerOffloader> {
36-
37-
public static JCloudLedgerOffloaderFactory of() {
38-
return INSTANCE;
39-
}
40-
41-
private static final JCloudLedgerOffloaderFactory INSTANCE = new JCloudLedgerOffloaderFactory();
37+
private final OffsetsCache entryOffsetsCache = new OffsetsCache();
4238

4339
@Override
4440
public boolean isDriverSupported(String driverName) {
@@ -58,6 +54,12 @@ public BlobStoreManagedLedgerOffloader create(OffloadPoliciesImpl offloadPolicie
5854

5955
TieredStorageConfiguration config =
6056
TieredStorageConfiguration.create(offloadPolicies.toProperties());
61-
return BlobStoreManagedLedgerOffloader.create(config, userMetadata, scheduler, offloaderStats);
57+
return BlobStoreManagedLedgerOffloader.create(config, userMetadata, scheduler, offloaderStats,
58+
entryOffsetsCache);
59+
}
60+
61+
@Override
62+
public void close() throws Exception {
63+
entryOffsetsCache.close();
6264
}
6365
}

tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@
1919
package org.apache.bookkeeper.mledger.offload.jcloud.impl;
2020

2121
import com.google.common.annotations.VisibleForTesting;
22-
import com.google.common.cache.Cache;
23-
import com.google.common.cache.CacheBuilder;
2422
import io.netty.buffer.ByteBuf;
2523
import java.io.DataInputStream;
2624
import java.io.IOException;
@@ -56,19 +54,13 @@
5654

5755
public class BlobStoreBackedReadHandleImpl implements ReadHandle {
5856
private static final Logger log = LoggerFactory.getLogger(BlobStoreBackedReadHandleImpl.class);
59-
private static final int CACHE_TTL_SECONDS =
60-
Integer.getInteger("pulsar.jclouds.readhandleimpl.offsetsscache.ttl.seconds", 30 * 60);
6157

6258
private final long ledgerId;
6359
private final OffloadIndexBlock index;
6460
private final BackedInputStream inputStream;
6561
private final DataInputStream dataStream;
6662
private final ExecutorService executor;
67-
// this Cache is accessed only by one thread
68-
private final Cache<Long, Long> entryOffsets = CacheBuilder
69-
.newBuilder()
70-
.expireAfterAccess(CACHE_TTL_SECONDS, TimeUnit.SECONDS)
71-
.build();
63+
private final OffsetsCache entryOffsetsCache;
7264
private final AtomicReference<CompletableFuture<Void>> closeFuture = new AtomicReference<>();
7365

7466
enum State {
@@ -79,12 +71,14 @@ enum State {
7971
private volatile State state = null;
8072

8173
private BlobStoreBackedReadHandleImpl(long ledgerId, OffloadIndexBlock index,
82-
BackedInputStream inputStream, ExecutorService executor) {
74+
BackedInputStream inputStream, ExecutorService executor,
75+
OffsetsCache entryOffsetsCache) {
8376
this.ledgerId = ledgerId;
8477
this.index = index;
8578
this.inputStream = inputStream;
8679
this.dataStream = new DataInputStream(inputStream);
8780
this.executor = executor;
81+
this.entryOffsetsCache = entryOffsetsCache;
8882
state = State.Opened;
8983
}
9084

@@ -109,7 +103,6 @@ public CompletableFuture<Void> closeAsync() {
109103
try {
110104
index.close();
111105
inputStream.close();
112-
entryOffsets.invalidateAll();
113106
state = State.Closed;
114107
promise.complete(null);
115108
} catch (IOException t) {
@@ -164,7 +157,7 @@ public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntr
164157
long entryId = dataStream.readLong();
165158

166159
if (entryId == nextExpectedId) {
167-
entryOffsets.put(entryId, currentPosition);
160+
entryOffsetsCache.put(ledgerId, entryId, currentPosition);
168161
ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(length, length);
169162
entries.add(LedgerEntryImpl.create(ledgerId, entryId, length, buf));
170163
int toWrite = length;
@@ -215,7 +208,7 @@ public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntr
215208
}
216209

217210
private void seekToEntry(long nextExpectedId) throws IOException {
218-
Long knownOffset = entryOffsets.getIfPresent(nextExpectedId);
211+
Long knownOffset = entryOffsetsCache.getIfPresent(ledgerId, nextExpectedId);
219212
if (knownOffset != null) {
220213
inputStream.seek(knownOffset);
221214
} else {
@@ -269,7 +262,8 @@ public static ReadHandle open(ScheduledExecutorService executor,
269262
BlobStore blobStore, String bucket, String key, String indexKey,
270263
VersionCheck versionCheck,
271264
long ledgerId, int readBufferSize,
272-
LedgerOffloaderStats offloaderStats, String managedLedgerName)
265+
LedgerOffloaderStats offloaderStats, String managedLedgerName,
266+
OffsetsCache entryOffsetsCache)
273267
throws IOException, BKException.BKNoSuchLedgerExistsException {
274268
int retryCount = 3;
275269
OffloadIndexBlock index = null;
@@ -310,7 +304,7 @@ public static ReadHandle open(ScheduledExecutorService executor,
310304
BackedInputStream inputStream = new BlobStoreBackedInputStreamImpl(blobStore, bucket, key,
311305
versionCheck, index.getDataObjectLength(), readBufferSize, offloaderStats, managedLedgerName);
312306

313-
return new BlobStoreBackedReadHandleImpl(ledgerId, index, inputStream, executor);
307+
return new BlobStoreBackedReadHandleImpl(ledgerId, index, inputStream, executor, entryOffsetsCache);
314308
}
315309

316310
// for testing

tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
108108
private AtomicLong bufferLength = new AtomicLong(0);
109109
private AtomicLong segmentLength = new AtomicLong(0);
110110
private final long maxBufferLength;
111+
private final OffsetsCache entryOffsetsCache;
111112
private final ConcurrentLinkedQueue<Entry> offloadBuffer = new ConcurrentLinkedQueue<>();
112113
private CompletableFuture<OffloadResult> offloadResult;
113114
private volatile PositionImpl lastOfferedPosition = PositionImpl.LATEST;
@@ -123,13 +124,16 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
123124
public static BlobStoreManagedLedgerOffloader create(TieredStorageConfiguration config,
124125
Map<String, String> userMetadata,
125126
OrderedScheduler scheduler,
126-
LedgerOffloaderStats offloaderStats) throws IOException {
127+
LedgerOffloaderStats offloaderStats,
128+
OffsetsCache entryOffsetsCache)
129+
throws IOException {
127130

128-
return new BlobStoreManagedLedgerOffloader(config, scheduler, userMetadata, offloaderStats);
131+
return new BlobStoreManagedLedgerOffloader(config, scheduler, userMetadata, offloaderStats, entryOffsetsCache);
129132
}
130133

131134
BlobStoreManagedLedgerOffloader(TieredStorageConfiguration config, OrderedScheduler scheduler,
132-
Map<String, String> userMetadata, LedgerOffloaderStats offloaderStats) {
135+
Map<String, String> userMetadata, LedgerOffloaderStats offloaderStats,
136+
OffsetsCache entryOffsetsCache) {
133137

134138
this.scheduler = scheduler;
135139
this.userMetadata = userMetadata;
@@ -140,6 +144,7 @@ public static BlobStoreManagedLedgerOffloader create(TieredStorageConfiguration
140144
this.minSegmentCloseTimeMillis = Duration.ofSeconds(config.getMinSegmentTimeInSecond()).toMillis();
141145
//ensure buffer can have enough content to fill a block
142146
this.maxBufferLength = Math.max(config.getWriteBufferSizeInBytes(), config.getMinBlockSizeInBytes());
147+
this.entryOffsetsCache = entryOffsetsCache;
143148
this.segmentBeginTimeMillis = System.currentTimeMillis();
144149
if (!Strings.isNullOrEmpty(config.getRegion())) {
145150
this.writeLocation = new LocationBuilder()
@@ -555,7 +560,8 @@ public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uid,
555560
readBucket, key, indexKey,
556561
DataBlockUtils.VERSION_CHECK,
557562
ledgerId, config.getReadBufferSizeInBytes(),
558-
this.offloaderStats, offloadDriverMetadata.get(MANAGED_LEDGER_NAME)));
563+
this.offloaderStats, offloadDriverMetadata.get(MANAGED_LEDGER_NAME),
564+
this.entryOffsetsCache));
559565
} catch (Throwable t) {
560566
log.error("Failed readOffloaded: ", t);
561567
promise.completeExceptionally(t);
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.bookkeeper.mledger.offload.jcloud.impl;
20+
21+
import com.google.common.cache.Cache;
22+
import com.google.common.cache.CacheBuilder;
23+
import io.grpc.netty.shaded.io.netty.util.concurrent.DefaultThreadFactory;
24+
import java.util.concurrent.Executors;
25+
import java.util.concurrent.ScheduledExecutorService;
26+
import java.util.concurrent.TimeUnit;
27+
28+
public class OffsetsCache implements AutoCloseable {
29+
private static final int CACHE_TTL_SECONDS =
30+
Integer.getInteger("pulsar.jclouds.readhandleimpl.offsetsscache.ttl.seconds", 5 * 60);
31+
// limit the cache size to avoid OOM
32+
// 1 million entries consumes about 60MB of heap space
33+
private static final int CACHE_MAX_SIZE =
34+
Integer.getInteger("pulsar.jclouds.readhandleimpl.offsetsscache.max.size", 1_000_000);
35+
private final ScheduledExecutorService cacheEvictionExecutor;
36+
37+
record Key(long ledgerId, long entryId) {
38+
39+
}
40+
41+
private final Cache<OffsetsCache.Key, Long> entryOffsetsCache;
42+
43+
public OffsetsCache() {
44+
if (CACHE_MAX_SIZE > 0) {
45+
entryOffsetsCache = CacheBuilder
46+
.newBuilder()
47+
.expireAfterAccess(CACHE_TTL_SECONDS, TimeUnit.SECONDS)
48+
.maximumSize(CACHE_MAX_SIZE)
49+
.build();
50+
cacheEvictionExecutor =
51+
Executors.newSingleThreadScheduledExecutor(
52+
new DefaultThreadFactory("jcloud-offsets-cache-eviction"));
53+
int period = Math.max(CACHE_TTL_SECONDS / 2, 1);
54+
cacheEvictionExecutor.scheduleAtFixedRate(() -> {
55+
entryOffsetsCache.cleanUp();
56+
}, period, period, TimeUnit.SECONDS);
57+
} else {
58+
cacheEvictionExecutor = null;
59+
entryOffsetsCache = null;
60+
}
61+
}
62+
63+
public void put(long ledgerId, long entryId, long currentPosition) {
64+
if (entryOffsetsCache != null) {
65+
entryOffsetsCache.put(new Key(ledgerId, entryId), currentPosition);
66+
}
67+
}
68+
69+
public Long getIfPresent(long ledgerId, long entryId) {
70+
return entryOffsetsCache != null ? entryOffsetsCache.getIfPresent(new Key(ledgerId, entryId)) : null;
71+
}
72+
73+
public void clear() {
74+
if (entryOffsetsCache != null) {
75+
entryOffsetsCache.invalidateAll();
76+
}
77+
}
78+
79+
@Override
80+
public void close() {
81+
if (cacheEvictionExecutor != null) {
82+
cacheEvictionExecutor.shutdownNow();
83+
}
84+
}
85+
}

tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderBase.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.jclouds.blobstore.BlobStore;
3434
import org.jclouds.domain.Credentials;
3535
import org.testng.Assert;
36+
import org.testng.annotations.AfterClass;
3637
import org.testng.annotations.AfterMethod;
3738

3839
public abstract class BlobStoreManagedLedgerOffloaderBase {
@@ -46,6 +47,7 @@ public abstract class BlobStoreManagedLedgerOffloaderBase {
4647
protected final JCloudBlobStoreProvider provider;
4748
protected TieredStorageConfiguration config;
4849
protected BlobStore blobStore = null;
50+
protected final OffsetsCache entryOffsetsCache = new OffsetsCache();
4951

5052
protected BlobStoreManagedLedgerOffloaderBase() throws Exception {
5153
scheduler = OrderedScheduler.newSchedulerBuilder().numThreads(5).name("offloader").build();
@@ -56,6 +58,13 @@ protected BlobStoreManagedLedgerOffloaderBase() throws Exception {
5658
@AfterMethod(alwaysRun = true)
5759
public void cleanupMockBookKeeper() {
5860
bk.getLedgerMap().clear();
61+
entryOffsetsCache.clear();
62+
}
63+
64+
@AfterClass(alwaysRun = true)
65+
public void cleanup() throws Exception {
66+
entryOffsetsCache.close();
67+
scheduler.shutdownNow();
5968
}
6069

6170
protected static MockManagedLedger createMockManagedLedger() {

tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderStreamingTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ private BlobStoreManagedLedgerOffloader getOffloader(String bucket, Map<String,
8282
mockedConfig = mock(TieredStorageConfiguration.class, delegatesTo(getConfiguration(bucket, additionalConfig)));
8383
Mockito.doReturn(blobStore).when(mockedConfig).getBlobStore(); // Use the REAL blobStore
8484
BlobStoreManagedLedgerOffloader offloader = BlobStoreManagedLedgerOffloader
85-
.create(mockedConfig, new HashMap<String, String>(), scheduler, this.offloaderStats);
85+
.create(mockedConfig, new HashMap<String, String>(), scheduler, this.offloaderStats, entryOffsetsCache);
8686
return offloader;
8787
}
8888

@@ -91,7 +91,7 @@ private BlobStoreManagedLedgerOffloader getOffloader(String bucket, BlobStore mo
9191
mockedConfig = mock(TieredStorageConfiguration.class, delegatesTo(getConfiguration(bucket, additionalConfig)));
9292
Mockito.doReturn(mockedBlobStore).when(mockedConfig).getBlobStore();
9393
BlobStoreManagedLedgerOffloader offloader = BlobStoreManagedLedgerOffloader
94-
.create(mockedConfig, new HashMap<String, String>(), scheduler, this.offloaderStats);
94+
.create(mockedConfig, new HashMap<String, String>(), scheduler, this.offloaderStats, entryOffsetsCache);
9595
return offloader;
9696
}
9797

tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,14 +98,16 @@ private BlobStoreManagedLedgerOffloader getOffloader(BlobStore mockedBlobStore)
9898
private BlobStoreManagedLedgerOffloader getOffloader(String bucket) throws IOException {
9999
mockedConfig = mock(TieredStorageConfiguration.class, delegatesTo(getConfiguration(bucket)));
100100
Mockito.doReturn(blobStore).when(mockedConfig).getBlobStore(); // Use the REAL blobStore
101-
BlobStoreManagedLedgerOffloader offloader = BlobStoreManagedLedgerOffloader.create(mockedConfig, new HashMap<String,String>(), scheduler, this.offloaderStats);
101+
BlobStoreManagedLedgerOffloader offloader = BlobStoreManagedLedgerOffloader.create(mockedConfig, new HashMap<String,String>(), scheduler, this.offloaderStats,
102+
entryOffsetsCache);
102103
return offloader;
103104
}
104105

105106
private BlobStoreManagedLedgerOffloader getOffloader(String bucket, BlobStore mockedBlobStore) throws IOException {
106107
mockedConfig = mock(TieredStorageConfiguration.class, delegatesTo(getConfiguration(bucket)));
107108
Mockito.doReturn(mockedBlobStore).when(mockedConfig).getBlobStore();
108-
BlobStoreManagedLedgerOffloader offloader = BlobStoreManagedLedgerOffloader.create(mockedConfig, new HashMap<String,String>(), scheduler, this.offloaderStats);
109+
BlobStoreManagedLedgerOffloader offloader = BlobStoreManagedLedgerOffloader.create(mockedConfig, new HashMap<String,String>(), scheduler, this.offloaderStats,
110+
entryOffsetsCache);
109111
return offloader;
110112
}
111113

0 commit comments

Comments
 (0)