Skip to content
This repository was archived by the owner on May 30, 2024. It is now read-only.

Commit d5704eb

Browse files
authored
Merge pull request #57 from launchdarkly/eb/ch13390/redis-watch
fix Redis optimistic locking logic to retry updates as needed
2 parents 91b0af8 + e0863c5 commit d5704eb

File tree

2 files changed

+152
-93
lines changed

2 files changed

+152
-93
lines changed

src/main/java/com/launchdarkly/client/RedisFeatureStore.java

Lines changed: 81 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,6 @@
11
package com.launchdarkly.client;
22

3-
import static com.launchdarkly.client.VersionedDataKind.FEATURES;
4-
5-
import java.io.IOException;
6-
import java.net.URI;
7-
import java.util.HashMap;
8-
import java.util.Map;
9-
import java.util.concurrent.ExecutorService;
10-
import java.util.concurrent.Executors;
11-
import java.util.concurrent.ThreadFactory;
12-
import java.util.concurrent.TimeUnit;
13-
14-
import org.slf4j.Logger;
15-
import org.slf4j.LoggerFactory;
16-
3+
import com.google.common.annotations.VisibleForTesting;
174
import com.google.common.base.Optional;
185
import com.google.common.cache.CacheBuilder;
196
import com.google.common.cache.CacheLoader;
@@ -24,6 +11,20 @@
2411
import com.google.common.util.concurrent.ThreadFactoryBuilder;
2512
import com.google.gson.Gson;
2613

