-
Notifications
You must be signed in to change notification settings - Fork 215
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
reverting back to redis backed cache
- Loading branch information
Showing
6 changed files
with
119 additions
and
69 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,5 +10,5 @@ public interface CounterCache { | |
|
||
boolean exists(String key); | ||
|
||
void clear(String key); | ||
void reset(String key); | ||
} |
109 changes: 109 additions & 0 deletions
109
...reat-detection/src/main/java/com/akto/threat/detection/cache/RedisBackedCounterCache.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,109 @@ | ||
package com.akto.threat.detection.cache; | ||
|
||
import com.github.benmanes.caffeine.cache.Cache; | ||
import com.github.benmanes.caffeine.cache.Caffeine; | ||
import io.lettuce.core.RedisClient; | ||
import io.lettuce.core.api.StatefulRedisConnection; | ||
|
||
import java.util.HashMap; | ||
import java.util.HashSet; | ||
import java.util.Map; | ||
import java.util.Set; | ||
import java.util.concurrent.*; | ||
|
||
public class RedisBackedCounterCache implements CounterCache { | ||
private final StatefulRedisConnection<String, Long> redis; | ||
|
||
private final Cache<String, Long> localCache; | ||
|
||
private final String prefix; | ||
private final ConcurrentLinkedQueue<Object> pendingOps; | ||
|
||
static class PendingCounterOp { | ||
private final String key; | ||
private final long value; | ||
|
||
public PendingCounterOp(String key, long value) { | ||
this.key = key; | ||
this.value = value; | ||
} | ||
|
||
public String getKey() { | ||
return key; | ||
} | ||
|
||
public long getValue() { | ||
return value; | ||
} | ||
} | ||
|
||
public RedisBackedCounterCache(RedisClient redisClient, String prefix) { | ||
this.prefix = prefix; | ||
this.redis = redisClient.connect(new LongValueCodec()); | ||
this.localCache = Caffeine.newBuilder().maximumSize(10000).expireAfterWrite(3, TimeUnit.HOURS).build(); | ||
this.pendingOps = new ConcurrentLinkedQueue<>(); | ||
} | ||
|
||
@Override | ||
public void increment(String key) { | ||
this.incrementBy(key, 1); | ||
} | ||
|
||
@Override | ||
public void incrementBy(String key, long val) { | ||
long cv = this.get(key); | ||
this.localCache.put(key, cv + val); | ||
|
||
this.pendingOps.add(new PendingCounterOp(key, val)); | ||
if (this.pendingOps.size() >= 100) { | ||
this.flush(); | ||
} | ||
} | ||
|
||
@Override | ||
public long get(String key) { | ||
if (this.localCache.asMap().containsKey(key)) { | ||
return this.localCache.asMap().get(key); | ||
} | ||
|
||
Long rv = this.redis.sync().hget(prefix, key); | ||
|
||
this.localCache.put(key, rv != null ? rv : 0L); | ||
return rv != null ? rv : 0L; | ||
} | ||
|
||
@Override | ||
public boolean exists(String key) { | ||
if (this.localCache.asMap().containsKey(key)) { | ||
return true; | ||
} | ||
|
||
return this.redis.sync().hexists(prefix, key); | ||
} | ||
|
||
@Override | ||
public void reset(String key) { | ||
this.localCache.put(key, 0L); | ||
this.redis.async().hset(prefix, key, 0L); | ||
} | ||
|
||
private void flush() { | ||
Set<String> keys = new HashSet<>(); | ||
while (!this.pendingOps.isEmpty()) { | ||
PendingCounterOp op = (PendingCounterOp) this.pendingOps.poll(); | ||
keys.add(op.getKey()); | ||
} | ||
|
||
Map<String, Long> val = new HashMap<>(); | ||
for (String key : keys) { | ||
long cv = this.localCache.asMap().getOrDefault(key, 0L); | ||
val.put(key, cv); | ||
} | ||
|
||
this.redis.async().hset(prefix, val); | ||
val.forEach((k, v) -> this.redis.async().expire(k, 3 * 60 * 60)); | ||
|
||
this.pendingOps.clear(); | ||
} | ||
|
||
} |
64 changes: 0 additions & 64 deletions
64
apps/threat-detection/src/main/java/com/akto/threat/detection/cache/RedisCounterCache.java
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters