Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Codereorg: Part 5, state #1492

Merged
merged 31 commits into from
Sep 19, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
a885e1f
format with prettier 3.0
kradalby Jul 14, 2023
2f7cbc8
Replace the timestamp based state system
kradalby Jun 21, 2023
47a75db
Split up MapResponse
kradalby Jun 29, 2023
a52db30
Add missing return in shutdown
kradalby Jul 7, 2023
8e973f4
add script to run integration tests
kradalby Jun 22, 2023
b6593dc
rearrange channel closing defers
kradalby Jul 14, 2023
3b8c4c9
add annoying linter to golangci
kradalby Jul 17, 2023
809db0c
add less/jq to hs debug container
kradalby Jul 17, 2023
1013e5f
add debug option to save all map responses
kradalby Jul 17, 2023
3ee0179
disable online map by default for now
kradalby Jul 17, 2023
3bfc123
introduce rw lock for db, ish...
kradalby Jul 17, 2023
c054fb5
additional debug logging, use mapper pointer
kradalby Jul 24, 2023
52a85bc
only send lite map responses when omitpeers
kradalby Jul 26, 2023
eb1a556
fix lint
kradalby Jul 26, 2023
d84ea0e
add maprequest to all mapper calls
kradalby Jul 26, 2023
6b620ca
remove retries for pings in tsic
kradalby Jul 26, 2023
6b49dad
filter out peers without endpoints
kradalby Jul 30, 2023
841e3c5
rearrange poll, lock, notify
kradalby Jul 26, 2023
7c2c9b0
Update packetfilter when peers change
kradalby Aug 9, 2023
1e74117
Remove database from Mapper
kradalby Aug 9, 2023
fdaf9da
move MapResponse peer logic into function and reuse
kradalby Aug 9, 2023
db5864d
give ci more tollerance for timeouts
kradalby Sep 10, 2023
df1f4b3
Upgrade go and debian in headscale docker
kradalby Sep 11, 2023
b6d64e1
add lock around saving ts clients
kradalby Sep 10, 2023
8882825
gitignore infolder tailscale
kradalby Sep 11, 2023
4b4b032
order path
kradalby Sep 11, 2023
db8761f
add pprof endpoint
kradalby Sep 11, 2023
ec39145
Remove LastSuccessfulUpdate from Machine
kradalby Sep 11, 2023
decbeb7
improve debug logging, rw lock for notifier
kradalby Sep 11, 2023
1f4d51d
Return simple responses immediatly
kradalby Sep 11, 2023
bc9fc67
handle route updates correctly
kradalby Sep 11, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Replace the timestamp based state system
This commit replaces the timestamp based state system with a new
one that has update channels directly to the connected nodes. It
will send an update to all listening clients via the polling
mechanism.

It introduces a new package notifier, which has a concurrency safe
manager for all our channels to the connected nodes.

Signed-off-by: Kristoffer Dalby <[email protected]>
  • Loading branch information
