Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

p2p/discover: New endpoint format #743

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,8 +277,8 @@ func (s *Ethereum) NodeInfo() *NodeInfo {
NodeUrl: node.String(),
NodeID: node.ID.String(),
IP: node.IP.String(),
DiscPort: node.DiscPort,
TCPPort: node.TCPPort,
DiscPort: int(node.UDP),
TCPPort: int(node.TCP),
ListenAddr: s.net.ListenAddr,
Td: s.ChainManager().Td().String(),
}
Expand Down
10 changes: 6 additions & 4 deletions p2p/discover/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"net"
"os"
"path/filepath"
"reflect"
"testing"
"time"
)
Expand Down Expand Up @@ -86,9 +87,10 @@ func TestNodeDBInt64(t *testing.T) {

func TestNodeDBFetchStore(t *testing.T) {
node := &Node{
ID: MustHexID("0x1dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"),
IP: net.IP([]byte{192, 168, 0, 1}),
TCPPort: 30303,
ID: MustHexID("0x1dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"),
IP: net.IP([]byte{192, 168, 0, 1}),
UDP: 30303,
TCP: 30303,
}
inst := time.Now()

Expand Down Expand Up @@ -124,7 +126,7 @@ func TestNodeDBFetchStore(t *testing.T) {
}
if stored := db.node(node.ID); stored == nil {
t.Errorf("node: not found")
} else if !bytes.Equal(stored.ID[:], node.ID[:]) || !stored.IP.Equal(node.IP) || stored.TCPPort != node.TCPPort {
} else if !reflect.DeepEqual(stored, node) {
t.Errorf("node: data mismatch: have %v, want %v", stored, node)
}
}
Expand Down
61 changes: 22 additions & 39 deletions p2p/discover/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"encoding/hex"
"errors"
"fmt"
"io"
"math/big"
"math/rand"
"net"
Expand All @@ -16,49 +15,45 @@ import (

"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/crypto/secp256k1"
"github.com/ethereum/go-ethereum/rlp"
)

const nodeIDBits = 512

// Node represents a host on the network.
type Node struct {
ID NodeID
IP net.IP

DiscPort int // UDP listening port for discovery protocol
TCPPort int // TCP listening port for RLPx
IP net.IP // len 4 for IPv4 or 16 for IPv6
UDP, TCP uint16 // port numbers
ID NodeID
}

func newNode(id NodeID, addr *net.UDPAddr) *Node {
ip := addr.IP.To4()
if ip == nil {
ip = addr.IP.To16()
}
return &Node{
ID: id,
IP: addr.IP,
DiscPort: addr.Port,
TCPPort: addr.Port,
IP: ip,
UDP: uint16(addr.Port),
TCP: uint16(addr.Port),
ID: id,
}
}

func (n *Node) isValid() bool {
// TODO: don't accept localhost, LAN addresses from internet hosts
return !n.IP.IsMulticast() && !n.IP.IsUnspecified() && n.TCPPort != 0 && n.DiscPort != 0
}

func (n *Node) addr() *net.UDPAddr {
return &net.UDPAddr{IP: n.IP, Port: n.DiscPort}
return &net.UDPAddr{IP: n.IP, Port: int(n.UDP)}
}

// The string representation of a Node is a URL.
// Please see ParseNode for a description of the format.
func (n *Node) String() string {
addr := net.TCPAddr{IP: n.IP, Port: n.TCPPort}
addr := net.TCPAddr{IP: n.IP, Port: int(n.TCP)}
u := url.URL{
Scheme: "enode",
User: url.User(fmt.Sprintf("%x", n.ID[:])),
Host: addr.String(),
}
if n.DiscPort != n.TCPPort {
u.RawQuery = "discport=" + strconv.Itoa(n.DiscPort)
if n.UDP != n.TCP {
u.RawQuery = "discport=" + strconv.Itoa(int(n.UDP))
}
return u.String()
}
Expand Down Expand Up @@ -98,16 +93,20 @@ func ParseNode(rawurl string) (*Node, error) {
if n.IP = net.ParseIP(ip); n.IP == nil {
return nil, errors.New("invalid IP address")
}
if n.TCPPort, err = strconv.Atoi(port); err != nil {
tcp, err := strconv.ParseUint(port, 10, 16)
if err != nil {
return nil, errors.New("invalid port")
}
n.TCP = uint16(tcp)
qv := u.Query()
if qv.Get("discport") == "" {
n.DiscPort = n.TCPPort
n.UDP = n.TCP
} else {
if n.DiscPort, err = strconv.Atoi(qv.Get("discport")); err != nil {
udp, err := strconv.ParseUint(qv.Get("discport"), 10, 16)
if err != nil {
return nil, errors.New("invalid discport in query")
}
n.UDP = uint16(udp)
}
return &n, nil
}
Expand All @@ -121,22 +120,6 @@ func MustParseNode(rawurl string) *Node {
return n
}

func (n Node) EncodeRLP(w io.Writer) error {
return rlp.Encode(w, rpcNode{IP: n.IP.String(), Port: uint16(n.TCPPort), ID: n.ID})
}
func (n *Node) DecodeRLP(s *rlp.Stream) (err error) {
var ext rpcNode
if err = s.Decode(&ext); err == nil {
n.TCPPort = int(ext.Port)
n.DiscPort = int(ext.Port)
n.ID = ext.ID
if n.IP = net.ParseIP(ext.IP); n.IP == nil {
return errors.New("invalid IP string")
}
}
return err
}

// NodeID is a unique identifier for each node.
// The node identifier is a marshaled elliptic curve public key.
type NodeID [nodeIDBits / 8]byte
Expand Down
26 changes: 13 additions & 13 deletions p2p/discover/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,28 +49,28 @@ var parseNodeTests = []struct {
{
rawurl: "enode://1dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439@127.0.0.1:52150",
wantResult: &Node{
ID: MustHexID("0x1dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"),
IP: net.ParseIP("127.0.0.1"),
DiscPort: 52150,
TCPPort: 52150,
ID: MustHexID("0x1dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"),
IP: net.ParseIP("127.0.0.1"),
UDP: 52150,
TCP: 52150,
},
},
{
rawurl: "enode://1dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439@[::]:52150",
wantResult: &Node{
ID: MustHexID("0x1dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"),
IP: net.ParseIP("::"),
DiscPort: 52150,
TCPPort: 52150,
ID: MustHexID("0x1dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"),
IP: net.ParseIP("::"),
UDP: 52150,
TCP: 52150,
},
},
{
rawurl: "enode://1dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439@127.0.0.1:52150?discport=223344",
rawurl: "enode://1dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439@127.0.0.1:52150?discport=22334",
wantResult: &Node{
ID: MustHexID("0x1dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"),
IP: net.ParseIP("127.0.0.1"),
DiscPort: 223344,
TCPPort: 52150,
ID: MustHexID("0x1dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"),
IP: net.ParseIP("127.0.0.1"),
UDP: 22334,
TCP: 52150,
},
},
}
Expand Down
9 changes: 2 additions & 7 deletions p2p/discover/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func (tab *Table) bondall(nodes []*Node) (result []*Node) {
rc := make(chan *Node, len(nodes))
for i := range nodes {
go func(n *Node) {
nn, _ := tab.bond(false, n.ID, n.addr(), uint16(n.TCPPort))
nn, _ := tab.bond(false, n.ID, n.addr(), uint16(n.TCP))
rc <- nn
}(nodes[i])
}
Expand Down Expand Up @@ -299,12 +299,7 @@ func (tab *Table) pingpong(w *bondproc, pinged bool, id NodeID, addr *net.UDPAdd
tab.net.waitping(id)
}
// Bonding succeeded, update the node database
w.n = &Node{
ID: id,
IP: addr.IP,
DiscPort: addr.Port,
TCPPort: int(tcpPort),
}
w.n = &Node{ID: id, IP: addr.IP, UDP: uint16(addr.Port), TCP: tcpPort}
tab.db.updateNode(w.n)
close(w.done)
}
Expand Down
4 changes: 2 additions & 2 deletions p2p/discover/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,9 +261,9 @@ func (t findnodeOracle) findnode(toid NodeID, toaddr *net.UDPAddr, target NodeID
panic("query to node at distance 0")
default:
// TODO: add more randomness to distances
next := toaddr.Port - 1
next := uint16(toaddr.Port) - 1
for i := 0; i < bucketSize; i++ {
result = append(result, &Node{ID: randomID(t.target, next), DiscPort: next})
result = append(result, &Node{ID: randomID(t.target, int(next)), UDP: next})
}
}
return result, nil
Expand Down
80 changes: 53 additions & 27 deletions p2p/discover/udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"github.com/ethereum/go-ethereum/rlp"
)

const Version = 3
const Version = 4

// Errors
var (
Expand Down Expand Up @@ -49,16 +49,20 @@ const (
// RPC request structures
type (
ping struct {
Version uint // must match Version
IP string // our IP
Port uint16 // our port
Version uint
From, To rpcEndpoint
Expiration uint64
}

// reply to Ping
// pong is the reply to ping.
pong struct {
ReplyTok []byte
Expiration uint64
// This field should mirror the UDP envelope address
// of the ping packet, which provides a way to discover the
// the external address (after NAT).
To rpcEndpoint

ReplyTok []byte // This contains the hash of the ping packet.
Expiration uint64 // Absolute timestamp at which the packet becomes invalid.
}

findnode struct {
Expand All @@ -73,12 +77,25 @@ type (
Nodes []*Node
Expiration uint64
}

rpcEndpoint struct {
IP net.IP // len 4 for IPv4 or 16 for IPv6
UDP uint16 // for discovery protocol
TCP uint16 // for RLPx protocol
}
)

type rpcNode struct {
IP string
Port uint16
ID NodeID
func makeEndpoint(addr *net.UDPAddr, tcpPort uint16) rpcEndpoint {
ip := addr.IP.To4()
if ip == nil {
ip = addr.IP.To16()
}
return rpcEndpoint{IP: ip, UDP: uint16(addr.Port), TCP: tcpPort}
}

func validNode(n *Node) bool {
// TODO: don't accept localhost, LAN addresses from internet hosts
return !n.IP.IsMulticast() && !n.IP.IsUnspecified() && n.UDP != 0
}

type packet interface {
Expand All @@ -94,8 +111,9 @@ type conn interface {

// udp implements the RPC protocol.
type udp struct {
conn conn
priv *ecdsa.PrivateKey
conn conn
priv *ecdsa.PrivateKey
ourEndpoint rpcEndpoint

addpending chan *pending
gotreply chan reply
Expand Down Expand Up @@ -176,6 +194,8 @@ func newUDP(priv *ecdsa.PrivateKey, c conn, natm nat.Interface, nodeDBPath strin
realaddr = &net.UDPAddr{IP: ext, Port: realaddr.Port}
}
}
// TODO: separate TCP port
udp.ourEndpoint = makeEndpoint(realaddr, uint16(realaddr.Port))
udp.Table = newTable(udp, PubkeyID(&priv.PublicKey), realaddr, nodeDBPath)
go udp.loop()
go udp.readLoop()
Expand All @@ -194,8 +214,8 @@ func (t *udp) ping(toid NodeID, toaddr *net.UDPAddr) error {
errc := t.pending(toid, pongPacket, func(interface{}) bool { return true })
t.send(toaddr, pingPacket, ping{
Version: Version,
IP: t.self.IP.String(),
Port: uint16(t.self.TCPPort),
From: t.ourEndpoint,
To: makeEndpoint(toaddr, 0), // TODO: maybe use known TCP port from DB
Expiration: uint64(time.Now().Add(expiration).Unix()),
})
return <-errc
Expand All @@ -214,7 +234,7 @@ func (t *udp) findnode(toid NodeID, toaddr *net.UDPAddr, target NodeID) ([]*Node
reply := r.(*neighbors)
for _, n := range reply.Nodes {
nreceived++
if n.isValid() {
if validNode(n) {
nodes = append(nodes, n)
}
}
Expand Down Expand Up @@ -374,17 +394,22 @@ func (t *udp) readLoop() {
if err != nil {
return
}
packet, fromID, hash, err := decodePacket(buf[:nbytes])
if err != nil {
glog.V(logger.Debug).Infof("Bad packet from %v: %v\n", from, err)
continue
}
status := "ok"
if err := packet.handle(t, from, fromID, hash); err != nil {
status = err.Error()
}
glog.V(logger.Detail).Infof("<<< %v %T: %s\n", from, packet, status)
t.handlePacket(from, buf[:nbytes])
}
}

func (t *udp) handlePacket(from *net.UDPAddr, buf []byte) error {
packet, fromID, hash, err := decodePacket(buf)
if err != nil {
glog.V(logger.Debug).Infof("Bad packet from %v: %v\n", from, err)
return err
}
status := "ok"
if err = packet.handle(t, from, fromID, hash); err != nil {
status = err.Error()
}
glog.V(logger.Detail).Infof("<<< %v %T: %s\n", from, packet, status)
return err
}

func decodePacket(buf []byte) (packet, NodeID, []byte, error) {
Expand Down Expand Up @@ -425,12 +450,13 @@ func (req *ping) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) er
return errBadVersion
}
t.send(from, pongPacket, pong{
To: makeEndpoint(from, req.From.TCP),
ReplyTok: mac,
Expiration: uint64(time.Now().Add(expiration).Unix()),
})
if !t.handleReply(fromID, pingPacket, req) {
// Note: we're ignoring the provided IP address right now
go t.bond(true, fromID, from, req.Port)
go t.bond(true, fromID, from, req.From.TCP)
}
return nil
}
Expand Down
Loading