use ip as partition key #69

Open: wants to merge 35 commits into base branch modify_kafka_push_payload_test
Changes from all commits (35 commits)
00369dc
enabling http2 and grpc in mirroring for filter_headers
shivam-rawat-akto Jun 7, 2024
1342878
bug fixes and handling the case when "path" is empty
shivam-rawat-akto Jun 7, 2024
36dcf8a
bug fixes and handling the case when "path" is empty
shivam-rawat-akto Jun 7, 2024
702e7a4
upgrade go version to 1.22
notshivansh Jan 13, 2025
a02445d
Merge pull request #68 from akto-api-security/feature/upgrade_go
notshivansh Jan 13, 2025
e43434f
use ip as partition key
ayushaga14 Jan 14, 2025
5a9ce9b
avoid kafka push if ip is empty
ayushaga14 Jan 14, 2025
e9d614b
add logs
ayushaga14 Jan 15, 2025
ea62076
add logs
ayushaga14 Jan 15, 2025
e0aae2b
add hash key balancer in kafkawriter init
ayushaga14 Jan 15, 2025
2da42d5
modify run sh script
ayushaga14 Feb 14, 2025
8e87603
modify log file size
ayushaga14 Feb 14, 2025
762054f
modify ubuntu version
ayushaga14 Feb 14, 2025
1c64ee8
Merge pull request #71 from akto-api-security/add_enable_log_flag
ayushaga14 Feb 14, 2025
0c9def8
add dest IP
notshivansh Apr 1, 2025
569be3c
Change header name to main.go
gauravakto Apr 11, 2025
11cd5e8
Read ip from request header
gauravakto Apr 11, 2025
59c8e7f
Set correct value in source IP
gauravakto Apr 11, 2025
3faddf9
Add compression
gauravakto Apr 16, 2025
e5cada9
Update main.yml
gauravakto Apr 16, 2025
88ad68f
Update kafka.go
gauravakto Apr 22, 2025
06393cc
Merge pull request #77 from akto-api-security/feature/header-lower-case
notshivansh Apr 23, 2025
a892f45
Remove commented file kafka_producer
gauravakto Apr 23, 2025
0d61d27
Add log levels
gauravakto Apr 23, 2025
b4702af
Log formating improve
gauravakto Apr 23, 2025
d8d3b96
Merge branch 'feature/filter_header' into feature/http2_grpc_handling…
notshivansh Apr 23, 2025
e92ee84
Merge pull request #82 from akto-api-security/feature/http2_grpc_hand…
shivam-rawat-akto Apr 23, 2025
5b669ea
add tcpdump to image
notshivansh Apr 23, 2025
ac2736e
add destination IP
notshivansh Apr 23, 2025
c2321b3
add detected log
notshivansh Apr 23, 2025
89b3b5d
Add flag for threat
gauravakto Apr 23, 2025
3c6b70a
Merge pull request #47 from akto-api-security/feature/http2_grpc_hand…
notshivansh Apr 24, 2025
e8085ae
Merge branch 'feature/filter_header' into use_ip_partition_key
gauravakto Apr 24, 2025
875f4c9
use produceStr for http2
gauravakto Apr 24, 2025
a8dd85f
Fix threat disable flag
gauravakto Apr 25, 2025
8 changes: 4 additions & 4 deletions .github/workflows/main.yml
@@ -29,15 +29,15 @@ jobs:
# This workflow contains a single job called "build"
build:
# The type of runner that the job will run on
runs-on: ubuntu-latest
runs-on: ubuntu-22.04

# Steps represent a sequence of tasks that will be executed as part of the job
steps:
# Checks-out your repository under $GITHUB_WORKSPACE, so your job can access it
- uses: actions/checkout@v2
- uses: actions/setup-go@v2
with:
go-version: '^1.16.1' # The Go version to download (if necessary) and use.
go-version: '^1.22' # The Go version to download (if necessary) and use.
- name: install required packages
run: sudo apt install libpcap-dev expect
- run: go build -o ./mirroring-api-logging
@@ -68,7 +68,7 @@ jobs:
# Build a docker container and push it to DockerHub
docker buildx create --use
echo "Building and Pushing image to ECR..."
docker buildx build --platform linux/arm64/v8,linux/amd64,linux/arm/v7 -t $ECR_REGISTRY/$REGISTRY_ALIAS/mirror-api-logging:$IMAGE_TAG . --push
docker buildx build --platform linux/arm64/v8,linux/amd64 -t $ECR_REGISTRY/$REGISTRY_ALIAS/mirror-api-logging:$IMAGE_TAG . --push
echo "::set-output name=image::$ECR_REGISTRY/$REGISTRY_ALIAS/mirror-api-logging:$IMAGE_TAG"

- name: DockerHub login
Expand All @@ -89,6 +89,6 @@ jobs:
# Build a docker container and push it to DockerHub
docker buildx create --use
echo "Building and Pushing image to DockerHub..."
docker buildx build --platform linux/arm64/v8,linux/amd64,linux/arm/v7 -t $ECR_REGISTRY/mirror-api-logging:$IMAGE_TAG . --push
docker buildx build --platform linux/arm64/v8,linux/amd64 -t $ECR_REGISTRY/mirror-api-logging:$IMAGE_TAG . --push
echo "::set-output name=image::$ECR_REGISTRY/mirror-api-logging:$IMAGE_TAG"

1 change: 1 addition & 0 deletions Dockerfile
@@ -1,6 +1,7 @@
FROM golang:1.22-alpine
RUN apk add build-base
RUN apk add libpcap-dev
RUN apk add tcpdump

WORKDIR /app

2 changes: 1 addition & 1 deletion go.mod
@@ -7,6 +7,7 @@ require (
github.com/google/uuid v1.6.0
github.com/segmentio/kafka-go v0.4.25
go.mongodb.org/mongo-driver v1.11.3
golang.org/x/net v0.30.0
google.golang.org/protobuf v1.36.1
)

Expand All @@ -23,7 +24,6 @@ require (
github.com/xdg-go/stringprep v1.0.4 // indirect
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect
golang.org/x/crypto v0.28.0 // indirect
golang.org/x/net v0.30.0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/sys v0.26.0 // indirect
golang.org/x/text v0.19.0 // indirect
68 changes: 54 additions & 14 deletions kafka.go
@@ -6,42 +6,80 @@ import (
trafficpb "github.com/akto-api-security/mirroring-api-logging/protobuf/traffic_payload"
"github.com/segmentio/kafka-go"
"google.golang.org/protobuf/proto"
"log"
"log/slog"
"strings"
"time"
)

var CLIENT_IP_HEADERS = []string{
"x-forwarded-for",
"x-real-ip",
"x-cluster-client-ip",
"true-client-ip",
"x-original-forwarded-for",
"x-client-ip",
"client-ip",
}

func Produce(kafkaWriter *kafka.Writer, ctx context.Context, value *trafficpb.HttpResponseParam) error {
// serialize the message and push it to the already-initialized writer
protoBytes, err := proto.Marshal(value)
if err != nil {
log.Println("Failed to serialize protobuf message: ", err)
slog.Error("Failed to serialize protobuf message", "error", err)
return err
}


if value.Ip == "" {
slog.Warn("ip is empty, avoiding kafka push")
return nil
}
// Send serialized message to Kafka
topic := "akto.api.logs2"
msg := kafka.Message{
Topic: "akto.api.logs2",
Key: []byte("testkey"),
Topic: topic,
Key:   []byte(value.Ip), // empty IPs are filtered out above
Value: protoBytes,
}

err = kafkaWriter.WriteMessages(ctx, msg)
if err != nil {
log.Println("ERROR while writing messages: ", err)
slog.Error("Kafka write for threat failed", "topic", topic, "error", err)
}
return err
}

func GetSourceIp(reqHeaders map[string]*trafficpb.StringList, packetIp string) string {

for _, header := range CLIENT_IP_HEADERS {
if headerValues, exists := reqHeaders[header]; exists {
for _, headerValue := range headerValues.Values {
parts := strings.Split(headerValue, ",")
for _, part := range parts {
ip := strings.TrimSpace(part)
if ip != "" {
slog.Debug("Client IP found in header", "header", header)
return ip
}
}
}
}
}

slog.Debug("No ip found in headers returning", "packetIp", packetIp)
return packetIp
}
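The lookup above walks the candidate headers in priority order and returns the first non-empty entry, since headers like x-forwarded-for can carry a comma-separated proxy chain in which the left-most value is the original client. A minimal standalone sketch of that logic (the function name firstClientIP and the plain map[string][]string header type are illustrative, not the PR's actual types):

```go
package main

import (
	"fmt"
	"strings"
)

// Candidate headers commonly set by proxies and load balancers,
// checked in priority order.
var clientIPHeaders = []string{
	"x-forwarded-for",
	"x-real-ip",
	"x-client-ip",
}

// firstClientIP returns the first non-empty IP found in the candidate
// headers, falling back to the packet-level source IP. A header value
// may be a comma-separated chain ("client, proxy1, proxy2"); the
// left-most non-empty entry wins.
func firstClientIP(headers map[string][]string, packetIP string) string {
	for _, name := range clientIPHeaders {
		for _, value := range headers[name] {
			for _, part := range strings.Split(value, ",") {
				if ip := strings.TrimSpace(part); ip != "" {
					return ip
				}
			}
		}
	}
	return packetIP
}

func main() {
	h := map[string][]string{"x-forwarded-for": {"203.0.113.7, 10.0.0.1"}}
	fmt.Println(firstClientIP(h, "192.0.2.1")) // prints the client, not the proxy
	fmt.Println(firstClientIP(nil, "192.0.2.1"))
}
```

Note the empty-part check: a malformed header such as " , 203.0.113.7" still yields the first real IP rather than an empty string, which matters because Produce drops messages with an empty key.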

func ProduceStr(kafkaWriter *kafka.Writer, ctx context.Context, message string) error {
// push the raw string payload to the already-initialized writer
topic := "akto.api.logs"
msg := kafka.Message{
Topic: "akto.api.logs",
Topic: topic,
Value: []byte(message),
}
err := kafkaWriter.WriteMessages(ctx, msg)

if err != nil {
log.Println("ERROR while writing messages: ", err)
slog.Error("Kafka write for runtime failed", "topic", topic, "error", err)
return err
}

Expand All @@ -57,6 +95,8 @@ func GetKafkaWriter(kafkaURL, topic string, batchSize int, batchTimeout time.Dur
MaxAttempts: 1,
ReadTimeout: batchTimeout,
WriteTimeout: batchTimeout,
Balancer: &kafka.Hash{},
Compression: kafka.Zstd,
}
}
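Setting Balancer to kafka.Hash is what makes the IP key meaningful: the balancer hashes each message's key and maps it to a partition, so all traffic from one client IP lands on the same partition and keeps its ordering (Compression: kafka.Zstd is independent of this and just shrinks the payload on the wire). A rough standalone sketch of what a hash balancer does with the key, assuming FNV-1a-modulo-partition-count selection (the function pickPartition is illustrative and not kafka-go's actual implementation):

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// pickPartition sketches how a hash balancer can route a keyed message:
// hash the key (here, the client IP) with FNV-1a and reduce modulo the
// partition count. Equal keys always map to the same partition.
func pickPartition(key []byte, numPartitions int) int {
	h := fnv.New32a()
	h.Write(key) // hash the partition key
	p := int(h.Sum32()) % numPartitions
	if p < 0 { // defensive: int(uint32) may be negative on 32-bit ints
		p = -p
	}
	return p
}

func main() {
	// The same IP maps to the same partition on every call...
	fmt.Println(pickPartition([]byte("203.0.113.7"), 6) ==
		pickPartition([]byte("203.0.113.7"), 6)) // true
	// ...while different IPs spread across the partition range.
	fmt.Println(pickPartition([]byte("198.51.100.9"), 6))
}
```

This also explains the guard in Produce: with a hash balancer, every empty-key message would hash identically and pile onto a single partition, so skipping empty IPs avoids a hotspot.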

Expand All @@ -76,7 +116,7 @@ func GetCredential(kafkaURL string, groupID string, topic string) Credential {
defer func(r *kafka.Reader) {
err := r.Close()
if err != nil {
log.Printf("could not close reader: %v", err)
slog.Error("could not close kafka reader", "error", err)
}
}(r)

Expand All @@ -86,25 +126,25 @@ func GetCredential(kafkaURL string, groupID string, topic string) Credential {
for {
select {
case <-ctx.Done():
log.Println("Timeout reached, no message received.")
slog.Error("Timeout reached, no message received.")
return msg // Return empty Credential if timeout occurs
default:
// Attempt to read a message from the Kafka topic
m, err := r.ReadMessage(ctx)
if err != nil {
if err == context.DeadlineExceeded {
log.Println("Timeout reached, no message received.")
slog.Error("Timeout reached, no message received.")
return msg
}
log.Printf("could not read message: %v", err)
slog.Error("Kafka Read failed for", "topic", topic, "error", err)
return msg // Return empty Credential on read error
}

log.Println("Found message: " + string(m.Value))
slog.Debug("Found message: " + string(m.Value))

err = json.Unmarshal(m.Value, &msg)
if err != nil {
log.Printf("could not unmarshal message: %v", err)
slog.Error("could not unmarshal kafka message", "error", err)
return msg // Return empty Credential on unmarshal error
}

121 changes: 0 additions & 121 deletions kafka_producer.go

This file was deleted.
