Skip to content
This repository has been archived by the owner on May 26, 2022. It is now read-only.

Commit

Permalink
stop using goprocess for shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
marten-seemann committed Sep 19, 2021
1 parent 9f75e92 commit f1b7a19
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 62 deletions.
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,5 @@ go 1.16

require (
github.com/ipfs/go-log/v2 v2.0.3
github.com/jbenet/goprocess v0.1.4
github.com/libp2p/go-nat v0.0.6-0.20210919090249-d0d64ea585d6
)
4 changes: 0 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,6 @@ github.com/ipfs/go-log/v2 v2.0.3 h1:Q2gXcBoCALyLN/pUQlz1qgu0x3uFV6FzP9oXhpfyJpc=
github.com/ipfs/go-log/v2 v2.0.3/go.mod h1:O7P1lJt27vWHhOwQmcFEvlmo49ry2VY2+JfBWFaa9+0=
github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7BdWus=
github.com/jackpal/go-nat-pmp v1.0.2/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc=
github.com/jbenet/go-cienv v0.1.0 h1:Vc/s0QbQtoxX8MwwSLWWh+xNNZvM3Lw7NsTcHrvvhMc=
github.com/jbenet/go-cienv v0.1.0/go.mod h1:TqNnHUmJgXau0nCzC7kXWeotg3J9W34CUv5Djy1+FlA=
github.com/jbenet/goprocess v0.1.4 h1:DRGOFReOMqqDNXwW70QkacFW0YN9QnwLV0Vqk+3oU0o=
github.com/jbenet/goprocess v0.1.4/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4=
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/koron/go-ssdp v0.0.0-20191105050749-2e1c40ed0b5d h1:68u9r4wEvL3gYg2jvAOgROwZ3H+Y3hIDk4tbbmIjcYQ=
Expand Down
15 changes: 7 additions & 8 deletions mapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"net"
"sync"
"time"

"github.com/jbenet/goprocess"
)

// Mapping represents a port mapping in a NAT.
Expand Down Expand Up @@ -38,11 +36,11 @@ type Mapping interface {
type mapping struct {
sync.Mutex // guards all fields

nat *NAT
proto string
intport int
extport int
proc goprocess.Process
nat *NAT
proto string
intport int
extport int
teardown func(*mapping)

cached net.IP
cacheTime time.Time
Expand Down Expand Up @@ -117,5 +115,6 @@ func (m *mapping) ExternalAddr() (net.Addr, error) {
}

func (m *mapping) Close() error {
return m.proc.Close()
m.teardown(m)
return nil
}
107 changes: 58 additions & 49 deletions nat.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,12 @@ import (
"time"

logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-nat"

"github.com/jbenet/goprocess"
periodic "github.com/jbenet/goprocess/periodic"
"github.com/libp2p/go-nat"
)

var (
// ErrNoMapping signals no mapping exists for an address
ErrNoMapping = errors.New("mapping not established")
)
// ErrNoMapping signals no mapping exists for an address
var ErrNoMapping = errors.New("mapping not established")

var log = logging.Logger("nat")

Expand Down Expand Up @@ -54,29 +50,35 @@ func DiscoverNAT(ctx context.Context) (*NAT, error) {
type NAT struct {
natmu sync.Mutex
nat nat.NAT
proc goprocess.Process

refCount sync.WaitGroup
ctx context.Context
ctxCancel context.CancelFunc

mappingmu sync.RWMutex // guards mappings
closed bool
mappings map[*mapping]struct{}
}

func newNAT(realNAT nat.NAT) *NAT {
ctx, cancel := context.WithCancel(context.Background())
return &NAT{
nat: realNAT,
proc: goprocess.WithParent(goprocess.Background()),
mappings: make(map[*mapping]struct{}),
nat: realNAT,
mappings: make(map[*mapping]struct{}),
ctx: ctx,
ctxCancel: cancel,
}
}

// Close shuts down all port mappings. NAT can no longer be used.
func (nat *NAT) Close() error {
return nat.proc.Close()
}
nat.mappingmu.Lock()
nat.closed = true
nat.mappingmu.Unlock()

// Process returns the nat's life-cycle manager, for making it listen
// to close signals.
func (nat *NAT) Process() goprocess.Process {
return nat.proc
nat.ctxCancel()
nat.refCount.Wait()
return nil
}

// Mappings returns a slice of all NAT mappings
Expand All @@ -90,21 +92,6 @@ func (nat *NAT) Mappings() []Mapping {
return maps2
}

func (nat *NAT) addMapping(m *mapping) {
// make mapping automatically close when nat is closed.
nat.proc.AddChild(m.proc)

nat.mappingmu.Lock()
nat.mappings[m] = struct{}{}
nat.mappingmu.Unlock()
}

func (nat *NAT) rmMapping(m *mapping) {
nat.mappingmu.Lock()
delete(nat.mappings, m)
nat.mappingmu.Unlock()
}

// NewMapping attempts to construct a mapping on protocol and internal port
// It will also periodically renew the mapping until the returned Mapping
// -- or its parent NAT -- is Closed.
Expand All @@ -125,36 +112,58 @@ func (nat *NAT) NewMapping(protocol string, port int) (Mapping, error) {
}

m := &mapping{
intport: port,
nat: nat,
proto: protocol,
intport: port,
nat: nat,
proto: protocol,
teardown: nat.removeMapping,
}

m.proc = goprocess.WithTeardown(func() error {
nat.rmMapping(m)
nat.natmu.Lock()
defer nat.natmu.Unlock()
nat.nat.DeletePortMapping(m.Protocol(), m.InternalPort())
return nil
})

nat.addMapping(m)

m.proc.AddChild(periodic.Every(MappingDuration/3, func(worker goprocess.Process) {
nat.establishMapping(m)
}))
nat.mappingmu.Lock()
if nat.closed {
nat.mappingmu.Unlock()
return nil, errors.New("closed")
}
nat.mappings[m] = struct{}{}
nat.refCount.Add(1)
nat.mappingmu.Unlock()
go nat.refreshMappings(m)

// do it once synchronously, so first mapping is done right away, and before exiting,
// allowing users -- in the optimistic case -- to use results right after.
nat.establishMapping(m)
return m, nil
}

func (nat *NAT) removeMapping(m *mapping) {
nat.mappingmu.Lock()
delete(nat.mappings, m)
nat.mappingmu.Unlock()
nat.natmu.Lock()
nat.nat.DeletePortMapping(m.Protocol(), m.InternalPort())
nat.natmu.Unlock()
}

func (nat *NAT) refreshMappings(m *mapping) {
defer nat.refCount.Done()
t := time.NewTicker(MappingDuration / 3)
defer t.Stop()

for {
select {
case <-t.C:
nat.establishMapping(m)
case <-nat.ctx.Done():
m.Close()
return
}
}
}

func (nat *NAT) establishMapping(m *mapping) {
oldport := m.ExternalPort()

log.Debugf("Attempting port map: %s/%d", m.Protocol(), m.InternalPort())
comment := "libp2p"
const comment = "libp2p"

nat.natmu.Lock()
newport, err := nat.nat.AddPortMapping(m.Protocol(), m.InternalPort(), comment, MappingDuration)
Expand Down

0 comments on commit f1b7a19

Please sign in to comment.