From 00369dc59eeefd5898c47fa127ae7c7019aca8cd Mon Sep 17 00:00:00 2001 From: shivamrawat101192 Date: Fri, 7 Jun 2024 16:02:12 +0530 Subject: [PATCH 01/28] enabling http2 and grpc in mirroring for filter_headers --- go.mod | 2 +- go.sum | 3 - main.go | 216 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 217 insertions(+), 4 deletions(-) diff --git a/go.mod b/go.mod index deb02e5..a0b96cb 100644 --- a/go.mod +++ b/go.mod @@ -3,10 +3,10 @@ module github.com/akto-api-security/mirroring-api-logging go 1.17 require ( - github.com/akto-api-security/gomiddleware v0.1.0 github.com/google/gopacket v1.1.19 github.com/segmentio/kafka-go v0.4.25 go.mongodb.org/mongo-driver v1.11.3 + golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 ) require ( diff --git a/go.sum b/go.sum index 3039a4c..b0b4c66 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,3 @@ -github.com/akto-api-security/gomiddleware v0.1.0 h1:7yf8j2yKVX1Ar5kBeIMjzBAuOBZj9BvTZJ8uEALmR8s= -github.com/akto-api-security/gomiddleware v0.1.0/go.mod h1:pCxZc7oWn6Wlv4S8ISJDr7F7mhir0M4IQS/Z5mC0vu8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -31,7 +29,6 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/segmentio/kafka-go v0.4.23/go.mod h1:XzMcoMjSzDGHcIwpWUI7GB43iKZ2fTVmryPSGLf/MPg= github.com/segmentio/kafka-go v0.4.25 h1:QVx9yz12syKBFkxR+dVDDwTO0ItHgnjjhIdBfqizj+8= github.com/segmentio/kafka-go v0.4.25/go.mod h1:XzMcoMjSzDGHcIwpWUI7GB43iKZ2fTVmryPSGLf/MPg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/main.go b/main.go index 04cd15b..3e32e4d 100644 --- a/main.go +++ b/main.go @@ -15,6 +15,7 @@ import ( "bytes" "compress/gzip" "context" + "encoding/base64" "encoding/json" "fmt" "io" @@ -39,6 +40,9 @@ import ( "net" "github.com/segmentio/kafka-go" + + "golang.org/x/net/http2" + "golang.org/x/net/http2/hpack" ) var printCounter = 500 @@ -104,6 +108,16 @@ type myFactory struct { source string } +type http2ReqResp struct { + headersMap map[string]string + payload string + isInvalid bool +} + +func (k http2ReqResp) String() string { + return fmt.Sprintf("%v:%v", k.headersMap, k.payload) +} + // New handles creating a new tcpassembly.Stream. func (f *myFactory) New(netFlow, tcpFlow gopacket.Flow) tcpassembly.Stream { // Create a new stream. 
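The hunk below adds tryParseAsHttp2Request, which replays one direction of the captured TCP stream through an http2.Framer and decodes HEADERS frames with an hpack.Decoder. The pattern is easier to see in isolation; the following is a hedged, self-contained sketch of it (parseStreamHeaders and its return shape are illustrative names for this note, not code from the patch):

package main

import (
	"bytes"
	"fmt"
	"io"

	"golang.org/x/net/http2"
	"golang.org/x/net/http2/hpack"
)

// parseStreamHeaders walks raw HTTP/2 frames (after the 24-byte client
// preface "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" has been stripped) and returns
// the decoded header fields per stream ID.
func parseStreamHeaders(raw []byte) map[uint32]map[string]string {
	out := make(map[uint32]map[string]string)
	var current uint32 // stream whose HEADERS frame is currently being decoded

	// The decoder callback fires once per header field as bytes are written.
	decoder := hpack.NewDecoder(4096, func(hf hpack.HeaderField) {
		if out[current] == nil {
			out[current] = make(map[string]string)
		}
		out[current][hf.Name] = hf.Value
	})

	framer := http2.NewFramer(nil, bytes.NewReader(raw)) // read-only, so no writer
	for {
		frame, err := framer.ReadFrame()
		if err == io.EOF {
			break
		}
		if err != nil {
			break // unlike the patch, bail out instead of retrying
		}
		if hf, ok := frame.(*http2.HeadersFrame); ok {
			current = hf.Header().StreamID
			// HPACK is stateful: fragments must be fed to ONE decoder, in order.
			decoder.Write(hf.HeaderBlockFragment())
		}
	}
	return out
}

func main() {
	fmt.Println(parseStreamHeaders(nil)) // empty input, so no streams
}

One design note: the sketch stops at the first framing error, while the patch continues to the next frame, trading robustness on partial captures against the risk of spinning on an undecodable stream.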
@@ -186,7 +200,209 @@ func checkIfIp(host string) bool { return net.ParseIP(chunks[0]) != nil } +func tryParseAsHttp2Request(bd *bidi, isPending bool) { + + streamRequestMap := make(map[string][]http2ReqResp) + framer := http2.NewFramer(nil, bytes.NewReader(bd.a.bytes)) + + headersMap := make(map[string]string) + payload := "" + + gotHeaders := make(map[string]bool) + gotPayload := make(map[string]bool) + decoder := hpack.NewDecoder(4096, func(hf hpack.HeaderField) { + + if len(hf.Name) > 0 { + headersMap[hf.Name] = hf.Value + } + }) + + for { + + frame, err := framer.ReadFrame() + + if err == io.EOF { + break + } + if err != nil { + continue + } + + streamId := fmt.Sprint(frame.Header().StreamID) + if len(streamId) == 0 { + continue + } + + if !gotHeaders[streamId] { + headersMap = make(map[string]string) + } + + switch f := frame.(type) { + case *http2.HeadersFrame: + _, err := decoder.Write(f.HeaderBlockFragment()) + gotHeaders[streamId] = true + if err != nil { + } + + case *http2.DataFrame: + if len(string(f.Data())) > 0 { + payload = base64.StdEncoding.EncodeToString(f.Data()) + gotPayload[streamId] = true + } + } + + if gotHeaders[streamId] && gotPayload[streamId] { + if _, exists := streamRequestMap[streamId]; !exists { + streamRequestMap[streamId] = []http2ReqResp{} + } + streamRequestMap[streamId] = append(streamRequestMap[streamId], http2ReqResp{ + headersMap: headersMap, + payload: payload, + }) + gotHeaders[streamId] = false + gotPayload[streamId] = false + } + } + + gotHeaders = make(map[string]bool) + gotPayload = make(map[string]bool) + gotGrpcHeaders := make(map[string]bool) + headersCount := make(map[string]int) + headersMap = make(map[string]string) + payload = "" + + streamResponseMap := make(map[string][]http2ReqResp) + framerResp := http2.NewFramer(nil, bytes.NewReader(bd.b.bytes)) + headersMap = make(map[string]string) + decoder = hpack.NewDecoder(4096, func(hf hpack.HeaderField) { + if len(hf.Name) > 0 { + headersMap[hf.Name] = hf.Value + } + }) + + for { + frame, err := framerResp.ReadFrame() + if err == io.EOF { + break + } + if err != nil { + continue + } + + streamId := fmt.Sprint(frame.Header().StreamID) + + if len(streamId) == 0 { + continue + } + if !(gotHeaders[streamId]) { + headersMap = make(map[string]string) + } + + switch f := frame.(type) { + case *http2.HeadersFrame: + _, err := decoder.Write(f.HeaderBlockFragment()) + if err != nil { + log.Printf("Error response decoding headers: %v", err) + } + if headersCount[streamId] == 0 { + if strings.Contains(headersMap["content-type"], "application/grpc") { + gotGrpcHeaders[streamId] = true + } + gotHeaders[streamId] = true + } + headersCount[streamId]++ + case *http2.DataFrame: + if len(string(f.Data())) > 0 { + payload = base64.StdEncoding.EncodeToString(f.Data()) + gotPayload[streamId] = true + } + } + if gotHeaders[streamId] && gotPayload[streamId] { + + if gotGrpcHeaders[streamId] && headersCount[streamId] == 1 { + continue + } + + if _, exists := streamResponseMap[streamId]; !exists { + streamResponseMap[streamId] = []http2ReqResp{} + } + streamResponseMap[streamId] = append(streamResponseMap[streamId], http2ReqResp{ + headersMap: headersMap, + payload: payload, + }) + gotPayload[streamId] = false + gotHeaders[streamId] = false + gotGrpcHeaders[streamId] = false + headersCount[streamId] = 0 + } + } + + for streamId, http2Req := range streamRequestMap { + http2Resp := streamResponseMap[streamId] + if len(http2Resp) != len(http2Req) { + continue + } + for req := range http2Req { + + http2Request := 
http2Req[req] + http2Response := http2Resp[req] + + value := make(map[string]string) + + if path, exists := http2Request.headersMap[":path"]; exists { + value["path"] = path + delete(http2Request.headersMap, ":path") + } + if method, exists := http2Request.headersMap[":method"]; exists { + value["method"] = method + delete(http2Request.headersMap, ":method") + } + if scheme, exists := http2Request.headersMap[":scheme"]; exists { + value["scheme"] = scheme + delete(http2Request.headersMap, ":scheme") + } + if status, exists := http2Response.headersMap[":status"]; exists { + value["statusCode"] = status + delete(http2Response.headersMap, ":status") + } + value["requestPayload"] = http2Request.payload + value["responsePayload"] = http2Request.payload + + if len(http2Request.headersMap) > 0 { + requestHeaders, _ := json.Marshal(http2Request.headersMap) + value["requestHeaders"] = string(requestHeaders) + } + if len(http2Response.headersMap) > 0 { + responseHeader, _ := json.Marshal(http2Response.headersMap) + value["responseHeader"] = string(responseHeader) + } + + value["ip"] = bd.key.net.Src().String() + value["akto_account_id"] = fmt.Sprint(1000000) + value["akto_vxlan_id"] = fmt.Sprint(bd.vxlanID) + value["time"] = fmt.Sprint(time.Now().Unix()) + value["is_pending"] = fmt.Sprint(isPending) + value["source"] = bd.source + out, _ := json.Marshal(value) + ctx := context.Background() + + if printCounter > 0 { + printCounter-- + log.Println("req-resp.String()", string(out)) + } + go Produce(kafkaWriter, ctx, string(out)) + } + + } +} + func tryReadFromBD(bd *bidi, isPending bool) { + if len(bd.a.bytes) > 24 && string(bd.a.bytes[0:24]) == "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" { + bd.a.bytes = bd.a.bytes[24:] + tryParseAsHttp2Request(bd, isPending) + return + } + reader := bufio.NewReader(bytes.NewReader(bd.a.bytes)) i := 0 requests := []http.Request{} From 134287865b5708e5d8346d0c774017c1875a02a2 Mon Sep 17 00:00:00 2001 From: shivamrawat101192 Date: Fri, 7 Jun 2024 23:59:46 +0530 Subject: [PATCH 02/28] bug fixes and handling the case when "path" is empty --- main.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/main.go b/main.go index 3e32e4d..7f90085 100644 --- a/main.go +++ b/main.go @@ -352,6 +352,8 @@ func tryParseAsHttp2Request(bd *bidi, isPending bool) { if path, exists := http2Request.headersMap[":path"]; exists { value["path"] = path delete(http2Request.headersMap, ":path") + } else { + return } if method, exists := http2Request.headersMap[":method"]; exists { value["method"] = method @@ -363,10 +365,11 @@ func tryParseAsHttp2Request(bd *bidi, isPending bool) { } if status, exists := http2Response.headersMap[":status"]; exists { value["statusCode"] = status + value["status"] = status delete(http2Response.headersMap, ":status") } value["requestPayload"] = http2Request.payload - value["responsePayload"] = http2Request.payload + value["responsePayload"] = http2Response.payload if len(http2Request.headersMap) > 0 { requestHeaders, _ := json.Marshal(http2Request.headersMap) @@ -374,9 +377,10 @@ func tryParseAsHttp2Request(bd *bidi, isPending bool) { } if len(http2Response.headersMap) > 0 { responseHeader, _ := json.Marshal(http2Response.headersMap) - value["responseHeader"] = string(responseHeader) + value["responseHeaders"] = string(responseHeader) } + value["type"] = "HTTP/2" value["ip"] = bd.key.net.Src().String() value["akto_account_id"] = fmt.Sprint(1000000) value["akto_vxlan_id"] = fmt.Sprint(bd.vxlanID) From 36dcf8a49f975892cd9e381b905d11a53c18ec8e Mon Sep 17 
00:00:00 2001 From: shivamrawat101192 Date: Sat, 8 Jun 2024 00:17:21 +0530 Subject: [PATCH 03/28] bug fixes and handling the case when "path" is empty --- main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/main.go b/main.go index 7f90085..cb54f9a 100644 --- a/main.go +++ b/main.go @@ -353,7 +353,7 @@ func tryParseAsHttp2Request(bd *bidi, isPending bool) { value["path"] = path delete(http2Request.headersMap, ":path") } else { - return + continue } if method, exists := http2Request.headersMap[":method"]; exists { value["method"] = method From 702e7a44baa224aadf2c423e9a41277e076e91f7 Mon Sep 17 00:00:00 2001 From: notshivansh Date: Mon, 13 Jan 2025 12:22:46 +0530 Subject: [PATCH 04/28] upgrade go version to 1.22 --- .github/workflows/main.yml | 2 +- Dockerfile | 2 +- go.mod | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 4d481d9..90cdad2 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -37,7 +37,7 @@ jobs: - uses: actions/checkout@v2 - uses: actions/setup-go@v2 with: - go-version: '^1.16.1' # The Go version to download (if necessary) and use. + go-version: '^1.22' # The Go version to download (if necessary) and use. - name: install required packages run: sudo apt install libpcap-dev expect - run: go build -o ./mirroring-api-logging diff --git a/Dockerfile b/Dockerfile index 148b6c0..1ac66d8 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM golang:1.18-alpine +FROM golang:1.22-alpine RUN apk add build-base RUN apk add libpcap-dev diff --git a/go.mod b/go.mod index e14bfa3..6beb882 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/akto-api-security/mirroring-api-logging -go 1.17 +go 1.22 require ( github.com/akto-api-security/gomiddleware v0.1.0 From e43434fbea6def82f208f736f80f62442172fbb3 Mon Sep 17 00:00:00 2001 From: ayushaga14 Date: Wed, 15 Jan 2025 01:19:23 +0530 Subject: [PATCH 05/28] use ip as partition key --- kafka.go | 34 +++++++++++++++++++++++++++++++++- 1 file changed, 33 insertions(+), 1 deletion(-) diff --git a/kafka.go b/kafka.go index e1b2a85..fa90f77 100644 --- a/kafka.go +++ b/kafka.go @@ -7,9 +7,20 @@ import ( "github.com/segmentio/kafka-go" "google.golang.org/protobuf/proto" "log" + "strings" "time" ) +var CLIENT_IP_HEADERS = []string{ + "x-forwarded-for", + "x-real-ip", + "x-cluster-client-ip", + "true-client-ip", + "x-original-forwarded-for", + "x-client-ip", + "client-ip", +} + func Produce(kafkaWriter *kafka.Writer, ctx context.Context, value *trafficpb.HttpResponseParam) error { // intialize the writer with the broker addresses, and the topic protoBytes, err := proto.Marshal(value) @@ -18,10 +29,11 @@ func Produce(kafkaWriter *kafka.Writer, ctx context.Context, value *trafficpb.Ht return err } + ip := GetSourceIp(value) // Send serialized message to Kafka msg := kafka.Message{ Topic: "akto.api.logs2", - Key: []byte("testkey"), + Key: []byte(ip), // what to do when ip is empty? 
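		// Keying each record by the client IP means a key-aware partitioner
		// will land all traffic from one source on the same partition, giving
		// consumers per-client ordering. Two caveats: with segmentio/kafka-go
		// the key only influences placement once a hashing Balancer is set
		// (the writer default is round-robin; kafka.Hash arrives in PATCH
		// 09/28), and a single hot source such as one busy load balancer can
		// skew partition load.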
Value: protoBytes, } @@ -32,6 +44,26 @@ func Produce(kafkaWriter *kafka.Writer, ctx context.Context, value *trafficpb.Ht return err } +func GetSourceIp(value *trafficpb.HttpResponseParam) string { + + for _, header := range CLIENT_IP_HEADERS { + if headerValues, exists := value.ResponseHeaders[header]; exists { + for _, value := range headerValues.Values { + parts := strings.Split(value, ",") + for _, part := range parts { + ip := strings.TrimSpace(part) + if ip != "" { + return ip + } + } + } + } + } + + // if no headers found + return value.Ip +} + func ProduceStr(kafkaWriter *kafka.Writer, ctx context.Context, message string) error { // intialize the writer with the broker addresses, and the topic msg := kafka.Message{ From 5a9ce9be35d183eaaeab4df3c7222fecc2145e52 Mon Sep 17 00:00:00 2001 From: ayushaga14 Date: Wed, 15 Jan 2025 01:32:09 +0530 Subject: [PATCH 06/28] avoid kafka push if ip is empty --- kafka.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/kafka.go b/kafka.go index fa90f77..590ac43 100644 --- a/kafka.go +++ b/kafka.go @@ -3,6 +3,7 @@ package main import ( "context" "encoding/json" + "fmt" trafficpb "github.com/akto-api-security/mirroring-api-logging/protobuf/traffic_payload" "github.com/segmentio/kafka-go" "google.golang.org/protobuf/proto" @@ -30,6 +31,10 @@ func Produce(kafkaWriter *kafka.Writer, ctx context.Context, value *trafficpb.Ht } ip := GetSourceIp(value) + if ip == "" { + fmt.Print("ip is empty, avoiding kafka push") + return nil + } // Send serialized message to Kafka msg := kafka.Message{ Topic: "akto.api.logs2", From e9d614b81091de3571cdec31b2d5ea2e326df2f4 Mon Sep 17 00:00:00 2001 From: ayushaga14 Date: Wed, 15 Jan 2025 18:33:28 +0530 Subject: [PATCH 07/28] add logs --- kafka.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/kafka.go b/kafka.go index 590ac43..b22f5d2 100644 --- a/kafka.go +++ b/kafka.go @@ -31,6 +31,7 @@ func Produce(kafkaWriter *kafka.Writer, ctx context.Context, value *trafficpb.Ht } ip := GetSourceIp(value) + fmt.Println("found ip ", ip) if ip == "" { fmt.Print("ip is empty, avoiding kafka push") return nil @@ -58,6 +59,8 @@ func GetSourceIp(value *trafficpb.HttpResponseParam) string { for _, part := range parts { ip := strings.TrimSpace(part) if ip != "" { + fmt.Println("found value of ip in header ", header) + fmt.Println("returning ip ", ip) return ip } } @@ -66,6 +69,7 @@ func GetSourceIp(value *trafficpb.HttpResponseParam) string { } // if no headers found + fmt.Println("ip not found in headers, returning value ", value.Ip) return value.Ip } From ea620760b21a3b724bb8925857869d9c55c4bc98 Mon Sep 17 00:00:00 2001 From: ayushaga14 Date: Wed, 15 Jan 2025 20:21:17 +0530 Subject: [PATCH 08/28] add logs --- kafka.go | 1 + 1 file changed, 1 insertion(+) diff --git a/kafka.go b/kafka.go index b22f5d2..605279d 100644 --- a/kafka.go +++ b/kafka.go @@ -32,6 +32,7 @@ func Produce(kafkaWriter *kafka.Writer, ctx context.Context, value *trafficpb.Ht ip := GetSourceIp(value) fmt.Println("found ip ", ip) + fmt.Println("found ip bytes ", []byte(ip)) if ip == "" { fmt.Print("ip is empty, avoiding kafka push") return nil From e0aae2b39b91fd6e839c1f27990dd7256f4c5520 Mon Sep 17 00:00:00 2001 From: ayushaga14 Date: Wed, 15 Jan 2025 21:06:35 +0530 Subject: [PATCH 09/28] add hash key balancer in kafkawriter init --- kafka.go | 1 + 1 file changed, 1 insertion(+) diff --git a/kafka.go b/kafka.go index 605279d..021fe1e 100644 --- a/kafka.go +++ b/kafka.go @@ -99,6 +99,7 @@ func GetKafkaWriter(kafkaURL, topic string, batchSize int, batchTimeout 
time.Dur MaxAttempts: 1, ReadTimeout: batchTimeout, WriteTimeout: batchTimeout, + Balancer: &kafka.Hash{}, } } From 2da42d507efa9fb568d7ca14e2cf126717a37b9d Mon Sep 17 00:00:00 2001 From: ayushaga14 Date: Fri, 14 Feb 2025 13:52:39 +0530 Subject: [PATCH 10/28] modify run sh script --- run.sh | 32 ++++++++++++++++++++++++++++---- 1 file changed, 28 insertions(+), 4 deletions(-) diff --git a/run.sh b/run.sh index fc36636..192e9b7 100755 --- a/run.sh +++ b/run.sh @@ -1,19 +1,43 @@ #!/bin/sh +LOG_FILE="/tmp/dump.log" +MAX_LOG_SIZE=${MAX_LOG_SIZE:-10480} # Default to 10 MB (10 * 1024 * 1024 bytes) +CHECK_INTERVAL=60 # Check interval in seconds + +# Function to rotate the log file +rotate_log() { + if [ -f "$LOG_FILE" ] && [ -s "$LOG_FILE" ]; then + log_size=$(stat -c%s "$LOG_FILE") # Get the size of the log file + if [ "$log_size" -ge "$MAX_LOG_SIZE" ]; then + echo "" > "$LOG_FILE" + fi + fi +} + +# Log rotation monitoring (only if ENABLE_LOGS is false) +if [[ "${ENABLE_LOGS}" == "false" ]]; then + while true; do + rotate_log # Check and rotate logs if necessary + sleep "$CHECK_INTERVAL" + done & +fi + while : do # Start the mirroring module in the background - /mirroring-api-logging & + if [[ "${ENABLE_LOGS}" == "false" ]]; then + /mirroring-api-logging >> "$LOG_FILE" 2>&1 & + else + /mirroring-api-logging & + fi mirroring_pid=$! # Monitor the process for 1 hour elapsed=0 while [ $elapsed -lt 3600 ]; do - # Check if the mirroring process is still running if ! kill -0 $mirroring_pid 2>/dev/null; then break fi - # Sleep for 2 seconds before checking again sleep 2 elapsed=$((elapsed + 2)) done @@ -21,4 +45,4 @@ do # Kill the mirroring process after 1 hour or if it stopped kill $mirroring_pid 2>/dev/null sleep 2 -done \ No newline at end of file +done From 8e876037bab73f7fd8523b8f52a5737e29bb61b4 Mon Sep 17 00:00:00 2001 From: ayushaga14 Date: Fri, 14 Feb 2025 14:30:14 +0530 Subject: [PATCH 11/28] modify log file size --- run.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/run.sh b/run.sh index 192e9b7..dc0b436 100755 --- a/run.sh +++ b/run.sh @@ -1,7 +1,7 @@ #!/bin/sh LOG_FILE="/tmp/dump.log" -MAX_LOG_SIZE=${MAX_LOG_SIZE:-10480} # Default to 10 MB (10 * 1024 * 1024 bytes) +MAX_LOG_SIZE=${MAX_LOG_SIZE:-10485760} # Default to 10 MB (10 * 1024 * 1024 bytes) CHECK_INTERVAL=60 # Check interval in seconds # Function to rotate the log file From 762054f6106975cdf9257989c0d68e4d97bcbead Mon Sep 17 00:00:00 2001 From: ayushaga14 Date: Fri, 14 Feb 2025 16:03:31 +0530 Subject: [PATCH 12/28] modify ubuntu version --- .github/workflows/main.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 90cdad2..7128ecc 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -29,7 +29,7 @@ jobs: # This workflow contains a single job called "build" build: # The type of runner that the job will run on - runs-on: ubuntu-latest + runs-on: ubuntu-22.04 # Steps represent a sequence of tasks that will be executed as part of the job steps: From 0c9def8199b8cdbaa68b060c5c6acd5502bc5763 Mon Sep 17 00:00:00 2001 From: notshivansh Date: Tue, 1 Apr 2025 18:49:03 +0530 Subject: [PATCH 13/28] add dest IP --- main.go | 1 + 1 file changed, 1 insertion(+) diff --git a/main.go b/main.go index c2f4861..370f83c 100644 --- a/main.go +++ b/main.go @@ -328,6 +328,7 @@ func tryReadFromBD(bd *bidi, isPending bool) { "requestPayload": requestsContent[i], "responsePayload": responsesContent[i], "ip": bd.key.net.Src().String(), + 
"destIp": bd.key.net.Dst().String(), "time": fmt.Sprint(time.Now().Unix()), "statusCode": fmt.Sprint(resp.StatusCode), "type": string(req.Proto), From 569be3ca2f69b4b735bf3ffb86b215042e49ab3b Mon Sep 17 00:00:00 2001 From: gauravmann Date: Fri, 11 Apr 2025 15:47:05 +0530 Subject: [PATCH 14/28] Change header name to main.go --- main.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/main.go b/main.go index 52b924e..1186c58 100644 --- a/main.go +++ b/main.go @@ -283,7 +283,7 @@ func tryReadFromBD(bd *bidi, isPending bool) { for name, values := range req.Header { // Loop over all values for the name. for _, value := range values { - reqHeader[name] = &trafficpb.StringList{ + reqHeader[strings.ToLower(name)] = &trafficpb.StringList{ Values: []string{value}, } } @@ -327,7 +327,7 @@ func tryReadFromBD(bd *bidi, isPending bool) { for name, values := range resp.Header { // Loop over all values for the name. for _, value := range values { - respHeader[name] = &trafficpb.StringList{ + respHeader[strings.ToLower(name)] = &trafficpb.StringList{ Values: []string{value}, } } From 11cd5e8c67d8a5e36b7268f378941e928a9106c1 Mon Sep 17 00:00:00 2001 From: gauravmann Date: Fri, 11 Apr 2025 16:40:02 +0530 Subject: [PATCH 15/28] Read ip from request header --- kafka.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka.go b/kafka.go index 021fe1e..7c0419d 100644 --- a/kafka.go +++ b/kafka.go @@ -54,7 +54,7 @@ func Produce(kafkaWriter *kafka.Writer, ctx context.Context, value *trafficpb.Ht func GetSourceIp(value *trafficpb.HttpResponseParam) string { for _, header := range CLIENT_IP_HEADERS { - if headerValues, exists := value.ResponseHeaders[header]; exists { + if headerValues, exists := value.RequestHeaders[header]; exists { for _, value := range headerValues.Values { parts := strings.Split(value, ",") for _, part := range parts { From 59c8e7f3183756e56618191b0de63b76449aa8f0 Mon Sep 17 00:00:00 2001 From: gauravmann Date: Fri, 11 Apr 2025 17:23:28 +0530 Subject: [PATCH 16/28] Set correct value in source IP --- kafka.go | 21 ++++++++++----------- main.go | 3 ++- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/kafka.go b/kafka.go index 7c0419d..4c3ad90 100644 --- a/kafka.go +++ b/kafka.go @@ -30,17 +30,16 @@ func Produce(kafkaWriter *kafka.Writer, ctx context.Context, value *trafficpb.Ht return err } - ip := GetSourceIp(value) - fmt.Println("found ip ", ip) - fmt.Println("found ip bytes ", []byte(ip)) - if ip == "" { + fmt.Println("found ip ", value.Ip) + fmt.Println("found ip bytes ", []byte(value.Ip)) + if value.Ip == "" { fmt.Print("ip is empty, avoiding kafka push") return nil } // Send serialized message to Kafka msg := kafka.Message{ Topic: "akto.api.logs2", - Key: []byte(ip), // what to do when ip is empty? + Key: []byte(value.Ip), // what to do when ip is empty? 
Value: protoBytes, } @@ -51,12 +50,12 @@ func Produce(kafkaWriter *kafka.Writer, ctx context.Context, value *trafficpb.Ht return err } -func GetSourceIp(value *trafficpb.HttpResponseParam) string { +func GetSourceIp(reqHeaders map[string]*trafficpb.StringList, packetIp string) string { for _, header := range CLIENT_IP_HEADERS { - if headerValues, exists := value.RequestHeaders[header]; exists { - for _, value := range headerValues.Values { - parts := strings.Split(value, ",") + if headerValues, exists := reqHeaders[header]; exists { + for _, headerValue := range headerValues.Values { + parts := strings.Split(headerValue, ",") for _, part := range parts { ip := strings.TrimSpace(part) if ip != "" { @@ -70,8 +69,8 @@ func GetSourceIp(value *trafficpb.HttpResponseParam) string { } // if no headers found - fmt.Println("ip not found in headers, returning value ", value.Ip) - return value.Ip + fmt.Println("ip not found in headers, returning packet value ", packetIp) + return packetIp } func ProduceStr(kafkaWriter *kafka.Writer, ctx context.Context, message string) error { diff --git a/main.go b/main.go index 1186c58..7e85828 100644 --- a/main.go +++ b/main.go @@ -288,6 +288,7 @@ func tryReadFromBD(bd *bidi, isPending bool) { } } } + ip := GetSourceIp(reqHeader, bd.key.net.Src().String()) reqHeader["host"] = &trafficpb.StringList{ Values: []string{req.Host}, @@ -350,7 +351,7 @@ func tryReadFromBD(bd *bidi, isPending bool) { ResponseHeaders: respHeader, RequestPayload: requestsContent[i], ResponsePayload: responsesContent[i], - Ip: bd.key.net.Src().String(), + Ip: ip, Time: int32(time.Now().Unix()), StatusCode: int32(resp.StatusCode), Type: string(req.Proto), From 3faddf914cc5505f6fcd8590bb4393f4dfad581b Mon Sep 17 00:00:00 2001 From: gauravmann Date: Wed, 16 Apr 2025 16:20:47 +0530 Subject: [PATCH 17/28] Add compression --- kafka.go | 1 + 1 file changed, 1 insertion(+) diff --git a/kafka.go b/kafka.go index 4c3ad90..fd18339 100644 --- a/kafka.go +++ b/kafka.go @@ -99,6 +99,7 @@ func GetKafkaWriter(kafkaURL, topic string, batchSize int, batchTimeout time.Dur ReadTimeout: batchTimeout, WriteTimeout: batchTimeout, Balancer: &kafka.Hash{}, + Compression: kafka.Zstd, } } From e5cada9bbaff4d5e8f49c9a0828f393df1fa1118 Mon Sep 17 00:00:00 2001 From: gauravakto Date: Wed, 16 Apr 2025 16:23:20 +0530 Subject: [PATCH 18/28] Update main.yml --- .github/workflows/main.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 4d481d9..3a38ab0 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -68,7 +68,7 @@ jobs: # Build a docker container and push it to DockerHub docker buildx create --use echo "Building and Pushing image to ECR..." - docker buildx build --platform linux/arm64/v8,linux/amd64,linux/arm/v7 -t $ECR_REGISTRY/$REGISTRY_ALIAS/mirror-api-logging:$IMAGE_TAG . --push + docker buildx build --platform linux/arm64/v8,linux/amd64 -t $ECR_REGISTRY/$REGISTRY_ALIAS/mirror-api-logging:$IMAGE_TAG . --push echo "::set-output name=image::$ECR_REGISTRY/$REGISTRY_ALIAS/mirror-api-logging:$IMAGE_TAG" - name: DockerHub login @@ -89,6 +89,6 @@ jobs: # Build a docker container and push it to DockerHub docker buildx create --use echo "Building and Pushing image to DockerHub..." - docker buildx build --platform linux/arm64/v8,linux/amd64,linux/arm/v7 -t $ECR_REGISTRY/mirror-api-logging:$IMAGE_TAG . --push + docker buildx build --platform linux/arm64/v8,linux/amd64 -t $ECR_REGISTRY/mirror-api-logging:$IMAGE_TAG . 
--push echo "::set-output name=image::$ECR_REGISTRY/mirror-api-logging:$IMAGE_TAG" From 88ad68fd6fabd19b4ba524a9b5527d87cc8ac2cf Mon Sep 17 00:00:00 2001 From: gauravakto Date: Tue, 22 Apr 2025 13:51:00 +0530 Subject: [PATCH 19/28] Update kafka.go --- kafka.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/kafka.go b/kafka.go index fd18339..be759f1 100644 --- a/kafka.go +++ b/kafka.go @@ -29,9 +29,7 @@ func Produce(kafkaWriter *kafka.Writer, ctx context.Context, value *trafficpb.Ht log.Println("Failed to serialize protobuf message: ", err) return err } - - fmt.Println("found ip ", value.Ip) - fmt.Println("found ip bytes ", []byte(value.Ip)) + if value.Ip == "" { fmt.Print("ip is empty, avoiding kafka push") return nil From a892f45905abe9a71ed8fd489b3fede6240c732a Mon Sep 17 00:00:00 2001 From: gauravmann Date: Wed, 23 Apr 2025 16:10:48 +0530 Subject: [PATCH 20/28] Remove commented file kafka_producer --- kafka_producer.go | 121 ---------------------------------------------- 1 file changed, 121 deletions(-) delete mode 100644 kafka_producer.go diff --git a/kafka_producer.go b/kafka_producer.go deleted file mode 100644 index 8d0dc01..0000000 --- a/kafka_producer.go +++ /dev/null @@ -1,121 +0,0 @@ -package main - -// -//import ( -// "context" -// trafficpb "github.com/akto-api-security/mirroring-api-logging/protobuf/traffic_payload" -// "log" -// "time" -// -// "google.golang.org/protobuf/proto" -//) -// -//// Produce serializes protobuf messages and sends to Kafka -//func Produce(kafkaWriter *kafka.Writer, ctx context.Context, value *trafficpb.HttpResponseParam) error { -// -// // Serialize to protobuf -// protoBytes, err := proto.Marshal(value) -// if err != nil { -// log.Println("Failed to serialize protobuf message: ", err) -// return -// } -// -// // Send serialized message to Kafka -// msg := kafka.Message{ -// Key: []byte("testkey"), -// Value: protoBytes, -// } -// -// err = kafkaWriter.WriteMessages(ctx, msg) -// if err != nil { -// log.Println("ERROR while writing messages: ", err) -// } -// return err -//} -// -//// GetKafkaWriter initializes Kafka writer -//func GetKafkaWriter(kafkaURL, topic string, batchSize int, batchTimeout time.Duration) *kafka.Writer { -// return &kafka.Writer{ -// Addr: kafka.TCP(kafkaURL), -// Topic: topic, -// BatchSize: batchSize, -// BatchTimeout: batchTimeout, -// } -//} -// -////func main() { -//// kafkaURL := "192.168.1.11:29092" -//// topic := "akto.api.logs" -//// writer := GetKafkaWriter(kafkaURL, topic, 10, 10*time.Second) -//// defer writer.Close() -//// -//// ctx := context.Background() -//// reqHeader := make(map[string]*trafficpb.StringList) -//// reqHeader["key"] = &trafficpb.StringList{ -//// Values: []string{"val1"}, -//// } -//// respHeader := make(map[string]*trafficpb.StringList) -//// reqHeader["key"] = &trafficpb.StringList{ -//// Values: []string{"val1"}, -//// } -//// payload := &trafficpb.HttpResponseParam{ -//// Method: "Method", -//// Path: "https://example.com", -//// RequestHeaders: reqHeader, -//// ResponseHeaders: respHeader, -//// RequestPayload: "payload1", -//// ResponsePayload: "payload2", -//// Ip: "172.31.4.22", -//// Time: int32(time.Now().Unix()), -//// StatusCode: 200, -//// Type: "type1", -//// Status: "ok", -//// AktoAccountId: fmt.Sprint(1000000), -//// AktoVxlanId: "1000000", -//// IsPending: false, -//// } -//// Produce(writer, ctx, payload) -////} -// -//// -////func main() { -//// kafkaURL := "192.168.1.11:29092" -//// topic := "akto.api.logs" -//// writer := GetKafkaWriter(kafkaURL, 
topic, 10, 10*time.Second) -//// defer writer.Close() -//// -//// ctx := context.Background() -//// reqHeader := make(map[string]*trafficpb.StringList) -//// reqHeader["key"] = &trafficpb.StringList{ -//// Values: []string{"val1"}, -//// } -//// respHeader := make(map[string]*trafficpb.StringList) -//// reqHeader["key"] = &trafficpb.StringList{ -//// Values: []string{"val1"}, -//// } -//// payload := &trafficpb.HttpResponseParam{ -//// Method: "Method", -//// Path: "https://example.com", -//// RequestHeaders: reqHeader, -//// ResponseHeaders: respHeader, -//// RequestPayload: "payload1", -//// ResponsePayload: "payload2", -//// Ip: "172.31.4.22", -//// Time: int32(time.Now().Unix()), -//// StatusCode: 422, -//// Type: "type1", -//// Status: "ok", -//// AktoAccountId: fmt.Sprint(1000000), -//// AktoVxlanId: "1000000", -//// IsPending: false, -//// } -//// Produce(writer, ctx, payload) -//// Produce(writer, ctx, payload) -//// Produce(writer, ctx, payload) -//// Produce(writer, ctx, payload) -//// //if handle, err := pcap.OpenLive("eth0", 33554392, true, pcap.BlockForever); err != nil { -//// // log.Fatal(err) -//// //} else { -//// // run(handle, -1) -//// //} -////} From 0d61d27f80d501305765f73758e876c417e56fc4 Mon Sep 17 00:00:00 2001 From: gauravmann Date: Wed, 23 Apr 2025 16:54:59 +0530 Subject: [PATCH 21/28] Add log levels --- kafka.go | 35 +++++++++++++++++------------------ 1 file changed, 17 insertions(+), 18 deletions(-) diff --git a/kafka.go b/kafka.go index be759f1..f002b81 100644 --- a/kafka.go +++ b/kafka.go @@ -3,11 +3,10 @@ package main import ( "context" "encoding/json" - "fmt" trafficpb "github.com/akto-api-security/mirroring-api-logging/protobuf/traffic_payload" "github.com/segmentio/kafka-go" "google.golang.org/protobuf/proto" - "log" + "log/slog" "strings" "time" ) @@ -26,24 +25,25 @@ func Produce(kafkaWriter *kafka.Writer, ctx context.Context, value *trafficpb.Ht // intialize the writer with the broker addresses, and the topic protoBytes, err := proto.Marshal(value) if err != nil { - log.Println("Failed to serialize protobuf message: ", err) + slog.Error("Failed to serialize protobuf message", "error=", err) return err } if value.Ip == "" { - fmt.Print("ip is empty, avoiding kafka push") + slog.Warn("ip is empty, avoiding kafka push") return nil } // Send serialized message to Kafka + topic := "akto.api.logs2" msg := kafka.Message{ - Topic: "akto.api.logs2", + Topic: topic, Key: []byte(value.Ip), // what to do when ip is empty? 
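		// log/slog takes alternating key/value arguments after the message,
		// and the default text output renders each pair as key=value. Keys
		// spelled "error=" or "topic=" therefore print as "error==...",
		// which is why PATCH 22 trims them to plain "error" / "topic".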
Value: protoBytes, } err = kafkaWriter.WriteMessages(ctx, msg) if err != nil { - log.Println("ERROR while writing messages: ", err) + slog.Error("Kafka write for runtime failed", "topic=", topic, "error=", err) } return err } @@ -57,8 +57,7 @@ func GetSourceIp(reqHeaders map[string]*trafficpb.StringList, packetIp string) s for _, part := range parts { ip := strings.TrimSpace(part) if ip != "" { - fmt.Println("found value of ip in header ", header) - fmt.Println("returning ip ", ip) + slog.Debug("Ip found in", "the header=", header) return ip } } @@ -66,21 +65,21 @@ func GetSourceIp(reqHeaders map[string]*trafficpb.StringList, packetIp string) s } } - // if no headers found - fmt.Println("ip not found in headers, returning packet value ", packetIp) + slog.Debug("No ip found in headers returning", "packetIp=", packetIp) return packetIp } func ProduceStr(kafkaWriter *kafka.Writer, ctx context.Context, message string) error { // intialize the writer with the broker addresses, and the topic + topic := "akto.api.logs" msg := kafka.Message{ - Topic: "akto.api.logs", + Topic: topic, Value: []byte(message), } err := kafkaWriter.WriteMessages(ctx, msg) if err != nil { - log.Println("ERROR while writing messages: ", err) + slog.Error("Kafka write for runtime failed", "topic=", topic, "error=", err) return err } @@ -117,7 +116,7 @@ func GetCredential(kafkaURL string, groupID string, topic string) Credential { defer func(r *kafka.Reader) { err := r.Close() if err != nil { - log.Printf("could not close reader: %v", err) + slog.Error("could not close kafka reader", "error=", err) } }(r) @@ -127,25 +126,25 @@ func GetCredential(kafkaURL string, groupID string, topic string) Credential { for { select { case <-ctx.Done(): - log.Println("Timeout reached, no message received.") + slog.Error("Timeout reached, no message received.") return msg // Return empty Credential if timeout occurs default: // Attempt to read a message from the Kafka topic m, err := r.ReadMessage(ctx) if err != nil { if err == context.DeadlineExceeded { - log.Println("Timeout reached, no message received.") + slog.Error("Timeout reached, no message received.") return msg } - log.Printf("could not read message: %v", err) + slog.Error("Kafka Read failed for", "topic=", topic, "error=", err) return msg // Return empty Credential on read error } - log.Println("Found message: " + string(m.Value)) + slog.Debug("Found message: " + string(m.Value)) err = json.Unmarshal(m.Value, &msg) if err != nil { - log.Printf("could not unmarshal message: %v", err) + slog.Error("could not unmarshal kafka message", "error=", err) return msg // Return empty Credential on unmarshal error } From b4702afe1dd1fb75c812736a226f08f5b1ce9b90 Mon Sep 17 00:00:00 2001 From: gauravmann Date: Wed, 23 Apr 2025 17:22:20 +0530 Subject: [PATCH 22/28] Log formating improve --- kafka.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/kafka.go b/kafka.go index f002b81..1cc8b14 100644 --- a/kafka.go +++ b/kafka.go @@ -25,7 +25,7 @@ func Produce(kafkaWriter *kafka.Writer, ctx context.Context, value *trafficpb.Ht // intialize the writer with the broker addresses, and the topic protoBytes, err := proto.Marshal(value) if err != nil { - slog.Error("Failed to serialize protobuf message", "error=", err) + slog.Error("Failed to serialize protobuf message", "error", err) return err } @@ -43,7 +43,7 @@ func Produce(kafkaWriter *kafka.Writer, ctx context.Context, value *trafficpb.Ht err = kafkaWriter.WriteMessages(ctx, msg) if err != nil { - 
slog.Error("Kafka write for runtime failed", "topic=", topic, "error=", err) + slog.Error("Kafka write for runtime failed", "topic", topic, "error", err) } return err } @@ -57,7 +57,7 @@ func GetSourceIp(reqHeaders map[string]*trafficpb.StringList, packetIp string) s for _, part := range parts { ip := strings.TrimSpace(part) if ip != "" { - slog.Debug("Ip found in", "the header=", header) + slog.Debug("Ip found in", "the header", header) return ip } } @@ -65,7 +65,7 @@ func GetSourceIp(reqHeaders map[string]*trafficpb.StringList, packetIp string) s } } - slog.Debug("No ip found in headers returning", "packetIp=", packetIp) + slog.Debug("No ip found in headers returning", "packetIp", packetIp) return packetIp } @@ -79,7 +79,7 @@ func ProduceStr(kafkaWriter *kafka.Writer, ctx context.Context, message string) err := kafkaWriter.WriteMessages(ctx, msg) if err != nil { - slog.Error("Kafka write for runtime failed", "topic=", topic, "error=", err) + slog.Error("Kafka write for runtime failed", "topic", topic, "error", err) return err } @@ -116,7 +116,7 @@ func GetCredential(kafkaURL string, groupID string, topic string) Credential { defer func(r *kafka.Reader) { err := r.Close() if err != nil { - slog.Error("could not close kafka reader", "error=", err) + slog.Error("could not close kafka reader", "error", err) } }(r) @@ -136,7 +136,7 @@ func GetCredential(kafkaURL string, groupID string, topic string) Credential { slog.Error("Timeout reached, no message received.") return msg } - slog.Error("Kafka Read failed for", "topic=", topic, "error=", err) + slog.Error("Kafka Read failed for", "topic", topic, "error", err) return msg // Return empty Credential on read error } @@ -144,7 +144,7 @@ func GetCredential(kafkaURL string, groupID string, topic string) Credential { err = json.Unmarshal(m.Value, &msg) if err != nil { - slog.Error("could not unmarshal kafka message", "error=", err) + slog.Error("could not unmarshal kafka message", "error", err) return msg // Return empty Credential on unmarshal error } From 5b669eac9aca24a2ead716a24b2062186dbef4ad Mon Sep 17 00:00:00 2001 From: notshivansh Date: Wed, 23 Apr 2025 18:04:09 +0530 Subject: [PATCH 23/28] add tcpdump to image --- Dockerfile | 1 + 1 file changed, 1 insertion(+) diff --git a/Dockerfile b/Dockerfile index 1ac66d8..2e13986 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,6 +1,7 @@ FROM golang:1.22-alpine RUN apk add build-base RUN apk add libpcap-dev +RUN apk add tcpdump WORKDIR /app From ac2736e56630ddfb98a41d6922069c66d6781ccd Mon Sep 17 00:00:00 2001 From: notshivansh Date: Wed, 23 Apr 2025 18:15:21 +0530 Subject: [PATCH 24/28] add destination IP --- main.go | 1 + 1 file changed, 1 insertion(+) diff --git a/main.go b/main.go index f3de8b4..79a7d89 100644 --- a/main.go +++ b/main.go @@ -388,6 +388,7 @@ func tryParseAsHttp2Request(bd *bidi, isPending bool) { value["type"] = "HTTP/2" value["ip"] = bd.key.net.Src().String() + value["destIp"] = bd.key.net.Dst().String() value["akto_account_id"] = fmt.Sprint(1000000) value["akto_vxlan_id"] = fmt.Sprint(bd.vxlanID) value["time"] = fmt.Sprint(time.Now().Unix()) From c2321b35874300c5f6840f40e942304873add29c Mon Sep 17 00:00:00 2001 From: notshivansh Date: Wed, 23 Apr 2025 19:07:49 +0530 Subject: [PATCH 25/28] add detected log --- main.go | 1 + 1 file changed, 1 insertion(+) diff --git a/main.go b/main.go index 79a7d89..db13f90 100644 --- a/main.go +++ b/main.go @@ -410,6 +410,7 @@ func tryParseAsHttp2Request(bd *bidi, isPending bool) { func tryReadFromBD(bd *bidi, isPending bool) { if 
len(bd.a.bytes) > 24 && string(bd.a.bytes[0:24]) == "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" { bd.a.bytes = bd.a.bytes[24:] + log.Println("grpc request detected") tryParseAsHttp2Request(bd, isPending) return } From 89b3b5da366c93c4d17f45d195ccd7fa6c071f49 Mon Sep 17 00:00:00 2001 From: gauravmann Date: Wed, 23 Apr 2025 22:36:23 +0530 Subject: [PATCH 26/28] Add flag for threat --- main.go | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/main.go b/main.go index 7e85828..518704a 100644 --- a/main.go +++ b/main.go @@ -60,6 +60,7 @@ var filterHeaderValueMap = make(map[string]string) var ignoreCloudMetadataCalls = false var ignoreIpTraffic = false +var threatEnabled = false var ( handle *pcap.Handle err error @@ -406,7 +407,9 @@ func tryReadFromBD(bd *bidi, isPending bool) { //printLog("req-resp.String() " + string(out)) // insert kafka record for runtime - go ProduceStr(kafkaWriter, ctx, string(out)) + if threatEnabled { + go ProduceStr(kafkaWriter, ctx, string(out)) + } // insert kafka record for threat client go Produce(kafkaWriter, ctx, payload) @@ -852,6 +855,14 @@ func main() { log.Println("ignoreIpTraffic: missing. defaulting to false") } + threatEnabledVar := os.Getenv("AKTO_THREAT_ENABLED") + if len(threatEnabledVar) > 0 { + threatEnabled = strings.ToLower(threatEnabledVar) == "true" + log.Println("threatEnabled: ", threatEnabled) + } else { + log.Println("threatEnabled: missing. defaulting to false") + } + ignoreCloudMetadataCallsVar := os.Getenv("AKTO_IGNORE_CLOUD_METADATA_CALLS") if len(ignoreCloudMetadataCallsVar) > 0 { ignoreCloudMetadataCalls = strings.ToLower(ignoreCloudMetadataCallsVar) == "true" From 875f4c960ab33d36786ac79fd2da707c34cd186f Mon Sep 17 00:00:00 2001 From: gauravmann Date: Thu, 24 Apr 2025 16:38:07 +0530 Subject: [PATCH 27/28] use produceStr for http2 --- main.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/main.go b/main.go index e466a98..e4408fb 100644 --- a/main.go +++ b/main.go @@ -30,6 +30,7 @@ import ( "strings" "sync" "time" + "net" "github.com/google/uuid" @@ -399,7 +400,7 @@ func tryParseAsHttp2Request(bd *bidi, isPending bool) { printCounter-- log.Println("req-resp.String()", string(out)) } - go Produce(kafkaWriter, ctx, string(out)) + go ProduceStr(kafkaWriter, ctx, string(out)) } } From a8dd85f7e2fc07aa8bc38404600f0ef480584806 Mon Sep 17 00:00:00 2001 From: gauravmann Date: Fri, 25 Apr 2025 17:56:17 +0530 Subject: [PATCH 28/28] Fix threat disable flag --- kafka.go | 2 +- main.go | 7 ++++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/kafka.go b/kafka.go index 1cc8b14..2c29838 100644 --- a/kafka.go +++ b/kafka.go @@ -43,7 +43,7 @@ func Produce(kafkaWriter *kafka.Writer, ctx context.Context, value *trafficpb.Ht err = kafkaWriter.WriteMessages(ctx, msg) if err != nil { - slog.Error("Kafka write for runtime failed", "topic", topic, "error", err) + slog.Error("Kafka write for threat failed", "topic", topic, "error", err) } return err } diff --git a/main.go b/main.go index e4408fb..9b4cf5e 100644 --- a/main.go +++ b/main.go @@ -631,11 +631,12 @@ func tryReadFromBD(bd *bidi, isPending bool) { //printLog("req-resp.String() " + string(out)) // insert kafka record for runtime if threatEnabled { - go ProduceStr(kafkaWriter, ctx, string(out)) + // insert kafka record for threat client + go Produce(kafkaWriter, ctx, payload) } - // insert kafka record for threat client - go Produce(kafkaWriter, ctx, payload) + // Todo convert to protobuf + go ProduceStr(kafkaWriter, ctx, string(out)) i++ } }
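Taken together, PATCHes 26-28 leave the produce path as follows: the shared writer (hash-partitioned since PATCH 09, Zstd-compressed since PATCH 17) sends the protobuf record to the threat topic "akto.api.logs2" only when AKTO_THREAT_ENABLED=true, and always sends the JSON record to the runtime topic "akto.api.logs". PATCH 26 had gated the runtime call by mistake; PATCH 28 swaps the two. The client-IP resolution that feeds the partition key can be restated as a self-contained sketch (resolveClientIP and the plain map type are illustrative; the series' real GetSourceIp works over the protobuf StringList values):

package main

import (
	"fmt"
	"strings"
)

var clientIPHeaders = []string{
	"x-forwarded-for", "x-real-ip", "x-cluster-client-ip",
	"true-client-ip", "x-original-forwarded-for", "x-client-ip", "client-ip",
}

// resolveClientIP mirrors GetSourceIp: scan the trusted headers in priority
// order, take the first non-empty comma-separated entry (the leftmost
// X-Forwarded-For hop is the original client), and otherwise fall back to
// the packet-level source address.
func resolveClientIP(reqHeaders map[string][]string, packetIP string) string {
	for _, name := range clientIPHeaders {
		for _, headerValue := range reqHeaders[name] {
			for _, part := range strings.Split(headerValue, ",") {
				if ip := strings.TrimSpace(part); ip != "" {
					return ip
				}
			}
		}
	}
	return packetIP
}

func main() {
	headers := map[string][]string{
		"x-forwarded-for": {"203.0.113.7, 10.0.0.2"},
	}
	fmt.Println(resolveClientIP(headers, "10.0.0.9")) // -> 203.0.113.7
	fmt.Println(resolveClientIP(nil, "10.0.0.9"))     // -> 10.0.0.9
}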