Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: improved locking of assembler state #307

Merged
merged 7 commits into from
Oct 31, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 28 additions & 20 deletions assemblers/http_matcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ import (
)

type httpMatcher struct {
entries *sync.Map
messages map[int64]*entry
mtx *sync.Mutex
}

type entry struct {
Expand All @@ -21,7 +22,8 @@ type entry struct {

func newRequestResponseMatcher() *httpMatcher {
return &httpMatcher{
entries: &sync.Map{},
messages: make(map[int64]*entry),
mtx: &sync.Mutex{},
}
}

Expand All @@ -33,20 +35,23 @@ func newRequestResponseMatcher() *httpMatcher {
// If the response hasn't been seen yet,
// stores the Request for later lookup and returns match as nil and matchFound will be false.
func (m *httpMatcher) GetOrStoreRequest(key int64, timestamp time.Time, request *http.Request, packetCount int) (match *entry, matchFound bool) {
e := &entry{
m.mtx.Lock()
defer m.mtx.Unlock()

if match, matchFound = m.messages[key]; matchFound {
match.request = request
match.requestTimestamp = timestamp
match.requestPacketCount = packetCount
delete(m.messages, key)
return match, matchFound
}

m.messages[key] = &entry{
request: request,
requestTimestamp: timestamp,
requestPacketCount: packetCount,
}

if v, matchFound := m.entries.LoadOrStore(key, e); matchFound {
m.entries.Delete(key)
e = v.(*entry) // reuse allocated &entry{} to hold the match
// found entry has Response, so update it with Request
e.request = request
e.requestTimestamp = timestamp
return e, true
}
return nil, false
}

Expand All @@ -58,19 +63,22 @@ func (m *httpMatcher) GetOrStoreRequest(key int64, timestamp time.Time, request
// If the request hasn't been seen yet,
// stores the Response for later lookup and returns match as nil and matchFound will be false.
func (m *httpMatcher) GetOrStoreResponse(key int64, timestamp time.Time, response *http.Response, packetCount int) (match *entry, matchFound bool) {
e := &entry{
m.mtx.Lock()
defer m.mtx.Unlock()

if match, matchFound = m.messages[key]; matchFound {
match.response = response
match.responseTimestamp = timestamp
match.requestPacketCount = packetCount
delete(m.messages, key)
return match, matchFound
}

m.messages[key] = &entry{
response: response,
responseTimestamp: timestamp,
responsePacketCount: packetCount,
}

if v, matchFound := m.entries.LoadOrStore(key, e); matchFound {
m.entries.Delete(key)
e = v.(*entry) // reuse allocated &entry{} to hold the match
// found entry has Request, so update it with Response
e.response = response
e.responseTimestamp = timestamp
return e, true
}
return nil, false
}
55 changes: 29 additions & 26 deletions assemblers/tcp_assembler.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,35 +12,39 @@ import (
"github.com/gopacket/gopacket/layers"
"github.com/gopacket/gopacket/pcap"
"github.com/gopacket/gopacket/reassembly"
"github.com/honeycombio/honeycomb-network-agent/config"
"github.com/honeycombio/libhoney-go"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"

"github.com/honeycombio/honeycomb-network-agent/config"
)

var stats struct {
ipdefrag int
totalsz int
rejectFsm int
rejectOpt int
rejectConnFsm int
source_received int
source_dropped int
source_if_dropped int
total_streams uint64
active_streams int64
ipdefrag int
totalsz int

// Below stats could be accessed concurrently, so explicitly
// mark them as atomic.
rejectFsm atomic.Uint64
rejectOpt atomic.Uint64
rejectConnFsm atomic.Uint64
total_streams atomic.Uint64
active_streams atomic.Uint64
source_received atomic.Uint64
source_dropped atomic.Uint64
source_if_dropped atomic.Uint64
}

func IncrementStreamCount() uint64 {
return atomic.AddUint64(&stats.total_streams, 1)
return stats.total_streams.Add(1)
}

func IncrementActiveStreamCount() {
atomic.AddInt64(&stats.active_streams, 1)
stats.active_streams.Add(1)
}

func DecrementActiveStreamCount() {
atomic.AddInt64(&stats.active_streams, -1)
stats.active_streams.Add(^uint64(0))
}

type Context struct {
Expand Down Expand Up @@ -186,7 +190,6 @@ func (h *tcpAssembler) Stop() {
h.streamPool.Dump()
}

h.streamFactory.WaitGoRoutines()
h.logAssemblerStats()
log.Debug().
Int("closed", closed).
Expand All @@ -198,17 +201,17 @@ func (a *tcpAssembler) logAssemblerStats() {
statsFields := map[string]interface{}{
"uptime_ms": time.Since(a.startedAt).Milliseconds(),
"IPdefrag": stats.ipdefrag,
"rejected_FSM": stats.rejectFsm,
"rejected_Options": stats.rejectOpt,
"rejected_FSM": stats.rejectFsm.Load(),
"rejected_Options": stats.rejectOpt.Load(),
"total_TCP_bytes": stats.totalsz,
"conn_rejected_FSM": stats.rejectConnFsm,
"source_received": stats.source_received,
"source_dropped": stats.source_dropped,
"source_if_dropped": stats.source_if_dropped,
"conn_rejected_FSM": stats.rejectConnFsm.Load(),
"source_received": stats.source_received.Load(),
"source_dropped": stats.source_dropped.Load(),
"source_if_dropped": stats.source_if_dropped.Load(),
"event_queue_length": len(a.eventsChan),
"goroutines": runtime.NumGoroutine(),
"total_streams": stats.total_streams,
"active_streams": stats.active_streams,
"total_streams": stats.total_streams.Load(),
"active_streams": stats.active_streams.Load(),
}
statsEvent := libhoney.NewEvent()
statsEvent.Dataset = a.config.StatsDataset
Expand Down Expand Up @@ -261,8 +264,8 @@ func logPcapHandleStats(handle *pcap.Handle) {
log.Error().Err(err).Msg("Failed to get pcap handle stats")
continue
}
stats.source_received += pcapStats.PacketsReceived
stats.source_dropped += pcapStats.PacketsDropped
stats.source_if_dropped += pcapStats.PacketsIfDropped
stats.source_received.Add(uint64(pcapStats.PacketsReceived))
stats.source_dropped.Add(uint64(pcapStats.PacketsDropped))
stats.source_if_dropped.Add(uint64(pcapStats.PacketsIfDropped))
}
}
11 changes: 6 additions & 5 deletions assemblers/tcp_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ import (
"github.com/gopacket/gopacket"
"github.com/gopacket/gopacket/layers"
"github.com/gopacket/gopacket/reassembly"
"github.com/honeycombio/honeycomb-network-agent/config"
"github.com/rs/zerolog/log"

"github.com/honeycombio/honeycomb-network-agent/config"
)

// tcpStream represents a TCP stream and receives TCP packets from the gopacket assembler
Expand Down Expand Up @@ -61,10 +62,10 @@ func (stream *tcpStream) Accept(tcp *layers.TCP, ci gopacket.CaptureInfo, dir re
// FSM
if !stream.tcpstate.CheckState(tcp, dir) {
// Error("FSM", "%s: Packet rejected by FSM (state:%s)\n", t.ident, t.tcpstate.String())
stats.rejectFsm++
stats.rejectFsm.Add(1)
if !stream.fsmerr {
stream.fsmerr = true
stats.rejectConnFsm++
stats.rejectConnFsm.Add(1)
}
if !stream.config.Ignorefsmerr {
return false
Expand All @@ -74,7 +75,7 @@ func (stream *tcpStream) Accept(tcp *layers.TCP, ci gopacket.CaptureInfo, dir re
err := stream.optchecker.Accept(tcp, ci, dir, nextSeq, start)
if err != nil {
// Error("OptionChecker", "%s: Packet rejected by OptionChecker: %s\n", t.ident, err)
stats.rejectOpt++
stats.rejectOpt.Add(1)
if !stream.config.Nooptcheck {
return false
}
Expand All @@ -98,7 +99,7 @@ func (stream *tcpStream) Accept(tcp *layers.TCP, ci gopacket.CaptureInfo, dir re
}
}
if !accept {
stats.rejectOpt++
stats.rejectOpt.Add(1)
}
return accept
}
Expand Down
10 changes: 2 additions & 8 deletions assemblers/tcp_stream_factory.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,16 @@
package assemblers

import (
"sync"

"github.com/gopacket/gopacket"
"github.com/gopacket/gopacket/layers"
"github.com/gopacket/gopacket/reassembly"
"github.com/honeycombio/honeycomb-network-agent/config"
"github.com/rs/zerolog/log"

"github.com/honeycombio/honeycomb-network-agent/config"
)

type tcpStreamFactory struct {
config config.Config
wg sync.WaitGroup
eventsChan chan Event
}

Expand All @@ -31,7 +29,3 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
IncrementActiveStreamCount()
return NewTcpStream(net, transport, factory.config, factory.eventsChan)
}

func (factory *tcpStreamFactory) WaitGoRoutines() {
factory.wg.Wait()
}