Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(dot/network): implement streamManager to cleanup not recently used streams #1611

Merged
merged 46 commits into from
Jun 2, 2021
Merged
Show file tree
Hide file tree
Changes from 42 commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
45e059b
update pool.get to return ptr
noot May 27, 2021
5197340
add logs
noot May 27, 2021
8d96671
reduce max message size to 63kb, no more pointers in pool
noot May 27, 2021
42e2f44
remove bufio
noot May 27, 2021
161406e
no more allocations
noot May 27, 2021
aa538da
try to fix sync??
noot May 27, 2021
735c60f
update logs
noot May 27, 2021
bcc5e6f
pass buf[0] to readLEB128ToUint64
noot May 27, 2021
db5ebf2
log
noot May 27, 2021
35200ba
fix
noot May 27, 2021
78895dd
fix
noot May 27, 2021
ef61ee8
remove connectToPeers case in discoverAndAdvertise
noot May 27, 2021
60afa84
restore pool to use ptrs
noot May 27, 2021
8e70013
remove storing peersToTry
noot May 27, 2021
4af4f95
fix
noot May 27, 2021
fb467b1
attempt to readd connecting to min peers
noot May 28, 2021
56c1aaa
increase connectToPeersTimeout
noot May 28, 2021
aa58de6
split up discoverAndAdvertise funcs
noot May 28, 2021
e1563f3
don't find peers
noot May 28, 2021
c8b5544
only find peers if below min
noot May 28, 2021
8d54da5
Merge branch 'development' of github.com:ChainSafe/gossamer into noot…
noot May 28, 2021
0ef575d
lint
noot May 28, 2021
3f3055e
Merge branch 'development' of github.com:ChainSafe/gossamer into noot…
noot May 28, 2021
5565d90
add go.mod
noot May 29, 2021
a1babfa
add timeout to readStream
noot May 30, 2021
a9fab01
cleanup
noot May 30, 2021
80569f5
fix
noot May 30, 2021
f392695
fix
noot May 30, 2021
a6ef2ac
fix again
noot May 30, 2021
da6d0b4
always close sync stream after handling, change timeout
noot May 30, 2021
2162ebc
remove timeouts, add streamManager
noot May 30, 2021
d4085c1
start streamManager
noot May 30, 2021
85e2e5d
change streamManager maps to sync.Map
noot May 31, 2021
03480e3
add test case
noot May 31, 2021
d26bce1
lint
noot May 31, 2021
945f95b
cleanup
noot May 31, 2021
4942020
merge w development
noot May 31, 2021
ea5248c
test cleanup
noot May 31, 2021
bb94aec
use single map in streamManager
noot Jun 1, 2021
09d9aab
lint
noot Jun 1, 2021
0fe8ea4
Merge branch 'development' of github.com:ChainSafe/gossamer into noot…
noot Jun 1, 2021
7027073
Merge branch 'development' into noot/read-stream-timeout
arijitAD Jun 1, 2021
bbdee57
address comments
noot Jun 1, 2021
39694fa
Merge branch 'noot/read-stream-timeout' of github.com:ChainSafe/gossa…
noot Jun 1, 2021
f061756
use ticker.C
noot Jun 1, 2021
c3b1a83
add ticker.Stop
noot Jun 1, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions dot/network/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,13 @@ type Service struct {
ctx context.Context
cancel context.CancelFunc

cfg *Config
host *host
mdns *mdns
gossip *gossip
syncQueue *syncQueue
bufPool *sizedBufferPool
cfg *Config
host *host
mdns *mdns
gossip *gossip
syncQueue *syncQueue
bufPool *sizedBufferPool
streamManager *streamManager

notificationsProtocols map[byte]*notificationsProtocol // map of sub-protocol msg ID to protocol info
notificationsMu sync.RWMutex
Expand Down Expand Up @@ -162,6 +163,7 @@ func NewService(cfg *Config) (*Service, error) {
telemetryInterval: cfg.telemetryInterval,
closeCh: make(chan interface{}),
bufPool: bufPool,
streamManager: newStreamManager(ctx),
}

network.syncQueue = newSyncQueue(network)
Expand Down Expand Up @@ -267,6 +269,7 @@ func (s *Service) Start() error {
go s.logPeerCount()
go s.publishNetworkTelemetry(s.closeCh)
go s.sentBlockIntervalTelemetry()
s.streamManager.start()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any reason why this spawns it's own goroutine instead of calling it via go s.streamManager.start()

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see that there's a ctx created on NewService. Do we need both the closeCh and ctx?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no particular reason, I like it like this since the main network service doesn't need to be concerned with what the streamManager does


return nil
}
Expand Down Expand Up @@ -529,6 +532,8 @@ func isInbound(stream libp2pnetwork.Stream) bool {
}

func (s *Service) readStream(stream libp2pnetwork.Stream, decoder messageDecoder, handler messageHandler) {
s.streamManager.logNewStream(stream)

peer := stream.Conn().RemotePeer()
msgBytes := s.bufPool.get()
defer s.bufPool.put(&msgBytes)
Expand All @@ -543,6 +548,8 @@ func (s *Service) readStream(stream libp2pnetwork.Stream, decoder messageDecoder
return
}

s.streamManager.logMessageReceived(stream.ID())

// decode message based on message type
msg, err := decoder(msgBytes[:tot], peer, isInbound(stream))
if err != nil {
Expand Down
78 changes: 78 additions & 0 deletions dot/network/stream_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package network

import (
"context"
"sync"
"time"

"github.com/libp2p/go-libp2p-core/network"
)

var cleanupStreamInterval = time.Minute

type streamData struct {
lastReceivedMessage time.Time
stream network.Stream
}

// streamManager tracks inbound streams and runs a cleanup goroutine every `cleanupStreamInterval` to close streams that
// we haven't received any data on for the last time period. this prevents keeping stale streams open and continuously trying to
// read from it, which takes up lots of CPU over time.
type streamManager struct {
ctx context.Context
streamData *sync.Map //map[string]streamData
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rename streamData to streamMap since we already have streamData.

}

func newStreamManager(ctx context.Context) *streamManager {
return &streamManager{
ctx: ctx,
streamData: new(sync.Map),
}
}

func (sm *streamManager) start() {
go func() {
for {
select {
case <-sm.ctx.Done():
return
case <-time.After(cleanupStreamInterval):
sm.cleanupStreams()
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
for {
select {
case <-sm.ctx.Done():
return
case <-time.After(cleanupStreamInterval):
sm.cleanupStreams()
}
}
ticker := time.NewTicker(cleanupStreamInterval)
for {
select {
case <-sm.ctx.Done():
return
case <-ticker.C:
sm.cleanupStreams()
}
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

very cool, can definitely use this in other places too

}()
}

func (sm *streamManager) cleanupStreams() {
sm.streamData.Range(func(id, data interface{}) bool {
sdata := data.(streamData)
lastReceived := sdata.lastReceivedMessage
stream := sdata.stream

if time.Since(lastReceived) > cleanupStreamInterval {
_ = stream.Close()
sm.streamData.Delete(id)
}

return true
})
}

func (sm *streamManager) logNewStream(stream network.Stream) {
data := streamData{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change this to pointer.
data := &streamData{}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just wondering why?

lastReceivedMessage: time.Now(), // prevents closing just opened streams, in case the cleanup goroutine runs at the same time stream is opened
stream: stream,
}
sm.streamData.Store(stream.ID(), data)
}

func (sm *streamManager) logMessageReceived(streamID string) {
data, has := sm.streamData.Load(streamID)
if !has {
return
}

sdata := data.(streamData)
sdata.lastReceivedMessage = time.Now()
sm.streamData.Store(streamID, sdata)
}
102 changes: 102 additions & 0 deletions dot/network/stream_manager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package network

import (
"context"
"fmt"
"testing"
"time"

"github.com/libp2p/go-libp2p"
libp2phost "github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
ma "github.com/multiformats/go-multiaddr"

"github.com/stretchr/testify/require"
)

func setupStreamManagerTest(t *testing.T) (context.Context, []libp2phost.Host, []*streamManager) {
ctx, cancel := context.WithCancel(context.Background())

cleanupStreamInterval = time.Millisecond * 500
t.Cleanup(func() {
cleanupStreamInterval = time.Minute
cancel()
})

smA := newStreamManager(ctx)
smB := newStreamManager(ctx)

portA := 7001
portB := 7002
addrA, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", portA))
require.NoError(t, err)
addrB, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", portB))
require.NoError(t, err)

ha, err := libp2p.New(
ctx, libp2p.ListenAddrs(addrA),
)
require.NoError(t, err)

hb, err := libp2p.New(
ctx, libp2p.ListenAddrs(addrB),
)
require.NoError(t, err)

err = ha.Connect(ctx, peer.AddrInfo{
ID: hb.ID(),
Addrs: hb.Addrs(),
})
require.NoError(t, err)

hb.SetStreamHandler("", func(stream network.Stream) {
smB.logNewStream(stream)
})

return ctx, []libp2phost.Host{ha, hb}, []*streamManager{smA, smB}
}

func TestStreamManager(t *testing.T) {
ctx, hosts, sms := setupStreamManagerTest(t)
ha, hb := hosts[0], hosts[1]
smA, smB := sms[0], sms[1]

stream, err := ha.NewStream(ctx, hb.ID(), "")
require.NoError(t, err)

smA.logNewStream(stream)
smA.start()
smB.start()

time.Sleep(cleanupStreamInterval * 2)
connsAToB := ha.Network().ConnsToPeer(hb.ID())
require.Equal(t, 1, len(connsAToB))
require.Equal(t, 0, len(connsAToB[0].GetStreams()))

connsBToA := hb.Network().ConnsToPeer(ha.ID())
require.Equal(t, 1, len(connsBToA))
require.Equal(t, 0, len(connsBToA[0].GetStreams()))
}

func TestStreamManager_KeepStream(t *testing.T) {
ctx, hosts, sms := setupStreamManagerTest(t)
ha, hb := hosts[0], hosts[1]
smA, smB := sms[0], sms[1]

stream, err := ha.NewStream(ctx, hb.ID(), "")
require.NoError(t, err)

smA.logNewStream(stream)
smA.start()
smB.start()

time.Sleep(cleanupStreamInterval / 2)
connsAToB := ha.Network().ConnsToPeer(hb.ID())
require.Equal(t, 1, len(connsAToB))
require.Equal(t, 1, len(connsAToB[0].GetStreams()))

connsBToA := hb.Network().ConnsToPeer(ha.ID())
require.Equal(t, 1, len(connsBToA))
require.Equal(t, 1, len(connsBToA[0].GetStreams()))
}
6 changes: 1 addition & 5 deletions dot/network/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,7 @@ func readStream(stream libp2pnetwork.Stream, buf []byte) (int, error) {
)

length, err := readLEB128ToUint64(stream, buf[:1])
if err == io.EOF {
return 0, err
} else if err != nil {
if err != nil {
return 0, err // TODO: return bytes read from readLEB128ToUint64
}

Expand All @@ -196,13 +194,11 @@ func readStream(stream libp2pnetwork.Stream, buf []byte) (int, error) {

if length > uint64(len(buf)) {
logger.Warn("received message with size greater than allocated message buffer", "length", length, "buffer size", len(buf))
_ = stream.Close()
return 0, fmt.Errorf("message size greater than allocated message buffer: got %d", length)
}

if length > maxBlockResponseSize {
logger.Warn("received message with size greater than maxBlockResponseSize, closing stream", "length", length)
_ = stream.Close()
return 0, fmt.Errorf("message size greater than maximum: got %d", length)
}

Expand Down