
Commit 123387c

send to threat kafka directly
1 parent fd029bb commit 123387c

3 files changed: 149 additions & 29 deletions


apps/data-ingestion-service/src/main/java/com/akto/action/IngestionAction.java

Lines changed: 0 additions & 27 deletions
@@ -25,35 +25,8 @@ public class IngestionAction extends ActionSupport {

     private static final int ACCOUNT_ID_TO_ADD_DEFAULT_DATA = getAccountId();

-    private boolean sendLogsToCustomAccount(List<IngestDataBatch> batchData){
-        if (batchData == null || batchData.isEmpty()) {
-            return false;
-        }
-
-        for (IngestDataBatch batch : batchData) {
-            String requestHeaders = batch.getRequestHeaders();
-            if (requestHeaders != null) {
-                String lowerHeaders = requestHeaders.toLowerCase();
-                if (lowerHeaders.contains("\"host\":") || lowerHeaders.contains("\"host \":")) {
-                    if (lowerHeaders.contains("hollywoodbets") ||
-                            lowerHeaders.contains("betsolutions") ||
-                            lowerHeaders.contains("betnix") ||
-                            lowerHeaders.contains("betsoft")) {
-                        return true;
-                    }
-                }
-            }
-        }
-
-        return true;
-    }
-
     public String ingestData() {
         try {
-            if(sendLogsToCustomAccount(batchData)){
-                System.setProperty("DATABASE_ABSTRACTOR_SERVICE_TOKEN", "eyJhbGciOiJSUzI1NiJ9.eyJpc3MiOiJBa3RvIiwic3ViIjoiaW52aXRlX3VzZXIiLCJhY2NvdW50SWQiOjE2NjI2ODA0NjMsImlhdCI6MTc2MDU5NzM0OCwiZXhwIjoxNzc2MzIyMTQ4fQ.b-aqZEiTinzE1tavKDe6t7Ec7TsnsGoVRdxCiMmeOM20JcJ7aEgOZaJxD7O9zyoD6AEXmpEghd04wGhGCECBOKWivDS8Y_fdatLw8R7hH0Y-pu8QEMC1whbXXJrNhsRGXihLIiQ80nDKbrv6ObbyDwy4NPYoCFK8Mpu2i4W8qZHBJXnxmVkCp8Cp_LyeDLotXvc8DAp9huHASil0BSOxiUwHsw3Efk4BkRlHADfAwGFz4j-ozdbiK0SHHvOZNicl1wgpvDk0nHRLhIg3Ynx-Fk4Pp0agb0MCpS55-CRMBbx3zy9xRdkhIGdOydEzZKK5p311hwPnxxeL6Dp1C2f89g");
-            }

             printLogs("ingestData batch size " + batchData.size());
             for (IngestDataBatch payload: batchData) {
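
The host-matching check deleted above is not gone: it reappears as KafkaUtils.shouldSendToThreatTopic in the next file, evaluated per payload inside insertData rather than once per batch. A minimal sketch of the new call path; the sample header string below is hypothetical:

    // Hypothetical request-headers blob, as the ingestion service receives it.
    String requestHeaders = "{\"Host\": \"api.hollywoodbets.net\", \"Content-Type\": \"application/json\"}";

    // Lower-cased, this contains "\"host\":" and "hollywoodbets", so the check passes
    // and insertData() mirrors the record to the threat topic.
    boolean sendToThreat = KafkaUtils.shouldSendToThreatTopic(requestHeaders);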

apps/data-ingestion-service/src/main/java/com/akto/utils/KafkaUtils.java

Lines changed: 100 additions & 2 deletions
@@ -1,23 +1,91 @@
 package com.akto.utils;

 import com.akto.log.LoggerMaker;
-import com.akto.action.IngestionAction;
 import com.akto.dao.context.Context;
 import com.akto.dto.IngestDataBatch;
 import com.akto.kafka.Kafka;
+import com.akto.kafka.KafkaConfig;
+import com.akto.kafka.Serializer;
+import com.akto.kafka.KafkaProtoProducer;
+import com.akto.kafka.KafkaProducerConfig;
 import com.mongodb.BasicDBObject;
+import com.akto.proto.http_response_param.v1.HttpResponseParam;
+import com.akto.proto.http_response_param.v1.StringList;
+import java.util.HashMap;
+import java.util.Map;

 public class KafkaUtils {

     private static final LoggerMaker logger = new LoggerMaker(KafkaUtils.class, LoggerMaker.LogDb.DATA_INGESTION);
     private static Kafka kafkaProducer;
+    private static KafkaProtoProducer kafkaProtoProducer;

     public void initKafkaProducer() {
         String kafkaBrokerUrl = System.getenv().getOrDefault("AKTO_KAFKA_BROKER_URL", "localhost:29092");
         int batchSize = Integer.parseInt(System.getenv().getOrDefault("AKTO_KAFKA_PRODUCER_BATCH_SIZE", "100"));
         int kafkaLingerMS = Integer.parseInt(System.getenv().getOrDefault("AKTO_KAFKA_PRODUCER_LINGER_MS", "10"));
         kafkaProducer = new Kafka(kafkaBrokerUrl, kafkaLingerMS, batchSize, LoggerMaker.LogDb.DATA_INGESTION);
-        logger.infoAndAddToDb("Kafka Producer Init " + Context.now(), LoggerMaker.LogDb.DATA_INGESTION);
+
+        // Initialize protobuf kafka producer
+        KafkaConfig protoKafkaConfig = KafkaConfig.newBuilder()
+                .setBootstrapServers(kafkaBrokerUrl)
+                .setKeySerializer(Serializer.STRING)
+                .setValueSerializer(Serializer.BYTE_ARRAY)
+                .setProducerConfig(KafkaProducerConfig.newBuilder()
+                        .setLingerMs(kafkaLingerMS)
+                        .setBatchSize(batchSize)
+                        .build())
+                .build();
+        kafkaProtoProducer = new KafkaProtoProducer(protoKafkaConfig);
+
+        logger.infoAndAddToDb("Kafka Producer Init " + Context.now());
+    }
+
+    public static boolean shouldSendToThreatTopic(String requestHeaders){
+        if (requestHeaders == null || requestHeaders.isEmpty()) {
+            return false;
+        }
+
+        String lowerHeaders = requestHeaders.toLowerCase();
+        if (lowerHeaders.contains("\"host\":") || lowerHeaders.contains("\"host \":")) {
+            if (lowerHeaders.contains("hollywoodbets") ||
+                    lowerHeaders.contains("betsolutions") ||
+                    lowerHeaders.contains("betnix") ||
+                    lowerHeaders.contains("betsoft")) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private static Map<String, StringList> parseHeadersToProto(String headersJson) {
+        Map<String, StringList> headers = new HashMap<>();
+        if (headersJson == null || headersJson.isEmpty()) {
+            return headers;
+        }
+
+        try {
+            // Parse JSON string to extract headers
+            headersJson = headersJson.trim();
+            if (headersJson.startsWith("{") && headersJson.endsWith("}")) {
+                headersJson = headersJson.substring(1, headersJson.length() - 1);
+                String[] pairs = headersJson.split(",");
+
+                for (String pair : pairs) {
+                    String[] keyValue = pair.split(":", 2);
+                    if (keyValue.length == 2) {
+                        String key = keyValue[0].trim().replaceAll("\"", "");
+                        String value = keyValue[1].trim().replaceAll("\"", "");
+                        headers.put(key, StringList.newBuilder().addValues(value).build());
+                    }
+                }
+            }
+        } catch (Exception e) {
+            logger.errorAndAddToDb("Error parsing headers: " + e.getMessage(), LoggerMaker.LogDb.DATA_INGESTION);
+        }
+
+        return headers;
     }

     public static void insertData(IngestDataBatch payload) {

@@ -47,6 +115,36 @@ public static void insertData(IngestDataBatch payload) {
         obj.put("tag", payload.getTag());
         kafkaProducer.send(obj.toString(), "akto.api.logs");
         //IngestionAction.printLogs("Inserted to kafka: " + obj.toString());
+
+        if(shouldSendToThreatTopic(payload.getRequestHeaders())){
+            // create a HttpResponseParam protobuf object from payload send to akto.api.logs2 topic
+            HttpResponseParam.Builder builder = HttpResponseParam.newBuilder();
+
+            // Parse headers to protobuf format
+            Map<String, StringList> requestHeaders = parseHeadersToProto(payload.getRequestHeaders());
+            Map<String, StringList> responseHeaders = parseHeadersToProto(payload.getResponseHeaders());
+
+            builder.setMethod(payload.getMethod() != null ? payload.getMethod() : "")
+                    .setPath(payload.getPath() != null ? payload.getPath() : "")
+                    .setType(payload.getType() != null ? payload.getType() : "HTTP/1.1")
+                    .putAllRequestHeaders(requestHeaders)
+                    .putAllResponseHeaders(responseHeaders)
+                    .setRequestPayload(payload.getRequestPayload() != null ? payload.getRequestPayload() : "")
+                    .setResponsePayload(payload.getResponsePayload() != null ? payload.getResponsePayload() : "")
+                    .setStatusCode(payload.getStatusCode() != null ? Integer.parseInt(payload.getStatusCode()) : 0)
+                    .setStatus(payload.getStatus() != null ? payload.getStatus() : "")
+                    .setTime(payload.getTime() != null ? Integer.parseInt(payload.getTime()) : (int)(System.currentTimeMillis() / 1000))
+                    .setAktoAccountId(payload.getAkto_account_id() != null ? payload.getAkto_account_id() : "")
+                    .setAktoVxlanId(payload.getAkto_vxlan_id() != null ? payload.getAkto_vxlan_id() : "")
+                    .setIp(payload.getIp() != null ? payload.getIp() : "")
+                    .setDestIp(payload.getDestIp() != null ? payload.getDestIp() : "")
+                    .setDirection(payload.getDirection() != null ? payload.getDirection() : "")
+                    .setIsPending(payload.getIs_pending() != null ? Boolean.parseBoolean(payload.getIs_pending()) : false)
+                    .setSource(payload.getSource() != null ? payload.getSource() : "");
+
+            HttpResponseParam httpResponseParam = builder.build();
+            kafkaProtoProducer.send("akto.api.logs2", httpResponseParam);
+        }
     }

 }
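
Two notes on the new code. First, parseHeadersToProto splits on bare commas before splitting each pair on the first colon, so a header value that itself contains a comma (e.g. Accept: text/html,application/xml) is truncated at the comma and the remainder is dropped; that is tolerable for the host-based routing here, but it matters to anyone reading the mirrored headers downstream. Second, records on akto.api.logs2 are raw protobuf bytes, so a downstream consumer deserializes them with the generated HttpResponseParam.parseFrom. A minimal consumer sketch, assuming standard kafka-clients deserializers; the broker address and group id are placeholders:

    import com.akto.proto.http_response_param.v1.HttpResponseParam;
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class ThreatTopicConsumerSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092"); // placeholder broker
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "threat-detection");         // placeholder group id
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

            try (KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("akto.api.logs2"));
                while (true) {
                    for (ConsumerRecord<String, byte[]> record : consumer.poll(Duration.ofSeconds(1))) {
                        // Reverse of KafkaProtoProducer.send(): raw bytes back into the proto message.
                        HttpResponseParam param = HttpResponseParam.parseFrom(record.value());
                        System.out.println(param.getMethod() + " " + param.getPath());
                    }
                }
            }
        }
    }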
…/com/akto/kafka/KafkaProtoProducer.java

Lines changed: 49 additions & 0 deletions
@@ -0,0 +1,49 @@
+package com.akto.kafka;
+
+import com.google.protobuf.Message;
+import java.time.Duration;
+import java.util.Properties;
+import org.apache.kafka.clients.producer.*;
+
+public class KafkaProtoProducer {
+    private final KafkaProducer<String, byte[]> producer;
+    public boolean producerReady;
+
+    public KafkaProtoProducer(KafkaConfig kafkaConfig) {
+        this.producer =
+                generateProducer(
+                        kafkaConfig.getBootstrapServers(),
+                        kafkaConfig.getProducerConfig().getLingerMs(),
+                        kafkaConfig.getProducerConfig().getBatchSize());
+    }
+
+    public void send(String topic, Message message) {
+        byte[] messageBytes = message.toByteArray();
+        this.producer.send(new ProducerRecord<>(topic, messageBytes));
+    }
+
+    public void close() {
+        this.producerReady = false;
+        producer.close(Duration.ofMillis(0)); // close immediately
+    }
+
+    private KafkaProducer<String, byte[]> generateProducer(
+            String brokerIP, int lingerMS, int batchSize) {
+        if (producer != null) close(); // close existing producer connection
+
+        int requestTimeoutMs = 5000;
+        Properties kafkaProps = new Properties();
+        kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerIP);
+        kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.STRING.getSerializer());
+        kafkaProps.put(
+                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serializer.BYTE_ARRAY.getSerializer());
+        kafkaProps.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
+        kafkaProps.put(ProducerConfig.LINGER_MS_CONFIG, lingerMS);
+        kafkaProps.put(ProducerConfig.RETRIES_CONFIG, 0);
+        kafkaProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMs);
+        kafkaProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, lingerMS + requestTimeoutMs);
+        kafkaProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd");
+        return new KafkaProducer<>(kafkaProps);
+    }
+}
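
As configured, this producer is fire-and-forget: RETRIES_CONFIG is 0 and DELIVERY_TIMEOUT_MS_CONFIG is lingerMS + requestTimeoutMs, so a record that cannot be delivered within roughly one request timeout is dropped rather than retried, a reasonable trade-off for mirroring traffic that has already been written to akto.api.logs. A minimal standalone usage sketch; the broker address and message fields are illustrative, and the KafkaConfig/KafkaProducerConfig builders are used exactly as in KafkaUtils above:

    import com.akto.kafka.KafkaConfig;
    import com.akto.kafka.KafkaProducerConfig;
    import com.akto.kafka.KafkaProtoProducer;
    import com.akto.kafka.Serializer;
    import com.akto.proto.http_response_param.v1.HttpResponseParam;

    public class ProtoProducerUsageSketch {
        public static void main(String[] args) {
            KafkaConfig config = KafkaConfig.newBuilder()
                    .setBootstrapServers("localhost:29092") // illustrative broker address
                    .setKeySerializer(Serializer.STRING)
                    .setValueSerializer(Serializer.BYTE_ARRAY)
                    .setProducerConfig(KafkaProducerConfig.newBuilder()
                            .setLingerMs(10)
                            .setBatchSize(100)
                            .build())
                    .build();

            KafkaProtoProducer producer = new KafkaProtoProducer(config);

            // Any generated protobuf Message works; send() serializes it with toByteArray().
            HttpResponseParam msg = HttpResponseParam.newBuilder()
                    .setMethod("GET")          // illustrative field values
                    .setPath("/api/v1/ping")
                    .setStatusCode(200)
                    .build();

            producer.send("akto.api.logs2", msg);
            producer.close();
        }
    }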
