From 00369dc59eeefd5898c47fa127ae7c7019aca8cd Mon Sep 17 00:00:00 2001 From: shivamrawat101192 Date: Fri, 7 Jun 2024 16:02:12 +0530 Subject: [PATCH 1/3] enabling http2 and grpc in mirroring for filter_headers --- go.mod | 2 +- go.sum | 3 - main.go | 216 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 217 insertions(+), 4 deletions(-) diff --git a/go.mod b/go.mod index deb02e5..a0b96cb 100644 --- a/go.mod +++ b/go.mod @@ -3,10 +3,10 @@ module github.com/akto-api-security/mirroring-api-logging go 1.17 require ( - github.com/akto-api-security/gomiddleware v0.1.0 github.com/google/gopacket v1.1.19 github.com/segmentio/kafka-go v0.4.25 go.mongodb.org/mongo-driver v1.11.3 + golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 ) require ( diff --git a/go.sum b/go.sum index 3039a4c..b0b4c66 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,3 @@ -github.com/akto-api-security/gomiddleware v0.1.0 h1:7yf8j2yKVX1Ar5kBeIMjzBAuOBZj9BvTZJ8uEALmR8s= -github.com/akto-api-security/gomiddleware v0.1.0/go.mod h1:pCxZc7oWn6Wlv4S8ISJDr7F7mhir0M4IQS/Z5mC0vu8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -31,7 +29,6 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/segmentio/kafka-go v0.4.23/go.mod h1:XzMcoMjSzDGHcIwpWUI7GB43iKZ2fTVmryPSGLf/MPg= github.com/segmentio/kafka-go v0.4.25 h1:QVx9yz12syKBFkxR+dVDDwTO0ItHgnjjhIdBfqizj+8= github.com/segmentio/kafka-go v0.4.25/go.mod h1:XzMcoMjSzDGHcIwpWUI7GB43iKZ2fTVmryPSGLf/MPg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/main.go b/main.go index 04cd15b..3e32e4d 100644 --- a/main.go +++ b/main.go @@ -15,6 +15,7 @@ import ( "bytes" "compress/gzip" "context" + "encoding/base64" "encoding/json" "fmt" "io" @@ -39,6 +40,9 @@ import ( "net" "github.com/segmentio/kafka-go" + + "golang.org/x/net/http2" + "golang.org/x/net/http2/hpack" ) var printCounter = 500 @@ -104,6 +108,16 @@ type myFactory struct { source string } +type http2ReqResp struct { + headersMap map[string]string + payload string + isInvalid bool +} + +func (k http2ReqResp) String() string { + return fmt.Sprintf("%v:%v", k.headersMap, k.payload) +} + // New handles creating a new tcpassembly.Stream. func (f *myFactory) New(netFlow, tcpFlow gopacket.Flow) tcpassembly.Stream { // Create a new stream. @@ -186,7 +200,209 @@ func checkIfIp(host string) bool { return net.ParseIP(chunks[0]) != nil } +func tryParseAsHttp2Request(bd *bidi, isPending bool) { + + streamRequestMap := make(map[string][]http2ReqResp) + framer := http2.NewFramer(nil, bytes.NewReader(bd.a.bytes)) + + headersMap := make(map[string]string) + payload := "" + + gotHeaders := make(map[string]bool) + gotPayload := make(map[string]bool) + decoder := hpack.NewDecoder(4096, func(hf hpack.HeaderField) { + + if len(hf.Name) > 0 { + headersMap[hf.Name] = hf.Value + } + }) + + for { + + frame, err := framer.ReadFrame() + + if err == io.EOF { + break + } + if err != nil { + continue + } + + streamId := fmt.Sprint(frame.Header().StreamID) + if len(streamId) == 0 { + continue + } + + if !gotHeaders[streamId] { + headersMap = make(map[string]string) + } + + switch f := frame.(type) { + case *http2.HeadersFrame: + _, err := decoder.Write(f.HeaderBlockFragment()) + gotHeaders[streamId] = true + if err != nil { + } + + case *http2.DataFrame: + if len(string(f.Data())) > 0 { + payload = base64.StdEncoding.EncodeToString(f.Data()) + gotPayload[streamId] = true + } + } + + if gotHeaders[streamId] && gotPayload[streamId] { + if _, exists := streamRequestMap[streamId]; !exists { + streamRequestMap[streamId] = []http2ReqResp{} + } + streamRequestMap[streamId] = append(streamRequestMap[streamId], http2ReqResp{ + headersMap: headersMap, + payload: payload, + }) + gotHeaders[streamId] = false + gotPayload[streamId] = false + } + } + + gotHeaders = make(map[string]bool) + gotPayload = make(map[string]bool) + gotGrpcHeaders := make(map[string]bool) + headersCount := make(map[string]int) + headersMap = make(map[string]string) + payload = "" + + streamResponseMap := make(map[string][]http2ReqResp) + framerResp := http2.NewFramer(nil, bytes.NewReader(bd.b.bytes)) + headersMap = make(map[string]string) + decoder = hpack.NewDecoder(4096, func(hf hpack.HeaderField) { + if len(hf.Name) > 0 { + headersMap[hf.Name] = hf.Value + } + }) + + for { + frame, err := framerResp.ReadFrame() + if err == io.EOF { + break + } + if err != nil { + continue + } + + streamId := fmt.Sprint(frame.Header().StreamID) + + if len(streamId) == 0 { + continue + } + if !(gotHeaders[streamId]) { + headersMap = make(map[string]string) + } + + switch f := frame.(type) { + case *http2.HeadersFrame: + _, err := decoder.Write(f.HeaderBlockFragment()) + if err != nil { + log.Printf("Error response decoding headers: %v", err) + } + if headersCount[streamId] == 0 { + if strings.Contains(headersMap["content-type"], "application/grpc") { + gotGrpcHeaders[streamId] = true + } + gotHeaders[streamId] = true + } + headersCount[streamId]++ + case *http2.DataFrame: + if len(string(f.Data())) > 0 { + payload = base64.StdEncoding.EncodeToString(f.Data()) + gotPayload[streamId] = true + } + } + if gotHeaders[streamId] && gotPayload[streamId] { + + if gotGrpcHeaders[streamId] && headersCount[streamId] == 1 { + continue + } + + if _, exists := streamResponseMap[streamId]; !exists { + streamResponseMap[streamId] = []http2ReqResp{} + } + streamResponseMap[streamId] = append(streamResponseMap[streamId], http2ReqResp{ + headersMap: headersMap, + payload: payload, + }) + gotPayload[streamId] = false + gotHeaders[streamId] = false + gotGrpcHeaders[streamId] = false + headersCount[streamId] = 0 + } + } + + for streamId, http2Req := range streamRequestMap { + http2Resp := streamResponseMap[streamId] + if len(http2Resp) != len(http2Req) { + continue + } + for req := range http2Req { + + http2Request := http2Req[req] + http2Response := http2Resp[req] + + value := make(map[string]string) + + if path, exists := http2Request.headersMap[":path"]; exists { + value["path"] = path + delete(http2Request.headersMap, ":path") + } + if method, exists := http2Request.headersMap[":method"]; exists { + value["method"] = method + delete(http2Request.headersMap, ":method") + } + if scheme, exists := http2Request.headersMap[":scheme"]; exists { + value["scheme"] = scheme + delete(http2Request.headersMap, ":scheme") + } + if status, exists := http2Response.headersMap[":status"]; exists { + value["statusCode"] = status + delete(http2Response.headersMap, ":status") + } + value["requestPayload"] = http2Request.payload + value["responsePayload"] = http2Request.payload + + if len(http2Request.headersMap) > 0 { + requestHeaders, _ := json.Marshal(http2Request.headersMap) + value["requestHeaders"] = string(requestHeaders) + } + if len(http2Response.headersMap) > 0 { + responseHeader, _ := json.Marshal(http2Response.headersMap) + value["responseHeader"] = string(responseHeader) + } + + value["ip"] = bd.key.net.Src().String() + value["akto_account_id"] = fmt.Sprint(1000000) + value["akto_vxlan_id"] = fmt.Sprint(bd.vxlanID) + value["time"] = fmt.Sprint(time.Now().Unix()) + value["is_pending"] = fmt.Sprint(isPending) + value["source"] = bd.source + out, _ := json.Marshal(value) + ctx := context.Background() + + if printCounter > 0 { + printCounter-- + log.Println("req-resp.String()", string(out)) + } + go Produce(kafkaWriter, ctx, string(out)) + } + + } +} + func tryReadFromBD(bd *bidi, isPending bool) { + if len(bd.a.bytes) > 24 && string(bd.a.bytes[0:24]) == "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" { + bd.a.bytes = bd.a.bytes[24:] + tryParseAsHttp2Request(bd, isPending) + return + } + reader := bufio.NewReader(bytes.NewReader(bd.a.bytes)) i := 0 requests := []http.Request{} From 134287865b5708e5d8346d0c774017c1875a02a2 Mon Sep 17 00:00:00 2001 From: shivamrawat101192 Date: Fri, 7 Jun 2024 23:59:46 +0530 Subject: [PATCH 2/3] bug fixes and handling the case when "path" is empty --- main.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/main.go b/main.go index 3e32e4d..7f90085 100644 --- a/main.go +++ b/main.go @@ -352,6 +352,8 @@ func tryParseAsHttp2Request(bd *bidi, isPending bool) { if path, exists := http2Request.headersMap[":path"]; exists { value["path"] = path delete(http2Request.headersMap, ":path") + } else { + return } if method, exists := http2Request.headersMap[":method"]; exists { value["method"] = method @@ -363,10 +365,11 @@ func tryParseAsHttp2Request(bd *bidi, isPending bool) { } if status, exists := http2Response.headersMap[":status"]; exists { value["statusCode"] = status + value["status"] = status delete(http2Response.headersMap, ":status") } value["requestPayload"] = http2Request.payload - value["responsePayload"] = http2Request.payload + value["responsePayload"] = http2Response.payload if len(http2Request.headersMap) > 0 { requestHeaders, _ := json.Marshal(http2Request.headersMap) @@ -374,9 +377,10 @@ func tryParseAsHttp2Request(bd *bidi, isPending bool) { } if len(http2Response.headersMap) > 0 { responseHeader, _ := json.Marshal(http2Response.headersMap) - value["responseHeader"] = string(responseHeader) + value["responseHeaders"] = string(responseHeader) } + value["type"] = "HTTP/2" value["ip"] = bd.key.net.Src().String() value["akto_account_id"] = fmt.Sprint(1000000) value["akto_vxlan_id"] = fmt.Sprint(bd.vxlanID) From 36dcf8a49f975892cd9e381b905d11a53c18ec8e Mon Sep 17 00:00:00 2001 From: shivamrawat101192 Date: Sat, 8 Jun 2024 00:17:21 +0530 Subject: [PATCH 3/3] bug fixes and handling the case when "path" is empty --- main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/main.go b/main.go index 7f90085..cb54f9a 100644 --- a/main.go +++ b/main.go @@ -353,7 +353,7 @@ func tryParseAsHttp2Request(bd *bidi, isPending bool) { value["path"] = path delete(http2Request.headersMap, ":path") } else { - return + continue } if method, exists := http2Request.headersMap[":method"]; exists { value["method"] = method