14+
import org.slf4j.Logger;
15+
import org.slf4j.LoggerFactory;
16+
17+
import java.io.IOException;
18+
import java.util.HashMap;
19+
import java.util.List;
20+
import java.util.Map;
21+
import java.util.concurrent.ExecutorService;
22+
import java.util.concurrent.Executors;
23+
import java.util.concurrent.ThreadFactory;
24+
import java.util.concurrent.TimeUnit;
25+
26+
import static com.launchdarkly.client.VersionedDataKind.FEATURES;
27+
2728
import redis.clients.jedis.Jedis;
2829
import redis.clients.jedis.JedisPool;
2930
import redis.clients.jedis.JedisPoolConfig;
@@ -38,12 +39,15 @@ public class RedisFeatureStore implements FeatureStore {
3839
private static final String DEFAULT_PREFIX = "launchdarkly";
3940
private static final String INIT_KEY = "$initialized$";
4041
private static final String CACHE_REFRESH_THREAD_POOL_NAME_FORMAT = "RedisFeatureStore-cache-refresher-pool-%d";
42+
private static final Gson gson = new Gson();
43+
4144
private final JedisPool pool;
4245
private LoadingCache<CacheKey, Optional<VersionedData>> cache;
4346
private final LoadingCache<String, Boolean> initCache = createInitCache();
4447
private String prefix;
4548
private ListeningExecutorService executorService;
46-
49+
private UpdateListener updateListener;
50+
4751
private static class CacheKey {
4852
final VersionedDataKind<?> kind;
4953
final String key;
@@ -102,10 +106,6 @@ private void setPrefix(String prefix) {
102106
}
103107
}
104108

105-
private void createCache(long cacheTimeSecs) {
106-
createCache(cacheTimeSecs, false, false);
107-
}
108-
109109
private void createCache(long cacheTimeSecs, boolean refreshStaleValues, boolean asyncRefresh) {
110110
if (cacheTimeSecs > 0) {
111111
if (refreshStaleValues) {
@@ -120,7 +120,9 @@ private CacheLoader<CacheKey, Optional<VersionedData>> createDefaultCacheLoader(
120120
return new CacheLoader<CacheKey, Optional<VersionedData>>() {
121121
@Override
122122
public Optional<VersionedData> load(CacheKey key) throws Exception {
123-
return Optional.<VersionedData>fromNullable(getRedis(key.kind, key.key));
123+
try (Jedis jedis = pool.getResource()) {
124+
return Optional.<VersionedData>fromNullable(getRedisEvenIfDeleted(key.kind, key.key, jedis));
125+
}
124126
}
125127
};
126128
}
@@ -169,7 +171,13 @@ public <T extends VersionedData> T get(VersionedDataKind<T> kind, String key) {
169171
if (cache != null) {
170172
item = (T) cache.getUnchecked(new CacheKey(kind, key)).orNull();
171173
} else {
172-
item = getRedis(kind, key);
174+
try (Jedis jedis = pool.getResource()) {
175+
item = getRedisEvenIfDeleted(kind, key, jedis);
176+
}
177+
}
178+
if (item != null && item.isDeleted()) {
179+
logger.debug("[get] Key: {} has been deleted in \"{}\". Returning null", key, kind.getNamespace());
180+
return null;
173181
}
174182
if (item != null) {
175183
logger.debug("[get] Key: {} with version: {} found in \"{}\".", key, item.getVersion(), kind.getNamespace());
@@ -182,7 +190,6 @@ public <T extends VersionedData> Map<String, T> all(VersionedDataKind<T> kind) {
182190
try (Jedis jedis = pool.getResource()) {
183191
Map<String, String> allJson = jedis.hgetAll(itemsKey(kind));
184192
Map<String, T> result = new HashMap<>();
185-
Gson gson = new Gson();
186193

187194
for (Map.Entry<String, String> entry : allJson.entrySet()) {
188195
T item = gson.fromJson(entry.getValue(), kind.getItemClass());
@@ -197,7 +204,6 @@ public <T extends VersionedData> Map<String, T> all(VersionedDataKind<T> kind) {
197204
@Override
198205
public void init(Map<VersionedDataKind<?>, Map<String, ? extends VersionedData>> allData) {
199206
try (Jedis jedis = pool.getResource()) {
200-
Gson gson = new Gson();
201207
Transaction t = jedis.multi();
202208

203209
for (Map.Entry<VersionedDataKind<?>, Map<String, ? extends VersionedData>> entry: allData.entrySet()) {
@@ -216,63 +222,54 @@ public void init(Map<VersionedDataKind<?>, Map<String, ? extends VersionedData>>
216222

217223
@Override
218224
public <T extends VersionedData> void delete(VersionedDataKind<T> kind, String key, int version) {
219-
Jedis jedis = null;
220-
try {
221-
Gson gson = new Gson();
222-
jedis = pool.getResource();
223-
String baseKey = itemsKey(kind);
224-
jedis.watch(baseKey);
225-
226-
VersionedData item = getRedis(kind, key, jedis);
227-
228-
if (item != null && item.getVersion() >= version) {
229-
logger.warn("Attempted to delete key: {} version: {}" +
230-
" with a version that is the same or older: {} in \"{}\"",
231-
key, item.getVersion(), version, kind.getNamespace());
232-
return;
233-
}
234-
235-
VersionedData deletedItem = kind.makeDeletedItem(key, version);
236-
jedis.hset(baseKey, key, gson.toJson(deletedItem));
237-
238-
if (cache != null) {
239-
cache.invalidate(new CacheKey(kind, key));
240-
}
241-
} finally {
242-
if (jedis != null) {
243-
jedis.unwatch();
244-
jedis.close();
245-
}
246-
}
225+
T deletedItem = kind.makeDeletedItem(key, version);
226+
updateItemWithVersioning(kind, deletedItem);
247227
}
248-
228+
249229
@Override
250230
public <T extends VersionedData> void upsert(VersionedDataKind<T> kind, T item) {
251-
Jedis jedis = null;
252-
try {
253-
jedis = pool.getResource();
254-
Gson gson = new Gson();
255-
String baseKey = itemsKey(kind);
256-
jedis.watch(baseKey);
257-
258-
VersionedData old = getRedisEvenIfDeleted(kind, item.getKey(), jedis);
259-
260-
if (old != null && old.getVersion() >= item.getVersion()) {
261-
logger.warn("Attempted to update key: {} version: {}" +
262-
" with a version that is the same or older: {} in \"{}\"",
263-
item.getKey(), old.getVersion(), item.getVersion(), kind.getNamespace());
264-
return;
265-
}
266-
267-
jedis.hset(baseKey, item.getKey(), gson.toJson(item));
231+
updateItemWithVersioning(kind, item);
232+
}
268233

269-
if (cache != null) {
270-
cache.invalidate(new CacheKey(kind, item.getKey()));
271-
}
272-
} finally {
273-
if (jedis != null) {
274-
jedis.unwatch();
275-
jedis.close();
234+
private <T extends VersionedData> void updateItemWithVersioning(VersionedDataKind<T> kind, T newItem) {
235+
while (true) {
236+
Jedis jedis = null;
237+
try {
238+
jedis = pool.getResource();
239+
String baseKey = itemsKey(kind);
240+
jedis.watch(baseKey);
241+
242+
if (updateListener != null) {
243+
updateListener.aboutToUpdate(baseKey, newItem.getKey());
244+
}
245+
246+
VersionedData oldItem = getRedisEvenIfDeleted(kind, newItem.getKey(), jedis);
247+
248+
if (oldItem != null && oldItem.getVersion() >= newItem.getVersion()) {
249+
logger.warn("Attempted to {} key: {} version: {}" +
250+
" with a version that is the same or older: {} in \"{}\"",
251+
newItem.isDeleted() ? "delete" : "update",
252+
newItem.getKey(), oldItem.getVersion(), newItem.getVersion(), kind.getNamespace());
253+
return;
254+
}
255+
256+
Transaction tx = jedis.multi();
257+
tx.hset(baseKey, newItem.getKey(), gson.toJson(newItem));
258+
List<Object> result = tx.exec();
259+
if (result.isEmpty()) {
260+
// if exec failed, it means the watch was triggered and we should retry
261+
logger.debug("Concurrent modification detected, retrying");
262+
continue;
263+
}
264+
265+
if (cache != null) {
266+
cache.invalidate(new CacheKey(kind, newItem.getKey()));
267+
}
268+
} finally {
269+
if (jedis != null) {
270+
jedis.unwatch();
271+
jedis.close();
272+
}
276273
}
277274
}
278275
}
@@ -323,23 +320,7 @@ private Boolean getInit() {
323320
}
324321
}
325322

326-
private <T extends VersionedData> T getRedis(VersionedDataKind<T> kind, String key) {
327-
try (Jedis jedis = pool.getResource()) {
328-
return getRedis(kind, key, jedis);
329-
}
330-
}
331-
332-
private <T extends VersionedData> T getRedis(VersionedDataKind<T> kind, String key, Jedis jedis) {
333-
T item = getRedisEvenIfDeleted(kind, key, jedis);
334-
if (item != null && item.isDeleted()) {
335-
logger.debug("[get] Key: {} has been deleted in \"{}\". Returning null", key, kind.getNamespace());
336-
return null;
337-
}
338-
return item;
339-
}
340-
341323
private <T extends VersionedData> T getRedisEvenIfDeleted(VersionedDataKind<T> kind, String key, Jedis jedis) {
342-
Gson gson = new Gson();
343324
String json = jedis.hget(itemsKey(kind), key);
344325

345326
if (json == null) {
@@ -354,4 +335,12 @@ private static JedisPoolConfig getPoolConfig() {
354335
return new JedisPoolConfig();
355336
}
356337

338+
static interface UpdateListener {
339+
void aboutToUpdate(String baseKey, String itemKey);
340+
}
341+
342+
@VisibleForTesting
343+
void setUpdateListener(UpdateListener updateListener) {
344+
this.updateListener = updateListener;
345+
}
357346
}
Lines changed: 71 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,83 @@
11
package com.launchdarkly.client;
22

3-
import java.net.URI;
3+
import com.google.gson.Gson;
44

5+
import org.junit.Assert;
56
import org.junit.Before;
7+
import org.junit.Test;
8+
9+
import java.net.URI;
10+
import java.util.HashMap;
11+
import java.util.Map;
12+
13+
import static com.launchdarkly.client.VersionedDataKind.FEATURES;
14+
import static java.util.Collections.singletonMap;
15+
16+
import redis.clients.jedis.Jedis;
617

718
public class RedisFeatureStoreTest extends FeatureStoreTestBase<RedisFeatureStore> {
819

920
@Before
1021
public void setup() {
1122
store = new RedisFeatureStoreBuilder(URI.create("redis://localhost:6379"), 10).build();
1223
}
24+
25+
@Test
26+
public void handlesUpsertRaceConditionAgainstExternalClientWithLowerVersion() {
27+
final Jedis otherClient = new Jedis("localhost");
28+
try {
29+
final FeatureFlag flag = new FeatureFlagBuilder("foo").version(1).build();
30+
initStoreWithSingleFeature(store, flag);
31+
32+
store.setUpdateListener(makeConcurrentModifier(otherClient, flag, 2, 4));
33+
34+
FeatureFlag myVer = new FeatureFlagBuilder(flag).version(10).build();
35+
store.upsert(FEATURES, myVer);
36+
FeatureFlag result = store.get(FEATURES, feature1.getKey());
37+
Assert.assertEquals(myVer.getVersion(), result.getVersion());
38+
} finally {
39+
otherClient.close();
40+
}
41+
}
42+
43+
@Test
44+
public void handlesUpsertRaceConditionAgainstExternalClientWithHigherVersion() {
45+
final Jedis otherClient = new Jedis("localhost");
46+
try {
47+
final FeatureFlag flag = new FeatureFlagBuilder("foo").version(1).build();
48+
initStoreWithSingleFeature(store, flag);
49+
50+
store.setUpdateListener(makeConcurrentModifier(otherClient, flag, 3, 3));
51+
52+
FeatureFlag myVer = new FeatureFlagBuilder(flag).version(2).build();
53+
store.upsert(FEATURES, myVer);
54+
FeatureFlag result = store.get(FEATURES, feature1.getKey());
55+
Assert.assertEquals(3, result.getVersion());
56+
} finally {
57+
otherClient.close();
58+
}
59+
}
60+
61+
private void initStoreWithSingleFeature(RedisFeatureStore store, FeatureFlag flag) {
62+
Map<String, FeatureFlag> flags = singletonMap(flag.getKey(), flag);
63+
Map<VersionedDataKind<?>, Map<String, ? extends VersionedData>> allData = new HashMap<>();
64+
allData.put(FEATURES, flags);
65+
store.init(allData);
66+
}
67+
68+
private RedisFeatureStore.UpdateListener makeConcurrentModifier(final Jedis otherClient, final FeatureFlag flag,
69+
final int startVersion, final int endVersion) {
70+
final Gson gson = new Gson();
71+
return new RedisFeatureStore.UpdateListener() {
72+
int versionCounter = startVersion;
73+
@Override
74+
public void aboutToUpdate(String baseKey, String itemKey) {
75+
if (versionCounter <= endVersion) {
76+
FeatureFlag newVer = new FeatureFlagBuilder(flag).version(versionCounter).build();
77+
versionCounter++;
78+
otherClient.hset(baseKey, flag.getKey(), gson.toJson(newVer));
79+
}
80+
}
81+
};
82+
}
1383
}

0 commit comments

Comments
 (0)