diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml
index 4d481d9..63d46fa 100644
--- a/.github/workflows/main.yml
+++ b/.github/workflows/main.yml
@@ -29,7 +29,7 @@ jobs:
   # This workflow contains a single job called "build"
   build:
     # The type of runner that the job will run on
-    runs-on: ubuntu-latest
+    runs-on: ubuntu-22.04

     # Steps represent a sequence of tasks that will be executed as part of the job
     steps:
@@ -37,7 +37,7 @@ jobs:
       - uses: actions/checkout@v2
       - uses: actions/setup-go@v2
         with:
-          go-version: '^1.16.1' # The Go version to download (if necessary) and use.
+          go-version: '^1.22' # The Go version to download (if necessary) and use.
       - name: install required packages
         run: sudo apt install libpcap-dev expect
       - run: go build -o ./mirroring-api-logging
@@ -68,7 +68,7 @@ jobs:
           # Build a docker container and push it to DockerHub
           docker buildx create --use
           echo "Building and Pushing image to ECR..."
-          docker buildx build --platform linux/arm64/v8,linux/amd64,linux/arm/v7 -t $ECR_REGISTRY/$REGISTRY_ALIAS/mirror-api-logging:$IMAGE_TAG . --push
+          docker buildx build --platform linux/arm64/v8,linux/amd64 -t $ECR_REGISTRY/$REGISTRY_ALIAS/mirror-api-logging:$IMAGE_TAG . --push
           echo "::set-output name=image::$ECR_REGISTRY/$REGISTRY_ALIAS/mirror-api-logging:$IMAGE_TAG"

       - name: DockerHub login
@@ -89,6 +89,6 @@ jobs:

           # Build a docker container and push it to DockerHub
           docker buildx create --use
           echo "Building and Pushing image to DockerHub..."
-          docker buildx build --platform linux/arm64/v8,linux/amd64,linux/arm/v7 -t $ECR_REGISTRY/mirror-api-logging:$IMAGE_TAG . --push
+          docker buildx build --platform linux/arm64/v8,linux/amd64 -t $ECR_REGISTRY/mirror-api-logging:$IMAGE_TAG . --push
           echo "::set-output name=image::$ECR_REGISTRY/mirror-api-logging:$IMAGE_TAG"
diff --git a/Dockerfile b/Dockerfile
index 0e456e2..50f5909 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,6 +1,7 @@
 FROM golang:1.22-alpine
 RUN apk add build-base
 RUN apk add libpcap-dev
+RUN apk add tcpdump

 WORKDIR /app

diff --git a/go.mod b/go.mod
index 577e9a5..f55d4c8 100644
--- a/go.mod
+++ b/go.mod
@@ -7,6 +7,7 @@ require (
 	github.com/google/uuid v1.6.0
 	github.com/segmentio/kafka-go v0.4.25
 	go.mongodb.org/mongo-driver v1.11.3
+	golang.org/x/net v0.30.0
 	google.golang.org/protobuf v1.36.1
 )

@@ -23,7 +24,6 @@ require (
 	github.com/xdg-go/stringprep v1.0.4 // indirect
 	github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect
 	golang.org/x/crypto v0.28.0 // indirect
-	golang.org/x/net v0.30.0 // indirect
 	golang.org/x/sync v0.8.0 // indirect
 	golang.org/x/sys v0.26.0 // indirect
 	golang.org/x/text v0.19.0 // indirect
empty, avoiding kafka push") + return nil + } // Send serialized message to Kafka + topic := "akto.api.logs2" msg := kafka.Message{ - Topic: "akto.api.logs2", - Key: []byte("testkey"), + Topic: topic, + Key: []byte(value.Ip), // what to do when ip is empty? Value: protoBytes, } err = kafkaWriter.WriteMessages(ctx, msg) if err != nil { - log.Println("ERROR while writing messages: ", err) + slog.Error("Kafka write for threat failed", "topic", topic, "error", err) } return err } +func GetSourceIp(reqHeaders map[string]*trafficpb.StringList, packetIp string) string { + + for _, header := range CLIENT_IP_HEADERS { + if headerValues, exists := reqHeaders[header]; exists { + for _, headerValue := range headerValues.Values { + parts := strings.Split(headerValue, ",") + for _, part := range parts { + ip := strings.TrimSpace(part) + if ip != "" { + slog.Debug("Ip found in", "the header", header) + return ip + } + } + } + } + } + + slog.Debug("No ip found in headers returning", "packetIp", packetIp) + return packetIp +} + func ProduceStr(kafkaWriter *kafka.Writer, ctx context.Context, message string) error { // intialize the writer with the broker addresses, and the topic + topic := "akto.api.logs" msg := kafka.Message{ - Topic: "akto.api.logs", + Topic: topic, Value: []byte(message), } err := kafkaWriter.WriteMessages(ctx, msg) if err != nil { - log.Println("ERROR while writing messages: ", err) + slog.Error("Kafka write for runtime failed", "topic", topic, "error", err) return err } @@ -57,6 +95,8 @@ func GetKafkaWriter(kafkaURL, topic string, batchSize int, batchTimeout time.Dur MaxAttempts: 1, ReadTimeout: batchTimeout, WriteTimeout: batchTimeout, + Balancer: &kafka.Hash{}, + Compression: kafka.Zstd, } } @@ -76,7 +116,7 @@ func GetCredential(kafkaURL string, groupID string, topic string) Credential { defer func(r *kafka.Reader) { err := r.Close() if err != nil { - log.Printf("could not close reader: %v", err) + slog.Error("could not close kafka reader", "error", err) } }(r) @@ -86,25 +126,25 @@ func GetCredential(kafkaURL string, groupID string, topic string) Credential { for { select { case <-ctx.Done(): - log.Println("Timeout reached, no message received.") + slog.Error("Timeout reached, no message received.") return msg // Return empty Credential if timeout occurs default: // Attempt to read a message from the Kafka topic m, err := r.ReadMessage(ctx) if err != nil { if err == context.DeadlineExceeded { - log.Println("Timeout reached, no message received.") + slog.Error("Timeout reached, no message received.") return msg } - log.Printf("could not read message: %v", err) + slog.Error("Kafka Read failed for", "topic", topic, "error", err) return msg // Return empty Credential on read error } - log.Println("Found message: " + string(m.Value)) + slog.Debug("Found message: " + string(m.Value)) err = json.Unmarshal(m.Value, &msg) if err != nil { - log.Printf("could not unmarshal message: %v", err) + slog.Error("could not unmarshal kafka message", "error", err) return msg // Return empty Credential on unmarshal error } diff --git a/kafka_producer.go b/kafka_producer.go deleted file mode 100644 index 8d0dc01..0000000 --- a/kafka_producer.go +++ /dev/null @@ -1,121 +0,0 @@ -package main - -// -//import ( -// "context" -// trafficpb "github.com/akto-api-security/mirroring-api-logging/protobuf/traffic_payload" -// "log" -// "time" -// -// "google.golang.org/protobuf/proto" -//) -// -//// Produce serializes protobuf messages and sends to Kafka -//func Produce(kafkaWriter *kafka.Writer, ctx context.Context, 
diff --git a/kafka_producer.go b/kafka_producer.go
deleted file mode 100644
index 8d0dc01..0000000
--- a/kafka_producer.go
+++ /dev/null
@@ -1,121 +0,0 @@
-package main
-
-//
-//import (
-//	"context"
-//	trafficpb "github.com/akto-api-security/mirroring-api-logging/protobuf/traffic_payload"
-//	"log"
-//	"time"
-//
-//	"google.golang.org/protobuf/proto"
-//)
-//
-//// Produce serializes protobuf messages and sends to Kafka
-//func Produce(kafkaWriter *kafka.Writer, ctx context.Context, value *trafficpb.HttpResponseParam) error {
-//
-//	// Serialize to protobuf
-//	protoBytes, err := proto.Marshal(value)
-//	if err != nil {
-//		log.Println("Failed to serialize protobuf message: ", err)
-//		return
-//	}
-//
-//	// Send serialized message to Kafka
-//	msg := kafka.Message{
-//		Key:   []byte("testkey"),
-//		Value: protoBytes,
-//	}
-//
-//	err = kafkaWriter.WriteMessages(ctx, msg)
-//	if err != nil {
-//		log.Println("ERROR while writing messages: ", err)
-//	}
-//	return err
-//}
-//
-//// GetKafkaWriter initializes Kafka writer
-//func GetKafkaWriter(kafkaURL, topic string, batchSize int, batchTimeout time.Duration) *kafka.Writer {
-//	return &kafka.Writer{
-//		Addr:         kafka.TCP(kafkaURL),
-//		Topic:        topic,
-//		BatchSize:    batchSize,
-//		BatchTimeout: batchTimeout,
-//	}
-//}
-//
-////func main() {
-////	kafkaURL := "192.168.1.11:29092"
-////	topic := "akto.api.logs"
-////	writer := GetKafkaWriter(kafkaURL, topic, 10, 10*time.Second)
-////	defer writer.Close()
-////
-////	ctx := context.Background()
-////	reqHeader := make(map[string]*trafficpb.StringList)
-////	reqHeader["key"] = &trafficpb.StringList{
-////		Values: []string{"val1"},
-////	}
-////	respHeader := make(map[string]*trafficpb.StringList)
-////	reqHeader["key"] = &trafficpb.StringList{
-////		Values: []string{"val1"},
-////	}
-////	payload := &trafficpb.HttpResponseParam{
-////		Method:          "Method",
-////		Path:            "https://example.com",
-////		RequestHeaders:  reqHeader,
-////		ResponseHeaders: respHeader,
-////		RequestPayload:  "payload1",
-////		ResponsePayload: "payload2",
-////		Ip:              "172.31.4.22",
-////		Time:            int32(time.Now().Unix()),
-////		StatusCode:      200,
-////		Type:            "type1",
-////		Status:          "ok",
-////		AktoAccountId:   fmt.Sprint(1000000),
-////		AktoVxlanId:     "1000000",
-////		IsPending:       false,
-////	}
-////	Produce(writer, ctx, payload)
-////}
-//
-////
-////func main() {
-////	kafkaURL := "192.168.1.11:29092"
-////	topic := "akto.api.logs"
-////	writer := GetKafkaWriter(kafkaURL, topic, 10, 10*time.Second)
-////	defer writer.Close()
-////
-////	ctx := context.Background()
-////	reqHeader := make(map[string]*trafficpb.StringList)
-////	reqHeader["key"] = &trafficpb.StringList{
-////		Values: []string{"val1"},
-////	}
-////	respHeader := make(map[string]*trafficpb.StringList)
-////	reqHeader["key"] = &trafficpb.StringList{
-////		Values: []string{"val1"},
-////	}
-////	payload := &trafficpb.HttpResponseParam{
-////		Method:          "Method",
-////		Path:            "https://example.com",
-////		RequestHeaders:  reqHeader,
-////		ResponseHeaders: respHeader,
-////		RequestPayload:  "payload1",
-////		ResponsePayload: "payload2",
-////		Ip:              "172.31.4.22",
-////		Time:            int32(time.Now().Unix()),
-////		StatusCode:      422,
-////		Type:            "type1",
-////		Status:          "ok",
-////		AktoAccountId:   fmt.Sprint(1000000),
-////		AktoVxlanId:     "1000000",
-////		IsPending:       false,
-////	}
-////	Produce(writer, ctx, payload)
-////	Produce(writer, ctx, payload)
-////	Produce(writer, ctx, payload)
-////	Produce(writer, ctx, payload)
-////	//if handle, err := pcap.OpenLive("eth0", 33554392, true, pcap.BlockForever); err != nil {
-////	//	log.Fatal(err)
-////	//} else {
-////	//	run(handle, -1)
-////	//}
-////}
"github.com/google/gopacket/pcap" "github.com/google/gopacket/tcpassembly" "github.com/segmentio/kafka-go" - "net" + + "golang.org/x/net/http2" + "golang.org/x/net/http2/hpack" ) var printCounter = 500 @@ -60,6 +64,7 @@ var filterHeaderValueMap = make(map[string]string) var ignoreCloudMetadataCalls = false var ignoreIpTraffic = false +var threatEnabled = false var ( handle *pcap.Handle err error @@ -108,6 +113,16 @@ type myFactory struct { source string } +type http2ReqResp struct { + headersMap map[string]string + payload string + isInvalid bool +} + +func (k http2ReqResp) String() string { + return fmt.Sprintf("%v:%v", k.headersMap, k.payload) +} + // New handles creating a new tcpassembly.Stream. func (f *myFactory) New(netFlow, tcpFlow gopacket.Flow) tcpassembly.Stream { // Create a new stream. @@ -190,7 +205,215 @@ func checkIfIp(host string) bool { return net.ParseIP(chunks[0]) != nil } +func tryParseAsHttp2Request(bd *bidi, isPending bool) { + + streamRequestMap := make(map[string][]http2ReqResp) + framer := http2.NewFramer(nil, bytes.NewReader(bd.a.bytes)) + + headersMap := make(map[string]string) + payload := "" + + gotHeaders := make(map[string]bool) + gotPayload := make(map[string]bool) + decoder := hpack.NewDecoder(4096, func(hf hpack.HeaderField) { + + if len(hf.Name) > 0 { + headersMap[hf.Name] = hf.Value + } + }) + + for { + + frame, err := framer.ReadFrame() + + if err == io.EOF { + break + } + if err != nil { + continue + } + + streamId := fmt.Sprint(frame.Header().StreamID) + if len(streamId) == 0 { + continue + } + + if !gotHeaders[streamId] { + headersMap = make(map[string]string) + } + + switch f := frame.(type) { + case *http2.HeadersFrame: + _, err := decoder.Write(f.HeaderBlockFragment()) + gotHeaders[streamId] = true + if err != nil { + } + + case *http2.DataFrame: + if len(string(f.Data())) > 0 { + payload = base64.StdEncoding.EncodeToString(f.Data()) + gotPayload[streamId] = true + } + } + + if gotHeaders[streamId] && gotPayload[streamId] { + if _, exists := streamRequestMap[streamId]; !exists { + streamRequestMap[streamId] = []http2ReqResp{} + } + streamRequestMap[streamId] = append(streamRequestMap[streamId], http2ReqResp{ + headersMap: headersMap, + payload: payload, + }) + gotHeaders[streamId] = false + gotPayload[streamId] = false + } + } + + gotHeaders = make(map[string]bool) + gotPayload = make(map[string]bool) + gotGrpcHeaders := make(map[string]bool) + headersCount := make(map[string]int) + headersMap = make(map[string]string) + payload = "" + + streamResponseMap := make(map[string][]http2ReqResp) + framerResp := http2.NewFramer(nil, bytes.NewReader(bd.b.bytes)) + headersMap = make(map[string]string) + decoder = hpack.NewDecoder(4096, func(hf hpack.HeaderField) { + if len(hf.Name) > 0 { + headersMap[hf.Name] = hf.Value + } + }) + + for { + frame, err := framerResp.ReadFrame() + if err == io.EOF { + break + } + if err != nil { + continue + } + + streamId := fmt.Sprint(frame.Header().StreamID) + + if len(streamId) == 0 { + continue + } + if !(gotHeaders[streamId]) { + headersMap = make(map[string]string) + } + + switch f := frame.(type) { + case *http2.HeadersFrame: + _, err := decoder.Write(f.HeaderBlockFragment()) + if err != nil { + log.Printf("Error response decoding headers: %v", err) + } + if headersCount[streamId] == 0 { + if strings.Contains(headersMap["content-type"], "application/grpc") { + gotGrpcHeaders[streamId] = true + } + gotHeaders[streamId] = true + } + headersCount[streamId]++ + case *http2.DataFrame: + if len(string(f.Data())) > 0 { + 
+				payload = base64.StdEncoding.EncodeToString(f.Data())
+				gotPayload[streamId] = true
+			}
+		}
+		if gotHeaders[streamId] && gotPayload[streamId] {
+
+			if gotGrpcHeaders[streamId] && headersCount[streamId] == 1 {
+				continue
+			}
+
+			if _, exists := streamResponseMap[streamId]; !exists {
+				streamResponseMap[streamId] = []http2ReqResp{}
+			}
+			streamResponseMap[streamId] = append(streamResponseMap[streamId], http2ReqResp{
+				headersMap: headersMap,
+				payload:    payload,
+			})
+			gotPayload[streamId] = false
+			gotHeaders[streamId] = false
+			gotGrpcHeaders[streamId] = false
+			headersCount[streamId] = 0
+		}
+	}
+
+	for streamId, http2Req := range streamRequestMap {
+		http2Resp := streamResponseMap[streamId]
+		if len(http2Resp) != len(http2Req) {
+			continue
+		}
+		for req := range http2Req {
+
+			http2Request := http2Req[req]
+			http2Response := http2Resp[req]
+
+			value := make(map[string]string)
+
+			if path, exists := http2Request.headersMap[":path"]; exists {
+				value["path"] = path
+				delete(http2Request.headersMap, ":path")
+			} else {
+				continue
+			}
+			if method, exists := http2Request.headersMap[":method"]; exists {
+				value["method"] = method
+				delete(http2Request.headersMap, ":method")
+			}
+			if scheme, exists := http2Request.headersMap[":scheme"]; exists {
+				value["scheme"] = scheme
+				delete(http2Request.headersMap, ":scheme")
+			}
+			if status, exists := http2Response.headersMap[":status"]; exists {
+				value["statusCode"] = status
+				value["status"] = status
+				delete(http2Response.headersMap, ":status")
+			}
+			value["requestPayload"] = http2Request.payload
+			value["responsePayload"] = http2Response.payload
+
+			if len(http2Request.headersMap) > 0 {
+				requestHeaders, _ := json.Marshal(http2Request.headersMap)
+				value["requestHeaders"] = string(requestHeaders)
+			}
+			if len(http2Response.headersMap) > 0 {
+				responseHeader, _ := json.Marshal(http2Response.headersMap)
+				value["responseHeaders"] = string(responseHeader)
+			}
+
+			value["type"] = "HTTP/2"
+			value["ip"] = bd.key.net.Src().String()
+			value["destIp"] = bd.key.net.Dst().String()
+			value["akto_account_id"] = fmt.Sprint(1000000)
+			value["akto_vxlan_id"] = fmt.Sprint(bd.vxlanID)
+			value["time"] = fmt.Sprint(time.Now().Unix())
+			value["is_pending"] = fmt.Sprint(isPending)
+			value["source"] = bd.source
+			out, _ := json.Marshal(value)
+			ctx := context.Background()
+
+			if printCounter > 0 {
+				printCounter--
+				log.Println("req-resp.String()", string(out))
+			}
+			go ProduceStr(kafkaWriter, ctx, string(out))
+		}
+
+	}
+}
+
 func tryReadFromBD(bd *bidi, isPending bool) {
+	if len(bd.a.bytes) > 24 && string(bd.a.bytes[0:24]) == "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" {
+		bd.a.bytes = bd.a.bytes[24:]
+		log.Println("http2 connection preface detected")
+		tryParseAsHttp2Request(bd, isPending)
+		return
+	}
+
 	reader := bufio.NewReader(bytes.NewReader(bd.a.bytes))
 	i := 0
 	requests := []http.Request{}
@@ -283,11 +506,12 @@ func tryReadFromBD(bd *bidi, isPending bool) {
 		for name, values := range req.Header {
 			// Loop over all values for the name.
 			for _, value := range values {
-				reqHeader[name] = &trafficpb.StringList{
+				reqHeader[strings.ToLower(name)] = &trafficpb.StringList{
 					Values: []string{value},
 				}
 			}
 		}
+		ip := GetSourceIp(reqHeader, bd.key.net.Src().String())

 		reqHeader["host"] = &trafficpb.StringList{
 			Values: []string{req.Host},
@@ -327,7 +551,7 @@ func tryReadFromBD(bd *bidi, isPending bool) {
 		for name, values := range resp.Header {
 			// Loop over all values for the name.
 			for _, value := range values {
-				respHeader[name] = &trafficpb.StringList{
+				respHeader[strings.ToLower(name)] = &trafficpb.StringList{
 					Values: []string{value},
 				}
 			}
@@ -350,7 +574,7 @@ func tryReadFromBD(bd *bidi, isPending bool) {
 			ResponseHeaders: respHeader,
 			RequestPayload:  requestsContent[i],
 			ResponsePayload: responsesContent[i],
-			Ip:              bd.key.net.Src().String(),
+			Ip:              ip,
 			Time:            int32(time.Now().Unix()),
 			StatusCode:      int32(resp.StatusCode),
 			Type:            string(req.Proto),
@@ -374,6 +598,7 @@ func tryReadFromBD(bd *bidi, isPending bool) {
 			"requestPayload":  requestsContent[i],
 			"responsePayload": responsesContent[i],
 			"ip":              bd.key.net.Src().String(),
+			"destIp":          bd.key.net.Dst().String(),
 			"time":            fmt.Sprint(time.Now().Unix()),
 			"statusCode":      fmt.Sprint(resp.StatusCode),
 			"type":            string(req.Proto),
@@ -405,10 +630,13 @@ func tryReadFromBD(bd *bidi, isPending bool) {
 		//printLog("req-resp.String() " + string(out))

-		// insert kafka record for runtime
-		go ProduceStr(kafkaWriter, ctx, string(out))
+		if threatEnabled {
+			// insert kafka record for threat client
+			go Produce(kafkaWriter, ctx, payload)
+		}

-		// insert kafka record for threat client
-		go Produce(kafkaWriter, ctx, payload)
+		// insert kafka record for runtime
+		// TODO: convert to protobuf
+		go ProduceStr(kafkaWriter, ctx, string(out))
 		i++
 	}
 }
@@ -851,6 +1079,14 @@ func main() {
 		log.Println("ignoreIpTraffic: missing. defaulting to false")
 	}

+	threatEnabledVar := os.Getenv("AKTO_THREAT_ENABLED")
+	if len(threatEnabledVar) > 0 {
+		threatEnabled = strings.ToLower(threatEnabledVar) == "true"
+		log.Println("threatEnabled: ", threatEnabled)
+	} else {
+		log.Println("threatEnabled: missing. defaulting to false")
+	}
+
 	ignoreCloudMetadataCallsVar := os.Getenv("AKTO_IGNORE_CLOUD_METADATA_CALLS")
 	if len(ignoreCloudMetadataCallsVar) > 0 {
 		ignoreCloudMetadataCalls = strings.ToLower(ignoreCloudMetadataCallsVar) == "true"
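
For readers new to the x/net plumbing used by tryParseAsHttp2Request above: an http2.Framer reads length-prefixed frames off a raw byte stream, and an hpack.Decoder inflates HEADERS block fragments, invoking a callback once per header field. Below is a minimal sketch that round-trips a single HEADERS frame through those same APIs, so the read side can be studied in isolation; the stream ID and the gRPC-style path are made up.

package main

import (
	"bytes"
	"fmt"
	"log"

	"golang.org/x/net/http2"
	"golang.org/x/net/http2/hpack"
)

func main() {
	// Write side: hpack-encode a header block, then wrap it in a HEADERS frame.
	var block bytes.Buffer
	enc := hpack.NewEncoder(&block)
	enc.WriteField(hpack.HeaderField{Name: ":method", Value: "POST"})
	enc.WriteField(hpack.HeaderField{Name: ":path", Value: "/demo.Echo/Hello"})

	var wire bytes.Buffer
	if err := http2.NewFramer(&wire, nil).WriteHeaders(http2.HeadersFrameParam{
		StreamID:      1,
		BlockFragment: block.Bytes(),
		EndHeaders:    true,
	}); err != nil {
		log.Fatal(err)
	}

	// Read side: the same Framer + Decoder shape as tryParseAsHttp2Request.
	framer := http2.NewFramer(nil, &wire)
	decoder := hpack.NewDecoder(4096, func(hf hpack.HeaderField) {
		fmt.Printf("%s = %s\n", hf.Name, hf.Value)
	})
	frame, err := framer.ReadFrame()
	if err != nil {
		log.Fatal(err)
	}
	if hf, ok := frame.(*http2.HeadersFrame); ok {
		if _, err := decoder.Write(hf.HeaderBlockFragment()); err != nil {
			log.Fatal(err)
		}
	}
}

On a live connection the first bytes from the client are not a frame at all but the 24-byte preface PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n, which is exactly what the new check at the top of tryReadFromBD strips before handing the buffer to the framer.
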
diff --git a/run.sh b/run.sh
index fc36636..dc0b436 100755
--- a/run.sh
+++ b/run.sh
@@ -1,19 +1,43 @@
 #!/bin/sh

+LOG_FILE="/tmp/dump.log"
+MAX_LOG_SIZE=${MAX_LOG_SIZE:-10485760} # Default to 10 MB (10 * 1024 * 1024 bytes)
+CHECK_INTERVAL=60 # Check interval in seconds
+
+# Function to rotate the log file
+rotate_log() {
+    if [ -f "$LOG_FILE" ] && [ -s "$LOG_FILE" ]; then
+        log_size=$(stat -c%s "$LOG_FILE") # Get the size of the log file
+        if [ "$log_size" -ge "$MAX_LOG_SIZE" ]; then
+            echo "" > "$LOG_FILE"
+        fi
+    fi
+}
+
+# Log rotation monitoring (only if ENABLE_LOGS is false)
+if [ "${ENABLE_LOGS}" = "false" ]; then
+    while true; do
+        rotate_log # Check and rotate logs if necessary
+        sleep "$CHECK_INTERVAL"
+    done &
+fi
+
 while :
 do
     # Start the mirroring module in the background
-    /mirroring-api-logging &
+    if [ "${ENABLE_LOGS}" = "false" ]; then
+        /mirroring-api-logging >> "$LOG_FILE" 2>&1 &
+    else
+        /mirroring-api-logging &
+    fi
     mirroring_pid=$!

     # Monitor the process for 1 hour
     elapsed=0
     while [ $elapsed -lt 3600 ]; do
-        # Check if the mirroring process is still running
         if ! kill -0 $mirroring_pid 2>/dev/null; then
             break
         fi
-        # Sleep for 2 seconds before checking again
         sleep 2
         elapsed=$((elapsed + 2))
     done
@@ -21,4 +45,4 @@ do
     # Kill the mirroring process after 1 hour or if it stopped
     kill $mirroring_pid 2>/dev/null
     sleep 2
-done
\ No newline at end of file
+done
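
On the rotation strategy in run.sh: the script runs under /bin/sh, so the tests above use POSIX [ ... ] rather than the bash-only [[ ... ]]. Because the mirroring process is started with an appending redirect (>>), its descriptor carries O_APPEND, and truncating the file in place is safe: the next write lands at the new end of file rather than re-extending a gap, and the descriptor stays pointed at the live inode (an rm-and-recreate scheme would leave the process appending to an unlinked file that rotation could never reclaim). A small sketch of that behavior in Go, assuming Linux/POSIX append semantics; the path is a throwaway:

package main

import (
	"fmt"
	"os"
)

func main() {
	path := "/tmp/rotate-demo.log"
	// Same open mode the shell's >> redirect uses: create + append.
	f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
	if err != nil {
		panic(err)
	}
	defer f.Close()

	f.WriteString("some accumulated log output\n")
	os.Truncate(path, 0) // what rotate_log does (its echo "" also leaves one newline behind)
	f.WriteString("first line after rotation\n")

	data, _ := os.ReadFile(path)
	fmt.Printf("%q\n", data) // "first line after rotation\n": no sparse gap, same inode
}
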