diff --git a/assemblers/http_matcher.go b/assemblers/http_matcher.go index 4f7dbbac..31a04646 100644 --- a/assemblers/http_matcher.go +++ b/assemblers/http_matcher.go @@ -7,7 +7,8 @@ import ( ) type httpMatcher struct { - entries *sync.Map + messages map[int64]*entry + mtx *sync.Mutex } type entry struct { @@ -21,7 +22,8 @@ type entry struct { func newRequestResponseMatcher() *httpMatcher { return &httpMatcher{ - entries: &sync.Map{}, + messages: make(map[int64]*entry), + mtx: &sync.Mutex{}, } } @@ -33,20 +35,23 @@ func newRequestResponseMatcher() *httpMatcher { // If the response hasn't been seen yet, // stores the Request for later lookup and returns match as nil and matchFound will be false. func (m *httpMatcher) GetOrStoreRequest(key int64, timestamp time.Time, request *http.Request, packetCount int) (match *entry, matchFound bool) { - e := &entry{ + m.mtx.Lock() + defer m.mtx.Unlock() + + if match, matchFound = m.messages[key]; matchFound { + match.request = request + match.requestTimestamp = timestamp + match.requestPacketCount = packetCount + delete(m.messages, key) + return match, matchFound + } + + m.messages[key] = &entry{ request: request, requestTimestamp: timestamp, requestPacketCount: packetCount, } - if v, matchFound := m.entries.LoadOrStore(key, e); matchFound { - m.entries.Delete(key) - e = v.(*entry) // reuse allocated &entry{} to hold the match - // found entry has Response, so update it with Request - e.request = request - e.requestTimestamp = timestamp - return e, true - } return nil, false } @@ -58,19 +63,22 @@ func (m *httpMatcher) GetOrStoreRequest(key int64, timestamp time.Time, request // If the request hasn't been seen yet, // stores the Response for later lookup and returns match as nil and matchFound will be false. func (m *httpMatcher) GetOrStoreResponse(key int64, timestamp time.Time, response *http.Response, packetCount int) (match *entry, matchFound bool) { - e := &entry{ + m.mtx.Lock() + defer m.mtx.Unlock() + + if match, matchFound = m.messages[key]; matchFound { + match.response = response + match.responseTimestamp = timestamp + match.responsePacketCount = packetCount + delete(m.messages, key) + return match, matchFound + } + + m.messages[key] = &entry{ response: response, responseTimestamp: timestamp, responsePacketCount: packetCount, } - if v, matchFound := m.entries.LoadOrStore(key, e); matchFound { - m.entries.Delete(key) - e = v.(*entry) // reuse allocated &entry{} to hold the match - // found entry has Request, so update it with Response - e.response = response - e.responseTimestamp = timestamp - return e, true - } return nil, false } diff --git a/assemblers/tcp_assembler.go b/assemblers/tcp_assembler.go index d3b2a0ae..8232eb88 100644 --- a/assemblers/tcp_assembler.go +++ b/assemblers/tcp_assembler.go @@ -12,35 +12,39 @@ import ( "github.com/gopacket/gopacket/layers" "github.com/gopacket/gopacket/pcap" "github.com/gopacket/gopacket/reassembly" - "github.com/honeycombio/honeycomb-network-agent/config" "github.com/honeycombio/libhoney-go" "github.com/rs/zerolog" "github.com/rs/zerolog/log" + + "github.com/honeycombio/honeycomb-network-agent/config" ) var stats struct { - ipdefrag int - totalsz int - rejectFsm int - rejectOpt int - rejectConnFsm int - source_received int - source_dropped int - source_if_dropped int - total_streams uint64 - active_streams int64 + ipdefrag int + totalsz int + + // Below stats could be accessed concurrently, so explicitly + // mark them as atomic. + rejectFsm atomic.Uint64 + rejectOpt atomic.Uint64 + rejectConnFsm atomic.Uint64 + total_streams atomic.Uint64 + active_streams atomic.Uint64 + source_received atomic.Uint64 + source_dropped atomic.Uint64 + source_if_dropped atomic.Uint64 } func IncrementStreamCount() uint64 { - return atomic.AddUint64(&stats.total_streams, 1) + return stats.total_streams.Add(1) } func IncrementActiveStreamCount() { - atomic.AddInt64(&stats.active_streams, 1) + stats.active_streams.Add(1) } func DecrementActiveStreamCount() { - atomic.AddInt64(&stats.active_streams, -1) + stats.active_streams.Add(^uint64(0)) } type Context struct { @@ -186,7 +190,6 @@ func (h *tcpAssembler) Stop() { h.streamPool.Dump() } - h.streamFactory.WaitGoRoutines() h.logAssemblerStats() log.Debug(). Int("closed", closed). @@ -198,17 +201,17 @@ func (a *tcpAssembler) logAssemblerStats() { statsFields := map[string]interface{}{ "uptime_ms": time.Since(a.startedAt).Milliseconds(), "IPdefrag": stats.ipdefrag, - "rejected_FSM": stats.rejectFsm, - "rejected_Options": stats.rejectOpt, + "rejected_FSM": stats.rejectFsm.Load(), + "rejected_Options": stats.rejectOpt.Load(), "total_TCP_bytes": stats.totalsz, - "conn_rejected_FSM": stats.rejectConnFsm, - "source_received": stats.source_received, - "source_dropped": stats.source_dropped, - "source_if_dropped": stats.source_if_dropped, + "conn_rejected_FSM": stats.rejectConnFsm.Load(), + "source_received": stats.source_received.Load(), + "source_dropped": stats.source_dropped.Load(), + "source_if_dropped": stats.source_if_dropped.Load(), "event_queue_length": len(a.eventsChan), "goroutines": runtime.NumGoroutine(), - "total_streams": stats.total_streams, - "active_streams": stats.active_streams, + "total_streams": stats.total_streams.Load(), + "active_streams": stats.active_streams.Load(), } statsEvent := libhoney.NewEvent() statsEvent.Dataset = a.config.StatsDataset @@ -261,8 +264,8 @@ func logPcapHandleStats(handle *pcap.Handle) { log.Error().Err(err).Msg("Failed to get pcap handle stats") continue } - stats.source_received += pcapStats.PacketsReceived - stats.source_dropped += pcapStats.PacketsDropped - stats.source_if_dropped += pcapStats.PacketsIfDropped + stats.source_received.Add(uint64(pcapStats.PacketsReceived)) + stats.source_dropped.Add(uint64(pcapStats.PacketsDropped)) + stats.source_if_dropped.Add(uint64(pcapStats.PacketsIfDropped)) } } diff --git a/assemblers/tcp_stream.go b/assemblers/tcp_stream.go index adc18492..626539d6 100644 --- a/assemblers/tcp_stream.go +++ b/assemblers/tcp_stream.go @@ -9,8 +9,9 @@ import ( "github.com/gopacket/gopacket" "github.com/gopacket/gopacket/layers" "github.com/gopacket/gopacket/reassembly" - "github.com/honeycombio/honeycomb-network-agent/config" "github.com/rs/zerolog/log" + + "github.com/honeycombio/honeycomb-network-agent/config" ) // tcpStream represents a TCP stream and receives TCP packets from the gopacket assembler @@ -61,10 +62,10 @@ func (stream *tcpStream) Accept(tcp *layers.TCP, ci gopacket.CaptureInfo, dir re // FSM if !stream.tcpstate.CheckState(tcp, dir) { // Error("FSM", "%s: Packet rejected by FSM (state:%s)\n", t.ident, t.tcpstate.String()) - stats.rejectFsm++ + stats.rejectFsm.Add(1) if !stream.fsmerr { stream.fsmerr = true - stats.rejectConnFsm++ + stats.rejectConnFsm.Add(1) } if !stream.config.Ignorefsmerr { return false @@ -74,7 +75,7 @@ func (stream *tcpStream) Accept(tcp *layers.TCP, ci gopacket.CaptureInfo, dir re err := stream.optchecker.Accept(tcp, ci, dir, nextSeq, start) if err != nil { // Error("OptionChecker", "%s: Packet rejected by OptionChecker: %s\n", t.ident, err) - stats.rejectOpt++ + stats.rejectOpt.Add(1) if !stream.config.Nooptcheck { return false } @@ -98,7 +99,7 @@ func (stream *tcpStream) Accept(tcp *layers.TCP, ci gopacket.CaptureInfo, dir re } } if !accept { - stats.rejectOpt++ + stats.rejectOpt.Add(1) } return accept } diff --git a/assemblers/tcp_stream_factory.go b/assemblers/tcp_stream_factory.go index 60ea05fb..d7803c58 100644 --- a/assemblers/tcp_stream_factory.go +++ b/assemblers/tcp_stream_factory.go @@ -1,18 +1,16 @@ package assemblers import ( - "sync" - "github.com/gopacket/gopacket" "github.com/gopacket/gopacket/layers" "github.com/gopacket/gopacket/reassembly" - "github.com/honeycombio/honeycomb-network-agent/config" "github.com/rs/zerolog/log" + + "github.com/honeycombio/honeycomb-network-agent/config" ) type tcpStreamFactory struct { config config.Config - wg sync.WaitGroup eventsChan chan Event } @@ -31,7 +29,3 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T IncrementActiveStreamCount() return NewTcpStream(net, transport, factory.config, factory.eventsChan) } - -func (factory *tcpStreamFactory) WaitGoRoutines() { - factory.wg.Wait() -}