Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enabling http2 and grpc in mirroring for filter_headers #47

Open
wants to merge 3 commits into
base: feature/filter_header
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ module github.com/akto-api-security/mirroring-api-logging
go 1.17

require (
github.com/akto-api-security/gomiddleware v0.1.0
github.com/google/gopacket v1.1.19
github.com/segmentio/kafka-go v0.4.25
go.mongodb.org/mongo-driver v1.11.3
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2
)

require (
Expand Down
3 changes: 0 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
github.com/akto-api-security/gomiddleware v0.1.0 h1:7yf8j2yKVX1Ar5kBeIMjzBAuOBZj9BvTZJ8uEALmR8s=
github.com/akto-api-security/gomiddleware v0.1.0/go.mod h1:pCxZc7oWn6Wlv4S8ISJDr7F7mhir0M4IQS/Z5mC0vu8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down Expand Up @@ -31,7 +29,6 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/segmentio/kafka-go v0.4.23/go.mod h1:XzMcoMjSzDGHcIwpWUI7GB43iKZ2fTVmryPSGLf/MPg=
github.com/segmentio/kafka-go v0.4.25 h1:QVx9yz12syKBFkxR+dVDDwTO0ItHgnjjhIdBfqizj+8=
github.com/segmentio/kafka-go v0.4.25/go.mod h1:XzMcoMjSzDGHcIwpWUI7GB43iKZ2fTVmryPSGLf/MPg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand Down
220 changes: 220 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"bytes"
"compress/gzip"
"context"
"encoding/base64"
"encoding/json"
"fmt"
"io"
Expand All @@ -39,6 +40,9 @@ import (
"net"

"github.com/segmentio/kafka-go"

"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
)

var printCounter = 500
Expand Down Expand Up @@ -104,6 +108,16 @@ type myFactory struct {
source string
}

type http2ReqResp struct {
headersMap map[string]string
payload string
isInvalid bool
}

func (k http2ReqResp) String() string {
return fmt.Sprintf("%v:%v", k.headersMap, k.payload)
}

// New handles creating a new tcpassembly.Stream.
func (f *myFactory) New(netFlow, tcpFlow gopacket.Flow) tcpassembly.Stream {
// Create a new stream.
Expand Down Expand Up @@ -186,7 +200,213 @@ func checkIfIp(host string) bool {
return net.ParseIP(chunks[0]) != nil
}

func tryParseAsHttp2Request(bd *bidi, isPending bool) {

streamRequestMap := make(map[string][]http2ReqResp)
framer := http2.NewFramer(nil, bytes.NewReader(bd.a.bytes))

headersMap := make(map[string]string)
payload := ""

gotHeaders := make(map[string]bool)
gotPayload := make(map[string]bool)
decoder := hpack.NewDecoder(4096, func(hf hpack.HeaderField) {

if len(hf.Name) > 0 {
headersMap[hf.Name] = hf.Value
}
})

for {

frame, err := framer.ReadFrame()

if err == io.EOF {
break
}
if err != nil {
continue
}

streamId := fmt.Sprint(frame.Header().StreamID)
if len(streamId) == 0 {
continue
}

if !gotHeaders[streamId] {
headersMap = make(map[string]string)
}

switch f := frame.(type) {
case *http2.HeadersFrame:
_, err := decoder.Write(f.HeaderBlockFragment())
gotHeaders[streamId] = true
if err != nil {
}

case *http2.DataFrame:
if len(string(f.Data())) > 0 {
payload = base64.StdEncoding.EncodeToString(f.Data())
gotPayload[streamId] = true
}
}

if gotHeaders[streamId] && gotPayload[streamId] {
if _, exists := streamRequestMap[streamId]; !exists {
streamRequestMap[streamId] = []http2ReqResp{}
}
streamRequestMap[streamId] = append(streamRequestMap[streamId], http2ReqResp{
headersMap: headersMap,
payload: payload,
})
gotHeaders[streamId] = false
gotPayload[streamId] = false
}
}

gotHeaders = make(map[string]bool)
gotPayload = make(map[string]bool)
gotGrpcHeaders := make(map[string]bool)
headersCount := make(map[string]int)
headersMap = make(map[string]string)
payload = ""

streamResponseMap := make(map[string][]http2ReqResp)
framerResp := http2.NewFramer(nil, bytes.NewReader(bd.b.bytes))
headersMap = make(map[string]string)
decoder = hpack.NewDecoder(4096, func(hf hpack.HeaderField) {
if len(hf.Name) > 0 {
headersMap[hf.Name] = hf.Value
}
})

for {
frame, err := framerResp.ReadFrame()
if err == io.EOF {
break
}
if err != nil {
continue
}

streamId := fmt.Sprint(frame.Header().StreamID)

if len(streamId) == 0 {
continue
}
if !(gotHeaders[streamId]) {
headersMap = make(map[string]string)
}

switch f := frame.(type) {
case *http2.HeadersFrame:
_, err := decoder.Write(f.HeaderBlockFragment())
if err != nil {
log.Printf("Error response decoding headers: %v", err)
}
if headersCount[streamId] == 0 {
if strings.Contains(headersMap["content-type"], "application/grpc") {
gotGrpcHeaders[streamId] = true
}
gotHeaders[streamId] = true
}
headersCount[streamId]++
case *http2.DataFrame:
if len(string(f.Data())) > 0 {
payload = base64.StdEncoding.EncodeToString(f.Data())
gotPayload[streamId] = true
}
}
if gotHeaders[streamId] && gotPayload[streamId] {

if gotGrpcHeaders[streamId] && headersCount[streamId] == 1 {
continue
}

if _, exists := streamResponseMap[streamId]; !exists {
streamResponseMap[streamId] = []http2ReqResp{}
}
streamResponseMap[streamId] = append(streamResponseMap[streamId], http2ReqResp{
headersMap: headersMap,
payload: payload,
})
gotPayload[streamId] = false
gotHeaders[streamId] = false
gotGrpcHeaders[streamId] = false
headersCount[streamId] = 0
}
}

for streamId, http2Req := range streamRequestMap {
http2Resp := streamResponseMap[streamId]
if len(http2Resp) != len(http2Req) {
continue
}
for req := range http2Req {

http2Request := http2Req[req]
http2Response := http2Resp[req]

value := make(map[string]string)

if path, exists := http2Request.headersMap[":path"]; exists {
value["path"] = path
delete(http2Request.headersMap, ":path")
} else {
continue
}
if method, exists := http2Request.headersMap[":method"]; exists {
value["method"] = method
delete(http2Request.headersMap, ":method")
}
if scheme, exists := http2Request.headersMap[":scheme"]; exists {
value["scheme"] = scheme
delete(http2Request.headersMap, ":scheme")
}
if status, exists := http2Response.headersMap[":status"]; exists {
value["statusCode"] = status
value["status"] = status
delete(http2Response.headersMap, ":status")
}
value["requestPayload"] = http2Request.payload
value["responsePayload"] = http2Response.payload

if len(http2Request.headersMap) > 0 {
requestHeaders, _ := json.Marshal(http2Request.headersMap)
value["requestHeaders"] = string(requestHeaders)
}
if len(http2Response.headersMap) > 0 {
responseHeader, _ := json.Marshal(http2Response.headersMap)
value["responseHeaders"] = string(responseHeader)
}

value["type"] = "HTTP/2"
value["ip"] = bd.key.net.Src().String()
value["akto_account_id"] = fmt.Sprint(1000000)
value["akto_vxlan_id"] = fmt.Sprint(bd.vxlanID)
value["time"] = fmt.Sprint(time.Now().Unix())
value["is_pending"] = fmt.Sprint(isPending)
value["source"] = bd.source
out, _ := json.Marshal(value)
ctx := context.Background()

if printCounter > 0 {
printCounter--
log.Println("req-resp.String()", string(out))
}
go Produce(kafkaWriter, ctx, string(out))
}

}
}

func tryReadFromBD(bd *bidi, isPending bool) {
if len(bd.a.bytes) > 24 && string(bd.a.bytes[0:24]) == "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" {
bd.a.bytes = bd.a.bytes[24:]
tryParseAsHttp2Request(bd, isPending)
return
}

reader := bufio.NewReader(bytes.NewReader(bd.a.bytes))
i := 0
requests := []http.Request{}
Expand Down
Loading