-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #1174 from libp2p/circuitv2
move the circuit v2 code here
- Loading branch information
Showing
32 changed files
with
4,922 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,66 @@ | ||
package client | ||
|
||
import ( | ||
"context" | ||
"sync" | ||
|
||
"github.com/libp2p/go-libp2p/p2p/host/circuitv2/proto" | ||
|
||
"github.com/libp2p/go-libp2p-core/host" | ||
"github.com/libp2p/go-libp2p-core/peer" | ||
|
||
logging "github.com/ipfs/go-log" | ||
tptu "github.com/libp2p/go-libp2p-transport-upgrader" | ||
) | ||
|
||
var log = logging.Logger("p2p-circuit") | ||
|
||
// Client implements the client-side of the p2p-circuit/v2 protocol: | ||
// - it implements dialing through v2 relays | ||
// - it listens for incoming connections through v2 relays. | ||
// | ||
// For backwards compatibility with v1 relays and older nodes, the client will | ||
// also accept relay connections through v1 relays and fallback dial peers using p2p-circuit/v1. | ||
// This allows us to use the v2 code as drop in replacement for v1 in a host without breaking | ||
// existing code and interoperability with older nodes. | ||
type Client struct { | ||
ctx context.Context | ||
host host.Host | ||
upgrader *tptu.Upgrader | ||
|
||
incoming chan accept | ||
|
||
mx sync.Mutex | ||
activeDials map[peer.ID]*completion | ||
hopCount map[peer.ID]int | ||
} | ||
|
||
type accept struct { | ||
conn *Conn | ||
writeResponse func() error | ||
} | ||
|
||
type completion struct { | ||
ch chan struct{} | ||
relay peer.ID | ||
err error | ||
} | ||
|
||
// New constructs a new p2p-circuit/v2 client, attached to the given host and using the given | ||
// upgrader to perform connection upgrades. | ||
func New(ctx context.Context, h host.Host, upgrader *tptu.Upgrader) (*Client, error) { | ||
return &Client{ | ||
ctx: ctx, | ||
host: h, | ||
upgrader: upgrader, | ||
incoming: make(chan accept), | ||
activeDials: make(map[peer.ID]*completion), | ||
hopCount: make(map[peer.ID]int), | ||
}, nil | ||
} | ||
|
||
// Start registers the circuit (client) protocol stream handlers | ||
func (c *Client) Start() { | ||
c.host.SetStreamHandler(proto.ProtoIDv1, c.handleStreamV1) | ||
c.host.SetStreamHandler(proto.ProtoIDv2Stop, c.handleStreamV2) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,145 @@ | ||
package client | ||
|
||
import ( | ||
"fmt" | ||
"net" | ||
"time" | ||
|
||
"github.com/libp2p/go-libp2p-core/network" | ||
"github.com/libp2p/go-libp2p-core/peer" | ||
|
||
ma "github.com/multiformats/go-multiaddr" | ||
manet "github.com/multiformats/go-multiaddr/net" | ||
) | ||
|
||
// HopTagWeight is the connection manager weight for connections carrying relay hop streams | ||
var HopTagWeight = 5 | ||
|
||
type statLimitDuration struct{} | ||
type statLimitData struct{} | ||
|
||
var ( | ||
StatLimitDuration = statLimitDuration{} | ||
StatLimitData = statLimitData{} | ||
) | ||
|
||
type Conn struct { | ||
stream network.Stream | ||
remote peer.AddrInfo | ||
stat network.Stat | ||
|
||
client *Client | ||
} | ||
|
||
type NetAddr struct { | ||
Relay string | ||
Remote string | ||
} | ||
|
||
var _ net.Addr = (*NetAddr)(nil) | ||
|
||
func (n *NetAddr) Network() string { | ||
return "libp2p-circuit-relay" | ||
} | ||
|
||
func (n *NetAddr) String() string { | ||
return fmt.Sprintf("relay[%s-%s]", n.Remote, n.Relay) | ||
} | ||
|
||
// Conn interface | ||
var _ manet.Conn = (*Conn)(nil) | ||
|
||
func (c *Conn) Close() error { | ||
c.untagHop() | ||
return c.stream.Reset() | ||
} | ||
|
||
func (c *Conn) Read(buf []byte) (int, error) { | ||
return c.stream.Read(buf) | ||
} | ||
|
||
func (c *Conn) Write(buf []byte) (int, error) { | ||
return c.stream.Write(buf) | ||
} | ||
|
||
func (c *Conn) SetDeadline(t time.Time) error { | ||
return c.stream.SetDeadline(t) | ||
} | ||
|
||
func (c *Conn) SetReadDeadline(t time.Time) error { | ||
return c.stream.SetReadDeadline(t) | ||
} | ||
|
||
func (c *Conn) SetWriteDeadline(t time.Time) error { | ||
return c.stream.SetWriteDeadline(t) | ||
} | ||
|
||
// TODO: is it okay to cast c.Conn().RemotePeer() into a multiaddr? might be "user input" | ||
func (c *Conn) RemoteMultiaddr() ma.Multiaddr { | ||
// TODO: We should be able to do this directly without converting to/from a string. | ||
relayAddr, err := ma.NewComponent( | ||
ma.ProtocolWithCode(ma.P_P2P).Name, | ||
c.stream.Conn().RemotePeer().Pretty(), | ||
) | ||
if err != nil { | ||
panic(err) | ||
} | ||
return ma.Join(c.stream.Conn().RemoteMultiaddr(), relayAddr, circuitAddr) | ||
} | ||
|
||
func (c *Conn) LocalMultiaddr() ma.Multiaddr { | ||
return c.stream.Conn().LocalMultiaddr() | ||
} | ||
|
||
func (c *Conn) LocalAddr() net.Addr { | ||
na, err := manet.ToNetAddr(c.stream.Conn().LocalMultiaddr()) | ||
if err != nil { | ||
log.Error("failed to convert local multiaddr to net addr:", err) | ||
return nil | ||
} | ||
return na | ||
} | ||
|
||
func (c *Conn) RemoteAddr() net.Addr { | ||
return &NetAddr{ | ||
Relay: c.stream.Conn().RemotePeer().Pretty(), | ||
Remote: c.remote.ID.Pretty(), | ||
} | ||
} | ||
|
||
// ConnStat interface | ||
var _ network.ConnStat = (*Conn)(nil) | ||
|
||
func (c *Conn) Stat() network.Stat { | ||
return c.stat | ||
} | ||
|
||
// tagHop tags the underlying relay connection so that it can be (somewhat) protected from the | ||
// connection manager as it is an important connection that proxies other connections. | ||
// This is handled here so that the user code doesnt need to bother with this and avoid | ||
// clown shoes situations where a high value peer connection is behind a relayed connection and it is | ||
// implicitly because the connection manager closed the underlying relay connection. | ||
func (c *Conn) tagHop() { | ||
c.client.mx.Lock() | ||
defer c.client.mx.Unlock() | ||
|
||
p := c.stream.Conn().RemotePeer() | ||
c.client.hopCount[p]++ | ||
if c.client.hopCount[p] == 1 { | ||
c.client.host.ConnManager().TagPeer(p, "relay-hop-stream", HopTagWeight) | ||
} | ||
} | ||
|
||
// untagHop removes the relay-hop-stream tag if necessary; it is invoked when a relayed connection | ||
// is closed. | ||
func (c *Conn) untagHop() { | ||
c.client.mx.Lock() | ||
defer c.client.mx.Unlock() | ||
|
||
p := c.stream.Conn().RemotePeer() | ||
c.client.hopCount[p]-- | ||
if c.client.hopCount[p] == 0 { | ||
c.client.host.ConnManager().UntagPeer(p, "relay-hop-stream") | ||
delete(c.client.hopCount, p) | ||
} | ||
} |
Oops, something went wrong.