From 3b8a00882d2df96ee4dfbcf861f5ea6bdb9e2cb8 Mon Sep 17 00:00:00 2001 From: Dan Bond Date: Sat, 28 Oct 2023 17:42:04 -0700 Subject: [PATCH 1/5] assemblers: switch to using map Signed-off-by: Dan Bond --- assemblers/http_matcher.go | 45 +++++++++++++++++++++++--------------- 1 file changed, 27 insertions(+), 18 deletions(-) diff --git a/assemblers/http_matcher.go b/assemblers/http_matcher.go index 4f7dbbac..a797a050 100644 --- a/assemblers/http_matcher.go +++ b/assemblers/http_matcher.go @@ -7,7 +7,8 @@ import ( ) type httpMatcher struct { - entries *sync.Map + messages map[int64]*entry + mtx *sync.Mutex } type entry struct { @@ -21,7 +22,8 @@ type entry struct { func newRequestResponseMatcher() *httpMatcher { return &httpMatcher{ - entries: &sync.Map{}, + messages: make(map[int64]*entry), + mtx: &sync.Mutex{}, } } @@ -33,20 +35,23 @@ func newRequestResponseMatcher() *httpMatcher { // If the response hasn't been seen yet, // stores the Request for later lookup and returns match as nil and matchFound will be false. func (m *httpMatcher) GetOrStoreRequest(key int64, timestamp time.Time, request *http.Request, packetCount int) (match *entry, matchFound bool) { + m.mtx.Lock() + defer m.mtx.Unlock() + + if e, ok := m.messages[key]; ok { + e.request = request + e.requestTimestamp = timestamp + delete(m.messages, key) + return e, true + } + e := &entry{ request: request, requestTimestamp: timestamp, requestPacketCount: packetCount, } - if v, matchFound := m.entries.LoadOrStore(key, e); matchFound { - m.entries.Delete(key) - e = v.(*entry) // reuse allocated &entry{} to hold the match - // found entry has Response, so update it with Request - e.request = request - e.requestTimestamp = timestamp - return e, true - } + m.messages[key] = e return nil, false } @@ -58,19 +63,23 @@ func (m *httpMatcher) GetOrStoreRequest(key int64, timestamp time.Time, request // If the request hasn't been seen yet, // stores the Response for later lookup and returns match as nil and matchFound will be false. func (m *httpMatcher) GetOrStoreResponse(key int64, timestamp time.Time, response *http.Response, packetCount int) (match *entry, matchFound bool) { + m.mtx.Lock() + defer m.mtx.Unlock() + + if e, ok := m.messages[key]; ok { + e.response = response + e.responseTimestamp = timestamp + delete(m.messages, key) + return e, true + } + e := &entry{ response: response, responseTimestamp: timestamp, responsePacketCount: packetCount, } - if v, matchFound := m.entries.LoadOrStore(key, e); matchFound { - m.entries.Delete(key) - e = v.(*entry) // reuse allocated &entry{} to hold the match - // found entry has Request, so update it with Response - e.response = response - e.responseTimestamp = timestamp - return e, true - } + m.messages[key] = e return nil, false + } From fe2ebd3ef6f48fde5b3024e8cc871fc5dcb88bed Mon Sep 17 00:00:00 2001 From: Dan Bond Date: Sat, 28 Oct 2023 18:09:15 -0700 Subject: [PATCH 2/5] assemblers: use atomic values for stats Signed-off-by: Dan Bond --- assemblers/tcp_assembler.go | 54 ++++++++++++++++++++----------------- assemblers/tcp_stream.go | 11 ++++---- 2 files changed, 35 insertions(+), 30 deletions(-) diff --git a/assemblers/tcp_assembler.go b/assemblers/tcp_assembler.go index d3b2a0ae..9b6fb986 100644 --- a/assemblers/tcp_assembler.go +++ b/assemblers/tcp_assembler.go @@ -12,35 +12,39 @@ import ( "github.com/gopacket/gopacket/layers" "github.com/gopacket/gopacket/pcap" "github.com/gopacket/gopacket/reassembly" - "github.com/honeycombio/honeycomb-network-agent/config" "github.com/honeycombio/libhoney-go" "github.com/rs/zerolog" "github.com/rs/zerolog/log" + + "github.com/honeycombio/honeycomb-network-agent/config" ) var stats struct { - ipdefrag int - totalsz int - rejectFsm int - rejectOpt int - rejectConnFsm int - source_received int - source_dropped int - source_if_dropped int - total_streams uint64 - active_streams int64 + ipdefrag int + totalsz int + + // Below stats could be accessed concurrently, so explicitly + // mark them as atomic. + rejectFsm atomic.Uint64 + rejectOpt atomic.Uint64 + rejectConnFsm atomic.Uint64 + total_streams atomic.Uint64 + active_streams atomic.Uint64 + source_received atomic.Uint64 + source_dropped atomic.Uint64 + source_if_dropped atomic.Uint64 } func IncrementStreamCount() uint64 { - return atomic.AddUint64(&stats.total_streams, 1) + return stats.total_streams.Add(1) } func IncrementActiveStreamCount() { - atomic.AddInt64(&stats.active_streams, 1) + stats.active_streams.Add(1) } func DecrementActiveStreamCount() { - atomic.AddInt64(&stats.active_streams, -1) + stats.active_streams.Add(^uint64(0)) } type Context struct { @@ -198,17 +202,17 @@ func (a *tcpAssembler) logAssemblerStats() { statsFields := map[string]interface{}{ "uptime_ms": time.Since(a.startedAt).Milliseconds(), "IPdefrag": stats.ipdefrag, - "rejected_FSM": stats.rejectFsm, - "rejected_Options": stats.rejectOpt, + "rejected_FSM": stats.rejectFsm.Load(), + "rejected_Options": stats.rejectOpt.Load(), "total_TCP_bytes": stats.totalsz, - "conn_rejected_FSM": stats.rejectConnFsm, - "source_received": stats.source_received, - "source_dropped": stats.source_dropped, - "source_if_dropped": stats.source_if_dropped, + "conn_rejected_FSM": stats.rejectConnFsm.Load(), + "source_received": stats.source_received.Load(), + "source_dropped": stats.source_dropped.Load(), + "source_if_dropped": stats.source_if_dropped.Load(), "event_queue_length": len(a.eventsChan), "goroutines": runtime.NumGoroutine(), - "total_streams": stats.total_streams, - "active_streams": stats.active_streams, + "total_streams": stats.total_streams.Load(), + "active_streams": stats.active_streams.Load(), } statsEvent := libhoney.NewEvent() statsEvent.Dataset = a.config.StatsDataset @@ -261,8 +265,8 @@ func logPcapHandleStats(handle *pcap.Handle) { log.Error().Err(err).Msg("Failed to get pcap handle stats") continue } - stats.source_received += pcapStats.PacketsReceived - stats.source_dropped += pcapStats.PacketsDropped - stats.source_if_dropped += pcapStats.PacketsIfDropped + stats.source_received.Add(uint64(pcapStats.PacketsReceived)) + stats.source_dropped.Add(uint64(pcapStats.PacketsDropped)) + stats.source_if_dropped.Add(uint64(pcapStats.PacketsIfDropped)) } } diff --git a/assemblers/tcp_stream.go b/assemblers/tcp_stream.go index adc18492..626539d6 100644 --- a/assemblers/tcp_stream.go +++ b/assemblers/tcp_stream.go @@ -9,8 +9,9 @@ import ( "github.com/gopacket/gopacket" "github.com/gopacket/gopacket/layers" "github.com/gopacket/gopacket/reassembly" - "github.com/honeycombio/honeycomb-network-agent/config" "github.com/rs/zerolog/log" + + "github.com/honeycombio/honeycomb-network-agent/config" ) // tcpStream represents a TCP stream and receives TCP packets from the gopacket assembler @@ -61,10 +62,10 @@ func (stream *tcpStream) Accept(tcp *layers.TCP, ci gopacket.CaptureInfo, dir re // FSM if !stream.tcpstate.CheckState(tcp, dir) { // Error("FSM", "%s: Packet rejected by FSM (state:%s)\n", t.ident, t.tcpstate.String()) - stats.rejectFsm++ + stats.rejectFsm.Add(1) if !stream.fsmerr { stream.fsmerr = true - stats.rejectConnFsm++ + stats.rejectConnFsm.Add(1) } if !stream.config.Ignorefsmerr { return false @@ -74,7 +75,7 @@ func (stream *tcpStream) Accept(tcp *layers.TCP, ci gopacket.CaptureInfo, dir re err := stream.optchecker.Accept(tcp, ci, dir, nextSeq, start) if err != nil { // Error("OptionChecker", "%s: Packet rejected by OptionChecker: %s\n", t.ident, err) - stats.rejectOpt++ + stats.rejectOpt.Add(1) if !stream.config.Nooptcheck { return false } @@ -98,7 +99,7 @@ func (stream *tcpStream) Accept(tcp *layers.TCP, ci gopacket.CaptureInfo, dir re } } if !accept { - stats.rejectOpt++ + stats.rejectOpt.Add(1) } return accept } From 2e039ad6b095b835a69c80b4ed61d08b4ed7551e Mon Sep 17 00:00:00 2001 From: Dan Bond Date: Sat, 28 Oct 2023 18:11:55 -0700 Subject: [PATCH 3/5] assemblers: remove unused tcp stream waitgroup Signed-off-by: Dan Bond --- assemblers/tcp_assembler.go | 1 - assemblers/tcp_stream_factory.go | 10 ++-------- 2 files changed, 2 insertions(+), 9 deletions(-) diff --git a/assemblers/tcp_assembler.go b/assemblers/tcp_assembler.go index 9b6fb986..8232eb88 100644 --- a/assemblers/tcp_assembler.go +++ b/assemblers/tcp_assembler.go @@ -190,7 +190,6 @@ func (h *tcpAssembler) Stop() { h.streamPool.Dump() } - h.streamFactory.WaitGoRoutines() h.logAssemblerStats() log.Debug(). Int("closed", closed). diff --git a/assemblers/tcp_stream_factory.go b/assemblers/tcp_stream_factory.go index 60ea05fb..d7803c58 100644 --- a/assemblers/tcp_stream_factory.go +++ b/assemblers/tcp_stream_factory.go @@ -1,18 +1,16 @@ package assemblers import ( - "sync" - "github.com/gopacket/gopacket" "github.com/gopacket/gopacket/layers" "github.com/gopacket/gopacket/reassembly" - "github.com/honeycombio/honeycomb-network-agent/config" "github.com/rs/zerolog/log" + + "github.com/honeycombio/honeycomb-network-agent/config" ) type tcpStreamFactory struct { config config.Config - wg sync.WaitGroup eventsChan chan Event } @@ -31,7 +29,3 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T IncrementActiveStreamCount() return NewTcpStream(net, transport, factory.config, factory.eventsChan) } - -func (factory *tcpStreamFactory) WaitGoRoutines() { - factory.wg.Wait() -} From 9a12281a97c436f21b983cce8d6b4ad5a47eda7d Mon Sep 17 00:00:00 2001 From: Dan Bond Date: Tue, 31 Oct 2023 08:10:41 -0700 Subject: [PATCH 4/5] assemblers: include responsePacketCount Signed-off-by: Dan Bond --- assemblers/http_matcher.go | 25 ++++++++++++------------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/assemblers/http_matcher.go b/assemblers/http_matcher.go index a797a050..617e9008 100644 --- a/assemblers/http_matcher.go +++ b/assemblers/http_matcher.go @@ -38,20 +38,20 @@ func (m *httpMatcher) GetOrStoreRequest(key int64, timestamp time.Time, request m.mtx.Lock() defer m.mtx.Unlock() - if e, ok := m.messages[key]; ok { - e.request = request - e.requestTimestamp = timestamp + if match, matchFound = m.messages[key]; matchFound { + match.request = request + match.requestTimestamp = timestamp + match.requestPacketCount = packetCount delete(m.messages, key) - return e, true + return match, matchFound } - e := &entry{ + m.messages[key] = &entry{ request: request, requestTimestamp: timestamp, requestPacketCount: packetCount, } - m.messages[key] = e return nil, false } @@ -66,20 +66,19 @@ func (m *httpMatcher) GetOrStoreResponse(key int64, timestamp time.Time, respons m.mtx.Lock() defer m.mtx.Unlock() - if e, ok := m.messages[key]; ok { - e.response = response - e.responseTimestamp = timestamp + if match, matchFound = m.messages[key]; matchFound { + match.response = response + match.responseTimestamp = timestamp + match.requestPacketCount = packetCount delete(m.messages, key) - return e, true + return match, matchFound } - e := &entry{ + m.messages[key] = &entry{ response: response, responseTimestamp: timestamp, responsePacketCount: packetCount, } - m.messages[key] = e return nil, false - } From 7512ca4c1677bf85a23afef73eeb278226520c85 Mon Sep 17 00:00:00 2001 From: Dan Bond Date: Tue, 31 Oct 2023 13:24:02 -0700 Subject: [PATCH 5/5] Update assemblers/http_matcher.go Co-authored-by: Robb Kidd --- assemblers/http_matcher.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/assemblers/http_matcher.go b/assemblers/http_matcher.go index 617e9008..31a04646 100644 --- a/assemblers/http_matcher.go +++ b/assemblers/http_matcher.go @@ -69,7 +69,7 @@ func (m *httpMatcher) GetOrStoreResponse(key int64, timestamp time.Time, respons if match, matchFound = m.messages[key]; matchFound { match.response = response match.responseTimestamp = timestamp - match.requestPacketCount = packetCount + match.responsePacketCount = packetCount delete(m.messages, key) return match, matchFound }