diff --git a/apps/threat-detection/pom.xml b/apps/threat-detection/pom.xml index e5e5cd3e2c..e642895817 100644 --- a/apps/threat-detection/pom.xml +++ b/apps/threat-detection/pom.xml @@ -132,6 +132,12 @@ 2.24.2 + + com.github.ben-manes.caffeine + caffeine + 2.9.3 + + diff --git a/apps/threat-detection/src/main/java/com/akto/threat/detection/cache/CounterCache.java b/apps/threat-detection/src/main/java/com/akto/threat/detection/cache/CounterCache.java index f1eaeffa90..08efe0f1a7 100644 --- a/apps/threat-detection/src/main/java/com/akto/threat/detection/cache/CounterCache.java +++ b/apps/threat-detection/src/main/java/com/akto/threat/detection/cache/CounterCache.java @@ -10,5 +10,5 @@ public interface CounterCache { boolean exists(String key); - void clear(String key); + void reset(String key); } diff --git a/apps/threat-detection/src/main/java/com/akto/threat/detection/cache/RedisBackedCounterCache.java b/apps/threat-detection/src/main/java/com/akto/threat/detection/cache/RedisBackedCounterCache.java new file mode 100644 index 0000000000..6d053c6be7 --- /dev/null +++ b/apps/threat-detection/src/main/java/com/akto/threat/detection/cache/RedisBackedCounterCache.java @@ -0,0 +1,109 @@ +package com.akto.threat.detection.cache; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import io.lettuce.core.RedisClient; +import io.lettuce.core.api.StatefulRedisConnection; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.*; + +public class RedisBackedCounterCache implements CounterCache { + private final StatefulRedisConnection redis; + + private final Cache localCache; + + private final String prefix; + private final ConcurrentLinkedQueue pendingOps; + + static class PendingCounterOp { + private final String key; + private final long value; + + public PendingCounterOp(String key, long value) { + this.key = key; + this.value = value; + } + + public String getKey() { + return key; + } + + public long getValue() { + return value; + } + } + + public RedisBackedCounterCache(RedisClient redisClient, String prefix) { + this.prefix = prefix; + this.redis = redisClient.connect(new LongValueCodec()); + this.localCache = Caffeine.newBuilder().maximumSize(10000).expireAfterWrite(3, TimeUnit.HOURS).build(); + this.pendingOps = new ConcurrentLinkedQueue<>(); + } + + @Override + public void increment(String key) { + this.incrementBy(key, 1); + } + + @Override + public void incrementBy(String key, long val) { + long cv = this.get(key); + this.localCache.put(key, cv + val); + + this.pendingOps.add(new PendingCounterOp(key, val)); + if (this.pendingOps.size() >= 100) { + this.flush(); + } + } + + @Override + public long get(String key) { + if (this.localCache.asMap().containsKey(key)) { + return this.localCache.asMap().get(key); + } + + Long rv = this.redis.sync().hget(prefix, key); + + this.localCache.put(key, rv != null ? rv : 0L); + return rv != null ? rv : 0L; + } + + @Override + public boolean exists(String key) { + if (this.localCache.asMap().containsKey(key)) { + return true; + } + + return this.redis.sync().hexists(prefix, key); + } + + @Override + public void reset(String key) { + this.localCache.put(key, 0L); + this.redis.async().hset(prefix, key, 0L); + } + + private void flush() { + Set keys = new HashSet<>(); + while (!this.pendingOps.isEmpty()) { + PendingCounterOp op = (PendingCounterOp) this.pendingOps.poll(); + keys.add(op.getKey()); + } + + Map val = new HashMap<>(); + for (String key : keys) { + long cv = this.localCache.asMap().getOrDefault(key, 0L); + val.put(key, cv); + } + + this.redis.async().hset(prefix, val); + val.forEach((k, v) -> this.redis.async().expire(k, 3 * 60 * 60)); + + this.pendingOps.clear(); + } + +} \ No newline at end of file diff --git a/apps/threat-detection/src/main/java/com/akto/threat/detection/cache/RedisCounterCache.java b/apps/threat-detection/src/main/java/com/akto/threat/detection/cache/RedisCounterCache.java deleted file mode 100644 index 63d20312d8..0000000000 --- a/apps/threat-detection/src/main/java/com/akto/threat/detection/cache/RedisCounterCache.java +++ /dev/null @@ -1,64 +0,0 @@ -package com.akto.threat.detection.cache; - -import io.lettuce.core.RedisClient; -import io.lettuce.core.api.StatefulRedisConnection; - -public class RedisCounterCache implements CounterCache { - - static class Op { - private final String key; - private final long value; - - public Op(String key, long value) { - this.key = key; - this.value = value; - } - - public String getKey() { - return key; - } - - public long getValue() { - return value; - } - } - - private final StatefulRedisConnection redis; - - private final String prefix; - - public RedisCounterCache(RedisClient redisClient, String prefix) { - this.prefix = prefix; - this.redis = redisClient.connect(new LongValueCodec()); - } - - private String addPrefixToKey(String key) { - return new StringBuilder().append(prefix).append("|").append(key).toString(); - } - - @Override - public void increment(String key) { - incrementBy(key, 1); - } - - @Override - public void incrementBy(String key, long val) { - redis.async().incrby(addPrefixToKey(key), val); - } - - @Override - public long get(String key) { - return redis.sync().get(addPrefixToKey(key)); - } - - @Override - public boolean exists(String key) { - return redis.sync().exists(addPrefixToKey(key)) > 0; - } - - @Override - public void clear(String key) { - redis.async().del(addPrefixToKey(key)); - } - -} diff --git a/apps/threat-detection/src/main/java/com/akto/threat/detection/smart_event_detector/window_based/WindowBasedThresholdNotifier.java b/apps/threat-detection/src/main/java/com/akto/threat/detection/smart_event_detector/window_based/WindowBasedThresholdNotifier.java index 01d3b531a7..e556e790c1 100644 --- a/apps/threat-detection/src/main/java/com/akto/threat/detection/smart_event_detector/window_based/WindowBasedThresholdNotifier.java +++ b/apps/threat-detection/src/main/java/com/akto/threat/detection/smart_event_detector/window_based/WindowBasedThresholdNotifier.java @@ -70,7 +70,7 @@ public Result shouldNotify(String aggKey, SampleMaliciousRequest maliciousEvent, boolean thresholdBreached = windowCount >= rule.getCondition().getMatchCount(); if (thresholdBreached) { - this.cache.clear(cacheKey); + this.cache.reset(cacheKey); } return new Result(thresholdBreached); diff --git a/apps/threat-detection/src/main/java/com/akto/threat/detection/tasks/MaliciousTrafficDetectorTask.java b/apps/threat-detection/src/main/java/com/akto/threat/detection/tasks/MaliciousTrafficDetectorTask.java index 49a9a2f288..93fdb86957 100644 --- a/apps/threat-detection/src/main/java/com/akto/threat/detection/tasks/MaliciousTrafficDetectorTask.java +++ b/apps/threat-detection/src/main/java/com/akto/threat/detection/tasks/MaliciousTrafficDetectorTask.java @@ -9,7 +9,6 @@ import com.akto.dto.HttpResponseParams; import com.akto.dto.RawApi; import com.akto.dto.api_protection_parse_layer.AggregationRules; -import com.akto.dto.api_protection_parse_layer.Condition; import com.akto.dto.api_protection_parse_layer.Rule; import com.akto.dto.monitoring.FilterConfig; import com.akto.dto.test_editor.YamlTemplate; @@ -26,7 +25,7 @@ import com.akto.test_editor.execution.VariableResolver; import com.akto.test_editor.filter.data_operands_impl.ValidationResult; import com.akto.threat.detection.actor.SourceIPActorGenerator; -import com.akto.threat.detection.cache.RedisCounterCache; +import com.akto.threat.detection.cache.RedisBackedCounterCache; import com.akto.threat.detection.constants.KafkaTopic; import com.akto.threat.detection.kafka.KafkaProtoProducer; import com.akto.threat.detection.smart_event_detector.window_based.WindowBasedThresholdNotifier; @@ -82,7 +81,7 @@ public MaliciousTrafficDetectorTask( this.windowBasedThresholdNotifier = new WindowBasedThresholdNotifier( - new RedisCounterCache(redisClient, "wbt"), + new RedisBackedCounterCache(redisClient, "wbt"), new WindowBasedThresholdNotifier.Config(100, 10 * 60)); this.internalKafka = new KafkaProtoProducer(internalConfig);