Skip to content

Commit

Permalink
Merge pull request #63 from irisnet/vincent/non-deterministic-test
Browse files Browse the repository at this point in the history
R4R:  Fix non deterministic test failures
  • Loading branch information
Haifeng Xi authored May 17, 2019
2 parents 7a2b942 + 3753721 commit 5a1cb27
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 18 deletions.
6 changes: 4 additions & 2 deletions consensus/replay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ func testHandshakeReplay(t *testing.T, nBlocks int, mode uint) {
t.Fatalf("Error starting proxy app connections: %v", err)
}
defer proxyApp.Stop()
if err := handshaker.Handshake(proxyApp); err != nil {
if err := handshaker.Handshake(proxyApp, &config.BaseConfig); err != nil {
t.Fatalf("Error on abci handshake: %v", err)
}

Expand Down Expand Up @@ -624,6 +624,8 @@ func (bs *mockBlockStore) LoadSeenCommit(height int64) *types.Commit {
return bs.commits[height-1]
}

func (bs *mockBlockStore) RetreatLastBlock() {}

//----------------------------------------

func TestInitChainUpdateValidators(t *testing.T) {
Expand All @@ -646,7 +648,7 @@ func TestInitChainUpdateValidators(t *testing.T) {
t.Fatalf("Error starting proxy app connections: %v", err)
}
defer proxyApp.Stop()
if err := handshaker.Handshake(proxyApp); err != nil {
if err := handshaker.Handshake(proxyApp, &config.BaseConfig); err != nil {
t.Fatalf("Error on abci handshake: %v", err)
}

Expand Down
9 changes: 9 additions & 0 deletions consensus/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,20 @@ func (wal *baseWAL) OnStart() error {
return err
}

// Stop the underlying autofile group.
// Use Wait() to ensure it's finished shutting down
// before cleaning up files.
func (wal *baseWAL) OnStop() {
wal.group.Stop()
wal.group.Close()
}

// Wait for the underlying autofile group to finish shutting down
// so it's safe to cleanup files.
func (wal *baseWAL) Wait() {
wal.group.Wait()
}

// Write is called in newStep and for each receive on the
// peerMsgQueue and the timeoutTicker.
// NOTE: does not call fsync()
Expand Down
7 changes: 6 additions & 1 deletion consensus/wal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,12 @@ func TestWALTruncate(t *testing.T) {
wal.SetLogger(log.TestingLogger())
err = wal.Start()
require.NoError(t, err)
defer wal.Stop()
defer func() {
wal.Stop()
// wait for the wal to finish shutting down so we
// can safely remove the directory
wal.Wait()
}()

//60 block's size nearly 70K, greater than group's headBuf size(4096 * 10), when headBuf is full, truncate content will Flush to the file.
//at this time, RotateFile is called, truncate content exist in each file.
Expand Down
12 changes: 12 additions & 0 deletions libs/autofile/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ type Group struct {
minIndex int // Includes head
maxIndex int // Includes head, where Head will move to

// close this when the processTicks routine is done.
// this ensures we can cleanup the dir after calling Stop
// and the routine won't be trying to access it anymore
doneProcessTicks chan struct{}

// TODO: When we start deleting files, we need to start tracking GroupReaders
// and their dependencies.
}
Expand All @@ -90,6 +95,7 @@ func OpenGroup(headPath string, groupOptions ...func(*Group)) (g *Group, err err
groupCheckDuration: defaultGroupCheckDuration,
minIndex: 0,
maxIndex: 0,
doneProcessTicks: make(chan struct{}),
}

for _, option := range groupOptions {
Expand Down Expand Up @@ -140,6 +146,11 @@ func (g *Group) OnStop() {
g.Flush() // flush any uncommitted data
}

func (g *Group) Wait() {
// wait for processTicks routine to finish
<-g.doneProcessTicks
}

// Close closes the head file. The group must be stopped by this moment.
func (g *Group) Close() {
g.Flush() // flush any uncommitted data
Expand Down Expand Up @@ -211,6 +222,7 @@ func (g *Group) Flush() error {
}

func (g *Group) processTicks() {
defer close(g.doneProcessTicks)
for {
select {
case <-g.ticker.C:
Expand Down
8 changes: 5 additions & 3 deletions node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,13 @@ func TestSplitAndTrimEmpty(t *testing.T) {
}
}

func TestNodeDelayedStop(t *testing.T) {
config := cfg.ResetTestRoot("node_delayed_node_test")
func TestNodeDelayedStart(t *testing.T) {
config := cfg.ResetTestRoot("node_delayed_start_test")
now := tmtime.Now()

// create & start node
n, err := DefaultNewNode(config, log.TestingLogger())
n.GenesisDoc().GenesisTime = now.Add(5 * time.Second)
n.GenesisDoc().GenesisTime = now.Add(2 * time.Second)
require.NoError(t, err)

n.Start()
Expand Down Expand Up @@ -169,6 +169,8 @@ func TestNodeSetPrivValIPC(t *testing.T) {
dialer,
)

privval.RemoteSignerConnDeadline(100 * time.Millisecond)(pvsc)

done := make(chan struct{})
go func() {
defer close(done)
Expand Down
2 changes: 1 addition & 1 deletion p2p/test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
return err
}

ni, err := handshake(conn, 50*time.Millisecond, sw.nodeInfo)
ni, err := handshake(conn, time.Second, sw.nodeInfo)
if err != nil {
if err := conn.Close(); err != nil {
sw.Logger.Error("Error closing connection", "err", err)
Expand Down
24 changes: 13 additions & 11 deletions privval/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,11 @@ type SocketVal struct {
// reset if the connection fails.
// failures are detected by a background
// ping routine.
// All messages are request/response, so we hold the mutex
// so only one request/response pair can happen at a time.
// Methods on the underlying net.Conn itself
// are already gorountine safe.
mtx sync.RWMutex
mtx sync.Mutex
signer *RemoteSignerClient
}

Expand All @@ -82,22 +84,22 @@ func NewSocketVal(

// GetPubKey implements PrivValidator.
func (sc *SocketVal) GetPubKey() crypto.PubKey {
sc.mtx.RLock()
defer sc.mtx.RUnlock()
sc.mtx.Lock()
defer sc.mtx.Unlock()
return sc.signer.GetPubKey()
}

// SignVote implements PrivValidator.
func (sc *SocketVal) SignVote(chainID string, vote *types.Vote) error {
sc.mtx.RLock()
defer sc.mtx.RUnlock()
sc.mtx.Lock()
defer sc.mtx.Unlock()
return sc.signer.SignVote(chainID, vote)
}

// SignProposal implements PrivValidator.
func (sc *SocketVal) SignProposal(chainID string, proposal *types.Proposal) error {
sc.mtx.RLock()
defer sc.mtx.RUnlock()
sc.mtx.Lock()
defer sc.mtx.Unlock()
return sc.signer.SignProposal(chainID, proposal)
}

Expand All @@ -106,15 +108,15 @@ func (sc *SocketVal) SignProposal(chainID string, proposal *types.Proposal) erro

// Ping is used to check connection health.
func (sc *SocketVal) Ping() error {
sc.mtx.RLock()
defer sc.mtx.RUnlock()
sc.mtx.Lock()
defer sc.mtx.Unlock()
return sc.signer.Ping()
}

// Close closes the underlying net.Conn.
func (sc *SocketVal) Close() {
sc.mtx.RLock()
defer sc.mtx.RUnlock()
sc.mtx.Lock()
defer sc.mtx.Unlock()
if sc.signer != nil {
if err := sc.signer.Close(); err != nil {
sc.Logger.Error("OnStop", "err", err)
Expand Down

0 comments on commit 5a1cb27

Please sign in to comment.