Skip to content

Commit

Permalink
Merge pull request #61 from irisnet/vincent/remote-signer
Browse files Browse the repository at this point in the history
R4R: Upgrade privval module to improving remote sign
  • Loading branch information
Haifeng Xi authored May 17, 2019
2 parents c101ef8 + 9f90b29 commit a883219
Show file tree
Hide file tree
Showing 20 changed files with 1,225 additions and 1,272 deletions.
22 changes: 15 additions & 7 deletions cmd/priv_val_server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"flag"
"os"
"time"

"github.com/tendermint/tendermint/crypto/ed25519"
cmn "github.com/tendermint/tendermint/libs/common"
Expand Down Expand Up @@ -32,13 +33,20 @@ func main() {

pv := privval.LoadFilePV(*privValPath)

rs := privval.NewRemoteSigner(
logger,
*chainID,
*addr,
pv,
ed25519.GenPrivKey(),
)
var dialer privval.Dialer
protocol, address := cmn.ProtocolAndAddress(*addr)
switch protocol {
case "unix":
dialer = privval.DialUnixFn(address)
case "tcp":
connTimeout := 3 * time.Second // TODO
dialer = privval.DialTCPFn(address, connTimeout, ed25519.GenPrivKey())
default:
logger.Error("Unknown protocol", "protocol", protocol)
os.Exit(1)
}

rs := privval.NewRemoteSigner(logger, *chainID, pv, dialer)
err := rs.Start()
if err != nil {
panic(err)
Expand Down
17 changes: 10 additions & 7 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -863,27 +863,30 @@ func createAndStartPrivValidatorSocketClient(
listenAddr string,
logger log.Logger,
) (types.PrivValidator, error) {
var pvsc types.PrivValidator
var listener net.Listener

protocol, address := cmn.ProtocolAndAddress(listenAddr)
ln, err := net.Listen(protocol, address)
if err != nil {
return nil, err
}
switch protocol {
case "unix":
pvsc = privval.NewIPCVal(logger.With("module", "privval"), address)
listener = privval.NewUnixListener(ln)
case "tcp":
// TODO: persist this key so external signer
// can actually authenticate us
pvsc = privval.NewTCPVal(logger.With("module", "privval"), listenAddr, ed25519.GenPrivKey())
listener = privval.NewTCPListener(ln, ed25519.GenPrivKey())
default:
return nil, fmt.Errorf(
"Wrong listen address: expected either 'tcp' or 'unix' protocols, got %s",
protocol,
)
}

if pvsc, ok := pvsc.(cmn.Service); ok {
if err := pvsc.Start(); err != nil {
return nil, errors.Wrap(err, "failed to start")
}
pvsc := privval.NewSocketVal(logger.With("module", "privval"), listener)
if err := pvsc.Start(); err != nil {
return nil, errors.Wrap(err, "failed to start private validator")
}

return pvsc, nil
Expand Down
26 changes: 13 additions & 13 deletions node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,25 +122,25 @@ func TestNodeSetPrivValTCP(t *testing.T) {
config := cfg.ResetTestRoot("node_priv_val_tcp_test")
config.BaseConfig.PrivValidatorListenAddr = addr

rs := privval.NewRemoteSigner(
dialer := privval.DialTCPFn(addr, 100*time.Millisecond, ed25519.GenPrivKey())
pvsc := privval.NewRemoteSigner(
log.TestingLogger(),
config.ChainID(),
addr,
types.NewMockPV(),
ed25519.GenPrivKey(),
dialer,
)
privval.RemoteSignerConnDeadline(5 * time.Millisecond)(rs)

go func() {
err := rs.Start()
err := pvsc.Start()
if err != nil {
panic(err)
}
}()
defer rs.Stop()
defer pvsc.Stop()

n, err := DefaultNewNode(config, log.TestingLogger())
require.NoError(t, err)
assert.IsType(t, &privval.TCPVal{}, n.PrivValidator())
assert.IsType(t, &privval.SocketVal{}, n.PrivValidator())
}

// address without a protocol must result in error
Expand All @@ -161,25 +161,25 @@ func TestNodeSetPrivValIPC(t *testing.T) {
config := cfg.ResetTestRoot("node_priv_val_tcp_test")
config.BaseConfig.PrivValidatorListenAddr = "unix://" + tmpfile

rs := privval.NewIPCRemoteSigner(
dialer := privval.DialUnixFn(tmpfile)
pvsc := privval.NewRemoteSigner(
log.TestingLogger(),
config.ChainID(),
tmpfile,
types.NewMockPV(),
dialer,
)
privval.IPCRemoteSignerConnDeadline(3 * time.Second)(rs)

done := make(chan struct{})
go func() {
defer close(done)
n, err := DefaultNewNode(config, log.TestingLogger())
require.NoError(t, err)
assert.IsType(t, &privval.IPCVal{}, n.PrivValidator())
assert.IsType(t, &privval.SocketVal{}, n.PrivValidator())
}()

err := rs.Start()
err := pvsc.Start()
require.NoError(t, err)
defer rs.Stop()
defer pvsc.Stop()

<-done
}
Expand Down
238 changes: 238 additions & 0 deletions privval/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
package privval

import (
"errors"
"fmt"
"net"
"sync"
"time"

"github.com/tendermint/tendermint/crypto"
cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/types"
)

const (
defaultConnHeartBeatSeconds = 2
defaultDialRetries = 10
)

// Socket errors.
var (
ErrUnexpectedResponse = errors.New("received unexpected response")
)

var (
connHeartbeat = time.Second * defaultConnHeartBeatSeconds
)

// SocketValOption sets an optional parameter on the SocketVal.
type SocketValOption func(*SocketVal)

// SocketValHeartbeat sets the period on which to check the liveness of the
// connected Signer connections.
func SocketValHeartbeat(period time.Duration) SocketValOption {
return func(sc *SocketVal) { sc.connHeartbeat = period }
}

// SocketVal implements PrivValidator.
// It listens for an external process to dial in and uses
// the socket to request signatures.
type SocketVal struct {
cmn.BaseService

listener net.Listener

// ping
cancelPing chan struct{}
pingTicker *time.Ticker
connHeartbeat time.Duration

// signer is mutable since it can be
// reset if the connection fails.
// failures are detected by a background
// ping routine.
// Methods on the underlying net.Conn itself
// are already gorountine safe.
mtx sync.RWMutex
signer *RemoteSignerClient
}

// Check that SocketVal implements PrivValidator.
var _ types.PrivValidator = (*SocketVal)(nil)

// NewSocketVal returns an instance of SocketVal.
func NewSocketVal(
logger log.Logger,
listener net.Listener,
) *SocketVal {
sc := &SocketVal{
listener: listener,
connHeartbeat: connHeartbeat,
}

sc.BaseService = *cmn.NewBaseService(logger, "SocketVal", sc)

return sc
}

//--------------------------------------------------------
// Implement PrivValidator

// GetPubKey implements PrivValidator.
func (sc *SocketVal) GetPubKey() crypto.PubKey {
sc.mtx.RLock()
defer sc.mtx.RUnlock()
return sc.signer.GetPubKey()
}

// SignVote implements PrivValidator.
func (sc *SocketVal) SignVote(chainID string, vote *types.Vote) error {
sc.mtx.RLock()
defer sc.mtx.RUnlock()
return sc.signer.SignVote(chainID, vote)
}

// SignProposal implements PrivValidator.
func (sc *SocketVal) SignProposal(chainID string, proposal *types.Proposal) error {
sc.mtx.RLock()
defer sc.mtx.RUnlock()
return sc.signer.SignProposal(chainID, proposal)
}

//--------------------------------------------------------
// More thread safe methods proxied to the signer

// Ping is used to check connection health.
func (sc *SocketVal) Ping() error {
sc.mtx.RLock()
defer sc.mtx.RUnlock()
return sc.signer.Ping()
}

// Close closes the underlying net.Conn.
func (sc *SocketVal) Close() {
sc.mtx.RLock()
defer sc.mtx.RUnlock()
if sc.signer != nil {
if err := sc.signer.Close(); err != nil {
sc.Logger.Error("OnStop", "err", err)
}
}

if sc.listener != nil {
if err := sc.listener.Close(); err != nil {
sc.Logger.Error("OnStop", "err", err)
}
}
}

//--------------------------------------------------------
// Service start and stop

// OnStart implements cmn.Service.
func (sc *SocketVal) OnStart() error {
if closed, err := sc.reset(); err != nil {
sc.Logger.Error("OnStart", "err", err)
return err
} else if closed {
return fmt.Errorf("listener is closed")
}

// Start a routine to keep the connection alive
sc.cancelPing = make(chan struct{}, 1)
sc.pingTicker = time.NewTicker(sc.connHeartbeat)
go func() {
for {
select {
case <-sc.pingTicker.C:
err := sc.Ping()
if err != nil {
sc.Logger.Error("Ping", "err", err)
if err == ErrUnexpectedResponse {
return
}

closed, err := sc.reset()
if err != nil {
sc.Logger.Error("Reconnecting to remote signer failed", "err", err)
continue
}
if closed {
sc.Logger.Info("listener is closing")
return
}

sc.Logger.Info("Re-created connection to remote signer", "impl", sc)
}
case <-sc.cancelPing:
sc.pingTicker.Stop()
return
}
}
}()

return nil
}

// OnStop implements cmn.Service.
func (sc *SocketVal) OnStop() {
if sc.cancelPing != nil {
close(sc.cancelPing)
}
sc.Close()
}

//--------------------------------------------------------
// Connection and signer management

// waits to accept and sets a new connection.
// connection is closed in OnStop.
// returns true if the listener is closed
// (ie. it returns a nil conn).
func (sc *SocketVal) reset() (closed bool, err error) {
sc.mtx.Lock()
defer sc.mtx.Unlock()

// first check if the conn already exists and close it.
if sc.signer != nil {
if err := sc.signer.Close(); err != nil {
sc.Logger.Error("error closing socket val connection during reset", "err", err)
}
}

// wait for a new conn
conn, err := sc.acceptConnection()
if err != nil {
return false, err
}

// listener is closed
if conn == nil {
return true, nil
}

sc.signer, err = NewRemoteSignerClient(conn)
if err != nil {
// failed to fetch the pubkey. close out the connection.
if err := conn.Close(); err != nil {
sc.Logger.Error("error closing connection", "err", err)
}
return false, err
}
return false, nil
}

// Attempt to accept a connection.
// Times out after the listener's acceptDeadline
func (sc *SocketVal) acceptConnection() (net.Conn, error) {
conn, err := sc.listener.Accept()
if err != nil {
if !sc.IsRunning() {
return nil, nil // Ignore error from listener closing.
}
return nil, err
}
return conn, nil
}
Loading

0 comments on commit a883219

Please sign in to comment.