Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Connection tracking - hash mechanism #201

Merged
merged 18 commits into from
May 24, 2022
Prev Previous commit
Next Next commit
Extract hash instance
  • Loading branch information
ronensc committed May 12, 2022
commit 3e909c13f2d093a4b52f4583c6eacccfd99a4dc2
28 changes: 14 additions & 14 deletions pkg/pipeline/conntrack/hash.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"bytes"
"encoding/gob"
"fmt"
"hash/fnv"
"hash"

"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
Expand All @@ -30,41 +30,41 @@ import (

// ComputeHash computes the hash of a flow log according to keyFields.
// Two flow logs will have the same hash if they belong to the same connection.
func ComputeHash(flowLog config.GenericMap, keyFields api.KeyFields) ([]byte, error) {
func ComputeHash(flowLog config.GenericMap, keyFields api.KeyFields, hasher hash.Hash) ([]byte, error) {
type hashType []byte
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ronensc If you move the `hashType` type out of the function to package level, `ComputeHash` can return `hashType` instead of `[]byte`.

fieldGroup2hash := make(map[string]hashType)

// Compute the hash of each field group
for _, fg := range keyFields.FieldGroups {
h, err := computeHashFields(flowLog, fg.Fields)
h, err := computeHashFields(flowLog, fg.Fields, hasher)
if err != nil {
return nil, fmt.Errorf("compute hash: %w", err)
}
fieldGroup2hash[fg.Name] = h
}

// Compute the total hash
hash := fnv.New32a()
hasher.Reset()
for _, fgName := range keyFields.Hash.FieldGroupRefs {
hash.Write(fieldGroup2hash[fgName])
hasher.Write(fieldGroup2hash[fgName])
}
if keyFields.Hash.FieldGroupARef != "" {
hashA := fieldGroup2hash[keyFields.Hash.FieldGroupARef]
hashB := fieldGroup2hash[keyFields.Hash.FieldGroupBRef]
// Determine order between A's and B's hash to get the same hash for both flow logs from A to B and from B to A.
if bytes.Compare(hashA, hashB) < 0 {
hash.Write(hashA)
hash.Write(hashB)
hasher.Write(hashA)
hasher.Write(hashB)
} else {
hash.Write(hashB)
hash.Write(hashA)
hasher.Write(hashB)
hasher.Write(hashA)
}
}
return hash.Sum([]byte{}), nil
return hasher.Sum([]byte{}), nil
}

func computeHashFields(flowLog config.GenericMap, fieldNames []string) ([]byte, error) {
h := fnv.New32a()
func computeHashFields(flowLog config.GenericMap, fieldNames []string, hasher hash.Hash) ([]byte, error) {
hasher.Reset()
for _, fn := range fieldNames {
f, ok := flowLog[fn]
if !ok {
Expand All @@ -75,9 +75,9 @@ func computeHashFields(flowLog config.GenericMap, fieldNames []string) ([]byte,
if err != nil {
return nil, err
}
h.Write(bytes)
hasher.Write(bytes)
}
return h.Sum([]byte{}), nil
return hasher.Sum([]byte{}), nil
}

func toBytes(data interface{}) ([]byte, error) {
Expand Down
11 changes: 7 additions & 4 deletions pkg/pipeline/conntrack/hash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package conntrack

import (
"hash/fnv"
"testing"

"github.com/netobserv/flowlogs-pipeline/pkg/api"
Expand All @@ -37,6 +38,8 @@ func NewFlowLog(srcIP string, srcPort int, dstIP string, dstPort int, protocol i
}
}

var hasher = fnv.New32a()

func TestComputeHash_Unidirectional(t *testing.T) {
keyFields := api.KeyFields{
FieldGroups: []api.FieldGroup{
Expand Down Expand Up @@ -110,8 +113,8 @@ func TestComputeHash_Unidirectional(t *testing.T) {
}
for _, test := range table {
t.Run(test.name, func(t *testing.T) {
h1, err1 := ComputeHash(test.flowLog1, keyFields)
h2, err2 := ComputeHash(test.flowLog2, keyFields)
h1, err1 := ComputeHash(test.flowLog1, keyFields, hasher)
h2, err2 := ComputeHash(test.flowLog2, keyFields, hasher)
require.NoError(t, err1)
require.NoError(t, err2)
if test.sameHash {
Expand Down Expand Up @@ -198,8 +201,8 @@ func TestComputeHash_Bidirectional(t *testing.T) {
}
for _, test := range table {
t.Run(test.name, func(t *testing.T) {
h1, err1 := ComputeHash(test.flowLog1, keyFields)
h2, err2 := ComputeHash(test.flowLog2, keyFields)
h1, err1 := ComputeHash(test.flowLog1, keyFields, hasher)
h2, err2 := ComputeHash(test.flowLog2, keyFields, hasher)
require.NoError(t, err1)
require.NoError(t, err2)
if test.sameHash {
Expand Down