kradalby committed Sep 11, 2023
commit 2f7cbc813018c354f79eb02b2c2bddbc9af4bb34
108 changes: 18 additions & 90 deletions hscontrol/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"net/http"
"os"
"os/signal"
"sort"
"strconv"
"strings"
"sync"
Expand All @@ -26,13 +25,13 @@ import (
"github.com/juanfont/headscale/hscontrol/db"
"github.com/juanfont/headscale/hscontrol/derp"
derpServer "github.com/juanfont/headscale/hscontrol/derp/server"
"github.com/juanfont/headscale/hscontrol/notifier"
"github.com/juanfont/headscale/hscontrol/policy"
"github.com/juanfont/headscale/hscontrol/types"
"github.com/juanfont/headscale/hscontrol/util"
"github.com/patrickmn/go-cache"
zerolog "github.com/philip-bui/grpc-zerolog"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/puzpuzpuz/xsync/v2"
zl "github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"golang.org/x/crypto/acme"
Expand Down Expand Up @@ -84,7 +83,7 @@ type Headscale struct {

ACLPolicy *policy.ACLPolicy

lastStateChange *xsync.MapOf[string, time.Time]
nodeNotifier *notifier.Notifier

oidcProvider *oidc.Provider
oauth2Config *oauth2.Config
Expand All @@ -93,9 +92,6 @@ type Headscale struct {

shutdownChan chan struct{}
pollNetMapStreamWG sync.WaitGroup

stateUpdateChan chan struct{}
cancelStateUpdateChan chan struct{}
}

func NewHeadscale(cfg *types.Config) (*Headscale, error) {
Expand Down Expand Up @@ -158,19 +154,14 @@ func NewHeadscale(cfg *types.Config) (*Headscale, error) {
noisePrivateKey: noisePrivateKey,
registrationCache: registrationCache,
pollNetMapStreamWG: sync.WaitGroup{},
lastStateChange: xsync.NewMapOf[time.Time](),

stateUpdateChan: make(chan struct{}),
cancelStateUpdateChan: make(chan struct{}),
nodeNotifier: notifier.NewNotifier(),
}

go app.watchStateChannel()

database, err := db.NewHeadscaleDatabase(
cfg.DBtype,
dbString,
app.dbDebug,
app.stateUpdateChan,
app.nodeNotifier,
cfg.IPPrefixes,
cfg.BaseDomain)
if err != nil {
Expand Down Expand Up @@ -203,7 +194,11 @@ func NewHeadscale(cfg *types.Config) (*Headscale, error) {

if cfg.DERP.ServerEnabled {
// TODO(kradalby): replace this key with a dedicated DERP key.
embeddedDERPServer, err := derpServer.NewDERPServer(cfg.ServerURL, key.NodePrivate(*privateKey), &cfg.DERP)
embeddedDERPServer, err := derpServer.NewDERPServer(
cfg.ServerURL,
key.NodePrivate(*privateKey),
&cfg.DERP,
)
if err != nil {
return nil, err
}
Expand All @@ -230,10 +225,14 @@ func (h *Headscale) expireEphemeralNodes(milliSeconds int64) {

// expireExpiredMachines expires machines that have an explicit expiry set
// after that expiry time has passed.
func (h *Headscale) expireExpiredMachines(milliSeconds int64) {
ticker := time.NewTicker(time.Duration(milliSeconds) * time.Millisecond)
func (h *Headscale) expireExpiredMachines(intervalMs int64) {
interval := time.Duration(intervalMs) * time.Millisecond
ticker := time.NewTicker(interval)

lastCheck := time.Unix(0, 0)

for range ticker.C {
h.db.ExpireExpiredMachines(h.getLastStateChange())
lastCheck = h.db.ExpireExpiredMachines(lastCheck)
}
}

Expand All @@ -258,7 +257,7 @@ func (h *Headscale) scheduledDERPMapUpdateWorker(cancelChan <-chan struct{}) {
h.DERPMap.Regions[region.RegionID] = &region
}

h.setLastStateChangeToNow()
h.nodeNotifier.NotifyAll()
}
}
}
Expand Down Expand Up @@ -722,7 +721,7 @@ func (h *Headscale) Serve() error {
Str("path", aclPath).
Msg("ACL policy successfully reloaded, notifying nodes of change")

h.setLastStateChangeToNow()
h.nodeNotifier.NotifyAll()
}

default:
Expand Down Expand Up @@ -760,10 +759,6 @@ func (h *Headscale) Serve() error {
// Stop listening (and unlink the socket if unix type):
socketListener.Close()

<-h.cancelStateUpdateChan
close(h.stateUpdateChan)
close(h.cancelStateUpdateChan)

// Close db connections
err = h.db.Close()
if err != nil {
Expand Down Expand Up @@ -859,73 +854,6 @@ func (h *Headscale) getTLSSettings() (*tls.Config, error) {
}
}

// TODO(kradalby): baby steps, make this more robust.
func (h *Headscale) watchStateChannel() {
for {
select {
case <-h.stateUpdateChan:
h.setLastStateChangeToNow()

case <-h.cancelStateUpdateChan:
return
}
}
}

func (h *Headscale) setLastStateChangeToNow() {
var err error

now := time.Now().UTC()

users, err := h.db.ListUsers()
if err != nil {
log.Error().
Caller().
Err(err).
Msg("failed to fetch all users, failing to update last changed state.")
}

for _, user := range users {
lastStateUpdate.WithLabelValues(user.Name, "headscale").Set(float64(now.Unix()))
if h.lastStateChange == nil {
h.lastStateChange = xsync.NewMapOf[time.Time]()
}
h.lastStateChange.Store(user.Name, now)
}
}

func (h *Headscale) getLastStateChange(users ...types.User) time.Time {
times := []time.Time{}

// getLastStateChange takes a list of users as a "filter", if no users
// are past, then use the entier list of users and look for the last update
if len(users) > 0 {
for _, user := range users {
if lastChange, ok := h.lastStateChange.Load(user.Name); ok {
times = append(times, lastChange)
}
}
} else {
h.lastStateChange.Range(func(key string, value time.Time) bool {
times = append(times, value)

return true
})
}

sort.Slice(times, func(i, j int) bool {
return times[i].After(times[j])
})

log.Trace().Msgf("Latest times %#v", times)

if len(times) == 0 {
return time.Now().UTC()
} else {
return times[0]
}
}

func notFoundHandler(
writer http.ResponseWriter,
req *http.Request,
Expand Down
6 changes: 0 additions & 6 deletions hscontrol/db/addresses_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,6 @@ func (s *Suite) TestGetUsedIps(c *check.C) {

c.Assert(len(machine1.IPAddresses), check.Equals, 1)
c.Assert(machine1.IPAddresses[0], check.Equals, expected)

c.Assert(channelUpdates, check.Equals, int32(0))
}

func (s *Suite) TestGetMultiIp(c *check.C) {
Expand Down Expand Up @@ -153,8 +151,6 @@ func (s *Suite) TestGetMultiIp(c *check.C) {

c.Assert(len(nextIP2), check.Equals, 1)
c.Assert(nextIP2[0].String(), check.Equals, expectedNextIP.String())

c.Assert(channelUpdates, check.Equals, int32(0))
}

func (s *Suite) TestGetAvailableIpMachineWithoutIP(c *check.C) {
Expand Down Expand Up @@ -192,6 +188,4 @@ func (s *Suite) TestGetAvailableIpMachineWithoutIP(c *check.C) {

c.Assert(len(ips2), check.Equals, 1)
c.Assert(ips2[0].String(), check.Equals, expected.String())

c.Assert(channelUpdates, check.Equals, int32(0))
}
8 changes: 0 additions & 8 deletions hscontrol/db/api_key_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ func (*Suite) TestCreateAPIKey(c *check.C) {
keys, err := db.ListAPIKeys()
c.Assert(err, check.IsNil)
c.Assert(len(keys), check.Equals, 1)

c.Assert(channelUpdates, check.Equals, int32(0))
}

func (*Suite) TestAPIKeyDoesNotExist(c *check.C) {
Expand All @@ -41,8 +39,6 @@ func (*Suite) TestValidateAPIKeyOk(c *check.C) {
valid, err := db.ValidateAPIKey(apiKeyStr)
c.Assert(err, check.IsNil)
c.Assert(valid, check.Equals, true)

c.Assert(channelUpdates, check.Equals, int32(0))
}

func (*Suite) TestValidateAPIKeyNotOk(c *check.C) {
Expand Down Expand Up @@ -71,8 +67,6 @@ func (*Suite) TestValidateAPIKeyNotOk(c *check.C) {
validWithErr, err := db.ValidateAPIKey("produceerrorkey")
c.Assert(err, check.NotNil)
c.Assert(validWithErr, check.Equals, false)

c.Assert(channelUpdates, check.Equals, int32(0))
}

func (*Suite) TestExpireAPIKey(c *check.C) {
Expand All @@ -92,6 +86,4 @@ func (*Suite) TestExpireAPIKey(c *check.C) {
notValid, err := db.ValidateAPIKey(apiKeyStr)
c.Assert(err, check.IsNil)
c.Assert(notValid, check.Equals, false)

c.Assert(channelUpdates, check.Equals, int32(0))
}
15 changes: 6 additions & 9 deletions hscontrol/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/glebarez/sqlite"
"github.com/juanfont/headscale/hscontrol/notifier"
"github.com/juanfont/headscale/hscontrol/types"
"github.com/juanfont/headscale/hscontrol/util"
"github.com/rs/zerolog/log"
Expand Down Expand Up @@ -36,8 +37,8 @@ type KV struct {
}

type HSDatabase struct {
db *gorm.DB
notifyStateChan chan<- struct{}
db *gorm.DB
notifier *notifier.Notifier

ipAllocationMutex sync.Mutex

Expand All @@ -50,7 +51,7 @@ type HSDatabase struct {
func NewHeadscaleDatabase(
dbType, connectionAddr string,
debug bool,
notifyStateChan chan<- struct{},
notifier *notifier.Notifier,
ipPrefixes []netip.Prefix,
baseDomain string,
) (*HSDatabase, error) {
Expand All @@ -60,8 +61,8 @@ func NewHeadscaleDatabase(
}

db := HSDatabase{
db: dbConn,
notifyStateChan: notifyStateChan,
db: dbConn,
notifier: notifier,

ipPrefixes: ipPrefixes,
baseDomain: baseDomain,
Expand Down Expand Up @@ -297,10 +298,6 @@ func openDB(dbType, connectionAddr string, debug bool) (*gorm.DB, error) {
)
}

func (hsdb *HSDatabase) notifyStateChange() {
hsdb.notifyStateChan <- struct{}{}
}

// getValue returns the value for the given key in KV.
func (hsdb *HSDatabase) getValue(key string) (string, error) {
var row KV
Expand Down
Loading