Skip to content

Commit

Permalink
Make mempool reactor handles received msg asynchronously
Browse files Browse the repository at this point in the history
  • Loading branch information
danliu committed Nov 15, 2018
1 parent 038d87b commit d5b5ea8
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 5 deletions.
1 change: 0 additions & 1 deletion mempool/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,6 @@ func TestReapPriority(t *testing.T) {
//go reapCheck()
wg.Add(TotalTx)
for i := 1; i <= TotalTx; i++ {
fmt.Printf("Insert checkTX:%v\n", i)
go checkTxs(i)
}
//close(seqReap)
Expand Down
33 changes: 30 additions & 3 deletions mempool/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,31 @@ import (
const (
MempoolChannel = byte(0x30)

maxMsgSize = 1048576 // 1MB TODO make it configurable
peerCatchupSleepIntervalMS = 100 // If peer is behind, sleep this amount
maxMsgSize = 1048576 // 1MB TODO make it configurable
peerCatchupSleepIntervalMS = 100 // If peer is behind, sleep this amount
MempoolPacketChannelSize = 1024 * 8 // 8K messages can be queued
)

type MempoolPacket struct {
chID byte
src p2p.Peer
msgBytes []byte
}

// MempoolReactor handles mempool tx broadcasting amongst peers.
type MempoolReactor struct {
p2p.BaseReactor
config *cfg.MempoolConfig
Mempool *Mempool
recvCh chan *MempoolPacket
}

// NewMempoolReactor returns a new MempoolReactor with the given config and mempool.
func NewMempoolReactor(config *cfg.MempoolConfig, mempool *Mempool) *MempoolReactor {
memR := &MempoolReactor{
config: config,
Mempool: mempool,
recvCh: make(chan *MempoolPacket, MempoolPacketChannelSize),
}
memR.BaseReactor = *p2p.NewBaseReactor("MempoolReactor", memR)
return memR
Expand All @@ -50,9 +59,16 @@ func (memR *MempoolReactor) OnStart() error {
if !memR.config.Broadcast {
memR.Logger.Info("Tx broadcasting is disabled")
}
go memR.receiveRoutine()
return nil
}

// OnStop implements p2p.BaseReactor
// Close message queue channel
func (memR *MempoolReactor) OnStop() {
close(memR.recvCh)
}

// GetChannels implements Reactor.
// It returns the list of channels for this reactor.
func (memR *MempoolReactor) GetChannels() []*p2p.ChannelDescriptor {
Expand All @@ -76,8 +92,19 @@ func (memR *MempoolReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
}

// Receive implements Reactor.
// It adds any received transactions to the mempool.
func (memR *MempoolReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
memR.recvCh <- &MempoolPacket{chID: chID, src: src, msgBytes: msgBytes}
}

func (memR *MempoolReactor) receiveRoutine() {
memR.Logger.Debug("Starting ReceiveRoutine for mempool")
for p := range memR.recvCh {
memR.receiveImpl(p.chID, p.src, p.msgBytes)
}
}

// It adds any received transactions to the mempool.
func (memR *MempoolReactor) receiveImpl(chID byte, src p2p.Peer, msgBytes []byte) {
msg, err := decodeMsg(msgBytes)
if err != nil {
memR.Logger.Error("Error decoding message", "src", src, "chId", chID, "msg", msg, "err", err, "bytes", msgBytes)
Expand Down
8 changes: 7 additions & 1 deletion p2p/conn/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,13 @@ FOR_LOOP:
if msgBytes != nil {
c.Logger.Debug("Received bytes", "chID", pkt.ChannelID, "msgBytes", fmt.Sprintf("%X", msgBytes))
// NOTE: This means the reactor.Receive runs in the same thread as the p2p recv routine
c.onReceive(pkt.ChannelID, msgBytes)
// except the mempool actually is using an asynchronus Receive() to prevent jamming requests
// stopping block producing (tested via )

// this copy is due to the underlying memory is shared, and causes problem for async call
msgCopy := make([]byte, len(msgBytes))
copy(msgCopy, msgBytes)
c.onReceive(pkt.ChannelID, msgCopy)
}
default:
err := fmt.Errorf("Unknown message type %v", reflect.TypeOf(packet))
Expand Down

0 comments on commit d5b5ea8

Please sign in to comment.