Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,16 @@ module github.com/akto-api-security/mirroring-api-logging
go 1.17

require (
github.com/akto-api-security/gomiddleware v0.1.0
github.com/akto-api-security/gomiddleware v0.1.3
github.com/google/gopacket v1.1.19
github.com/segmentio/kafka-go v0.4.25
golang.org/x/net v0.0.0-20190620200207-3b0461eec859
)

require (
github.com/golang/snappy v0.0.1 // indirect
github.com/klauspost/compress v1.9.8 // indirect
github.com/pierrec/lz4 v2.6.0+incompatible // indirect
golang.org/x/sys v0.0.0-20190412213103-97732733099d // indirect
golang.org/x/text v0.3.0 // indirect
)
5 changes: 2 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
github.com/akto-api-security/gomiddleware v0.1.0 h1:7yf8j2yKVX1Ar5kBeIMjzBAuOBZj9BvTZJ8uEALmR8s=
github.com/akto-api-security/gomiddleware v0.1.0/go.mod h1:pCxZc7oWn6Wlv4S8ISJDr7F7mhir0M4IQS/Z5mC0vu8=
github.com/akto-api-security/gomiddleware v0.1.3 h1:rcsQK7uwu5Z56hTM4lQQZbqGSXXtb+1MznbljCfSpUA=
github.com/akto-api-security/gomiddleware v0.1.3/go.mod h1:pCxZc7oWn6Wlv4S8ISJDr7F7mhir0M4IQS/Z5mC0vu8=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw=
Expand All @@ -23,7 +23,6 @@ github.com/pierrec/lz4 v2.6.0+incompatible h1:Ix9yFKn1nSPBLFl/yZknTp8TU5G4Ps0JDm
github.com/pierrec/lz4 v2.6.0+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/segmentio/kafka-go v0.4.23/go.mod h1:XzMcoMjSzDGHcIwpWUI7GB43iKZ2fTVmryPSGLf/MPg=
github.com/segmentio/kafka-go v0.4.25 h1:QVx9yz12syKBFkxR+dVDDwTO0ItHgnjjhIdBfqizj+8=
github.com/segmentio/kafka-go v0.4.25/go.mod h1:XzMcoMjSzDGHcIwpWUI7GB43iKZ2fTVmryPSGLf/MPg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand Down
216 changes: 215 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"bytes"
"compress/gzip"
"context"
"encoding/base64"
"encoding/json"
"fmt"
"io"
Expand All @@ -23,6 +24,7 @@ import (
"net/http"
"os"
"strconv"
"strings"
"time"

"github.com/google/gopacket"
Expand All @@ -32,6 +34,9 @@ import (

"github.com/akto-api-security/gomiddleware"
"github.com/segmentio/kafka-go"

"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
)

var printCounter = 500
Expand Down Expand Up @@ -82,6 +87,16 @@ type myFactory struct {
vxlanID int
}

type http2ReqResp struct {
headersMap map[string]string
payload string
isInvalid bool
}

func (k http2ReqResp) String() string {
return fmt.Sprintf("%v:%v", k.headersMap, k.payload)
}

// New handles creating a new tcpassembly.Stream.
func (f *myFactory) New(netFlow, tcpFlow gopacket.Flow) tcpassembly.Stream {
// Create a new stream.
Expand Down Expand Up @@ -149,7 +164,207 @@ func (s *myStream) ReassemblyComplete() {
s.bidi.maybeFinish()
}

func tryParseAsHttp2Request(bd *bidi, isPending bool) {

streamRequestMap := make(map[string][]http2ReqResp)
framer := http2.NewFramer(nil, bytes.NewReader(bd.a.bytes))

headersMap := make(map[string]string)
payload := ""

gotHeaders := make(map[string]bool)
gotPayload := make(map[string]bool)
decoder := hpack.NewDecoder(4096, func(hf hpack.HeaderField) {

if len(hf.Name) > 0 {
headersMap[hf.Name] = hf.Value
}
})

for {

frame, err := framer.ReadFrame()

if err == io.EOF {
break
}
if err != nil {
continue
}

streamId := fmt.Sprint(frame.Header().StreamID)
if len(streamId) == 0 {
continue
}

if !gotHeaders[streamId] {
headersMap = make(map[string]string)
}

switch f := frame.(type) {
case *http2.HeadersFrame:
_, err := decoder.Write(f.HeaderBlockFragment())
gotHeaders[streamId] = true
if err != nil {
}

case *http2.DataFrame:
if len(string(f.Data())) > 0 {
payload = base64.StdEncoding.EncodeToString(f.Data())
gotPayload[streamId] = true
}
}

if gotHeaders[streamId] && gotPayload[streamId] {
if _, exists := streamRequestMap[streamId]; !exists {
streamRequestMap[streamId] = []http2ReqResp{}
}
streamRequestMap[streamId] = append(streamRequestMap[streamId], http2ReqResp{
headersMap: headersMap,
payload: payload,
})
gotHeaders[streamId] = false
gotPayload[streamId] = false
}
}

gotHeaders = make(map[string]bool)
gotPayload = make(map[string]bool)
gotGrpcHeaders := make(map[string]bool)
headersCount := make(map[string]int)
headersMap = make(map[string]string)
payload = ""

streamResponseMap := make(map[string][]http2ReqResp)
framerResp := http2.NewFramer(nil, bytes.NewReader(bd.b.bytes))
headersMap = make(map[string]string)
decoder = hpack.NewDecoder(4096, func(hf hpack.HeaderField) {
if len(hf.Name) > 0 {
headersMap[hf.Name] = hf.Value
}
})

for {
frame, err := framerResp.ReadFrame()
if err == io.EOF {
break
}
if err != nil {
continue
}

streamId := fmt.Sprint(frame.Header().StreamID)

if len(streamId) == 0 {
continue
}
if !(gotHeaders[streamId]) {
headersMap = make(map[string]string)
}

switch f := frame.(type) {
case *http2.HeadersFrame:
_, err := decoder.Write(f.HeaderBlockFragment())
if err != nil {
log.Printf("Error response decoding headers: %v", err)
}
if headersCount[streamId] == 0 {
if strings.Contains(headersMap["content-type"], "application/grpc") {
gotGrpcHeaders[streamId] = true
}
gotHeaders[streamId] = true
}
headersCount[streamId]++
case *http2.DataFrame:
if len(string(f.Data())) > 0 {
payload = base64.StdEncoding.EncodeToString(f.Data())
gotPayload[streamId] = true
}
}
if gotHeaders[streamId] && gotPayload[streamId] {

if gotGrpcHeaders[streamId] && headersCount[streamId] == 1 {
continue
}

if _, exists := streamResponseMap[streamId]; !exists {
streamResponseMap[streamId] = []http2ReqResp{}
}
streamResponseMap[streamId] = append(streamResponseMap[streamId], http2ReqResp{
headersMap: headersMap,
payload: payload,
})
gotPayload[streamId] = false
gotHeaders[streamId] = false
gotGrpcHeaders[streamId] = false
headersCount[streamId] = 0
}
}

for streamId, http2Req := range streamRequestMap {
http2Resp := streamResponseMap[streamId]
if len(http2Resp) != len(http2Req) {
continue
}
for req := range http2Req {

http2Request := http2Req[req]
http2Response := http2Resp[req]

value := make(map[string]string)

if path, exists := http2Request.headersMap[":path"]; exists {
value["path"] = path
delete(http2Request.headersMap, ":path")
}
if method, exists := http2Request.headersMap[":method"]; exists {
value["method"] = method
delete(http2Request.headersMap, ":method")
}
if scheme, exists := http2Request.headersMap[":scheme"]; exists {
value["scheme"] = scheme
delete(http2Request.headersMap, ":scheme")
}
if status, exists := http2Response.headersMap[":status"]; exists {
value["statusCode"] = status
delete(http2Response.headersMap, ":status")
}
value["requestPayload"] = http2Request.payload
value["responsePayload"] = http2Request.payload

if len(http2Request.headersMap) > 0 {
requestHeaders, _ := json.Marshal(http2Request.headersMap)
value["requestHeaders"] = string(requestHeaders)
}
if len(http2Response.headersMap) > 0 {
responseHeader, _ := json.Marshal(http2Response.headersMap)
value["responseHeader"] = string(responseHeader)
}

value["ip"] = bd.key.net.Src().String()
value["akto_account_id"] = fmt.Sprint(1000000)
value["akto_vxlan_id"] = fmt.Sprint(bd.vxlanID)
value["time"] = fmt.Sprint(time.Now().Unix())
value["is_pending"] = fmt.Sprint(isPending)
out, _ := json.Marshal(value)

if printCounter > 0 {
printCounter--
log.Println("req-resp.String()", string(out))
}
// go gomiddleware.Produce(kafkaWriter, ctx, string(out))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this line commented out? @shivam-rawat-akto

}

}
}

func tryReadFromBD(bd *bidi, isPending bool) {
if len(bd.a.bytes) > 24 && string(bd.a.bytes[0:24]) == "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" {
bd.a.bytes = bd.a.bytes[24:]
tryParseAsHttp2Request(bd, isPending)
return
}

reader := bufio.NewReader(bytes.NewReader(bd.a.bytes))
i := 0
requests := []http.Request{}
Expand All @@ -172,7 +387,6 @@ func tryReadFromBD(bd *bidi, isPending bool) {

requests = append(requests, *req)
requestsContent = append(requestsContent, string(body))
// log.Println("req.URL.String()", i, req.URL.String(), string(body), len(bd.a.bytes))
i++
}

Expand Down