Skip to content

Commit

Permalink
node health:
Browse files Browse the repository at this point in the history
- db: PK is on (hostname, token)
- aggressively written to, every 10 seconds
- aggressively purged (expire = insert interval * 2)
- written to also by CLI instances

recovery:
- added AcknowledgeCrashedRecoveries(): auto-acknowledge (mark as
failed) recoveries owned by a now-dead process. This is detected using
the above node health changes.
  • Loading branch information
shlomi-noach committed Nov 2, 2015
1 parent 6b666d2 commit 4a2e19a
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 22 deletions.
2 changes: 1 addition & 1 deletion build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
#
set -e

RELEASE_VERSION="1.4.474"
RELEASE_VERSION="1.4.475"
TOPDIR=/tmp/orchestrator-release
export RELEASE_VERSION TOPDIR

Expand Down
22 changes: 20 additions & 2 deletions go/cmd/orchestrator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,18 @@ import (
"github.com/outbrain/orchestrator/go/app"
"github.com/outbrain/orchestrator/go/config"
"github.com/outbrain/orchestrator/go/inst"
"github.com/outbrain/orchestrator/go/process"
"os"
"runtime"
)

type OrchestratorExecutionMode string

const (
CliMode OrchestratorExecutionMode = "CLIMode"
HttpMode = "HttpMode"
)

const prompt string = `
orchestrator [-c command] [-i instance] [-d destination] [--verbose|--debug] [... cli ] | http
Expand Down Expand Up @@ -856,11 +864,12 @@ func main() {
return
}

var executionMode OrchestratorExecutionMode
switch {
case len(flag.Args()) == 0 || flag.Arg(0) == "cli":
app.Cli(*command, *strict, *instance, *destination, *owner, *reason, *duration, *pattern, *clusterAlias, *pool, *hostnameFlag)
executionMode = CliMode
case flag.Arg(0) == "http":
app.Http(*discovery)
executionMode = HttpMode
default:
fmt.Fprintln(os.Stderr, `Usage:
orchestrator --options... [cli|http]
Expand All @@ -869,5 +878,14 @@ See complete list of commands:
Full blown documentation:
orchestrator
`)
os.Exit(1)
}

process.ContinuousRegistration(string(executionMode))
switch executionMode {
case CliMode:
app.Cli(*command, *strict, *instance, *destination, *owner, *reason, *duration, *pattern, *clusterAlias, *pool, *hostnameFlag)
case HttpMode:
app.Http(*discovery)
}
}
9 changes: 9 additions & 0 deletions go/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,15 @@ var generateSQLPatches = []string{
ALTER TABLE candidate_database_instance
ADD COLUMN promotion_rule enum('must', 'prefer', 'neutral', 'prefer_not', 'must_not') NOT NULL DEFAULT 'neutral'
`,
`
ALTER TABLE node_health
DROP PRIMARY KEY,
ADD PRIMARY KEY (hostname, token)
`,
`
ALTER TABLE node_health
ADD COLUMN extra_info varchar(128) CHARACTER SET utf8 NOT NULL
`,
}

// Track if a TLS has already been configured for topology
Expand Down
4 changes: 2 additions & 2 deletions go/logic/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func ContinuousDiscovery() {
}
if !wasAlreadyElected {
// Just turned to be leader!
go process.HealthTest()
go process.RegisterNode("")
}
} else {
log.Debugf("Not elected as active node; polling")
Expand Down Expand Up @@ -243,14 +243,14 @@ func ContinuousDiscovery() {
inst.LoadHostnameResolveCacheFromDatabase()
}
go inst.ReadClusterAliases()
go process.HealthTest()
}()
case <-recoveryTick:
go func() {
if isElectedNode {
go ClearActiveFailureDetections()
go ClearActiveRecoveries()
go ExpireBlockedRecoveries()
go AcknowledgeCrashedRecoveries()
go CheckAndRecover(nil, nil, false)
}
}()
Expand Down
28 changes: 23 additions & 5 deletions go/logic/topology_recovery_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,11 +249,18 @@ func ExpireBlockedRecoveries() error {
}

// acknowledgeRecoveries sets acknowledged* details and clears the in_active_period flags from a set of entries
func acknowledgeRecoveries(owner string, comment string, whereClause string) (countAcknowledgedEntries int64, err error) {
func acknowledgeRecoveries(owner string, comment string, markEndRecovery bool, whereClause string) (countAcknowledgedEntries int64, err error) {
additionalSet := ``
if markEndRecovery {
additionalSet = `
end_recovery=IFNULL(end_recovery, NOW()),
`
}
query := fmt.Sprintf(`
update topology_recovery set
in_active_period = 0,
end_active_period_unixtime = IF(end_active_period_unixtime = 0, UNIX_TIMESTAMP(), end_active_period_unixtime),
%s
acknowledged = 1,
acknowledged_at = NOW(),
acknowledged_by = ?,
Expand All @@ -262,7 +269,7 @@ func acknowledgeRecoveries(owner string, comment string, whereClause string) (co
acknowledged = 0
and
%s
`, whereClause)
`, additionalSet, whereClause)
sqlResult, err := db.ExecOrchestrator(query, owner, comment)
if err != nil {
return 0, log.Errore(err)
Expand All @@ -275,7 +282,7 @@ func acknowledgeRecoveries(owner string, comment string, whereClause string) (co
// This also implied clearing their active period, which in turn enables further recoveries on those topologies
func AcknowledgeRecovery(recoveryId int64, owner string, comment string) (countAcknowledgedEntries int64, err error) {
whereClause := fmt.Sprintf(`recovery_id = %d`, recoveryId)
return acknowledgeRecoveries(owner, comment, whereClause)
return acknowledgeRecoveries(owner, comment, false, whereClause)
}

// AcknowledgeClusterRecoveries marks active recoveries for given cluster as acknowledged.
Expand All @@ -284,7 +291,7 @@ func AcknowledgeClusterRecoveries(clusterName string, owner string, comment stri
whereClause := fmt.Sprintf(`
cluster_name = '%s'
`, clusterName)
return acknowledgeRecoveries(owner, comment, whereClause)
return acknowledgeRecoveries(owner, comment, false, whereClause)
}

// AcknowledgeInstanceRecoveries marks active recoveries for given instane as acknowledged.
Expand All @@ -294,7 +301,18 @@ func AcknowledgeInstanceRecoveries(instanceKey *inst.InstanceKey, owner string,
hostname = '%s'
and port = %d
`, instanceKey.Hostname, instanceKey.Port)
return acknowledgeRecoveries(owner, comment, whereClause)
return acknowledgeRecoveries(owner, comment, false, whereClause)
}

// AcknowledgeCrashedRecoveries marks recoveries whose processing nodes has crashed as acknowledged.
func AcknowledgeCrashedRecoveries() (countAcknowledgedEntries int64, err error) {
whereClause := `
in_active_period = 1
and (processing_node_hostname, processcing_node_token) not in (
select hostname, token from node_health
)
`
return acknowledgeRecoveries("orchestrator", "detected crashed recovery", true, whereClause)
}

// ResolveRecovery is called on completion of a recovery process and updates the recovery status.
Expand Down
61 changes: 49 additions & 12 deletions go/process/health_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@
package process

import (
"database/sql"
"fmt"

"github.com/outbrain/golib/log"
"github.com/outbrain/golib/sqlutils"
"github.com/outbrain/orchestrator/go/db"
"time"
)

const registrationPollSeconds = 10

type HealthStatus struct {
Healthy bool
Hostname string
Expand All @@ -34,21 +37,27 @@ type HealthStatus struct {
AvailableNodes []string
}

// HealthTest attempts to write to the backend database and get a result
func HealthTest() (*HealthStatus, error) {
health := HealthStatus{Healthy: false, Hostname: ThisHostname, Token: ProcessToken.Hash}

sqlResult, err := db.ExecOrchestrator(`
// RegisterNode writes down this node in the node_health table
func RegisterNode(extraInfo string) (sql.Result, error) {
return db.ExecOrchestrator(`
insert into node_health
(hostname, token, last_seen_active)
(hostname, token, last_seen_active, extra_info)
values
(?, ?, NOW())
(?, ?, NOW(), ?)
on duplicate key update
token=values(token),
last_seen_active=values(last_seen_active)
last_seen_active=values(last_seen_active),
extra_info=if(values(extra_info) != '', values(extra_info), extra_info)
`,
ThisHostname, ProcessToken.Hash,
ThisHostname, ProcessToken.Hash, extraInfo,
)
}

// HealthTest attempts to write to the backend database and get a result
func HealthTest() (*HealthStatus, error) {
health := HealthStatus{Healthy: false, Hostname: ThisHostname, Token: ProcessToken.Hash}

sqlResult, err := RegisterNode("")
if err != nil {
health.Error = err
return &health, log.Errore(err)
Expand All @@ -72,6 +81,34 @@ func HealthTest() (*HealthStatus, error) {
return &health, nil
}

func ContinuousRegistration(extraInfo string) {
registrationTick := time.Tick(time.Duration(registrationPollSeconds) * time.Second)
tickOperation := func() {
RegisterNode(extraInfo)
expireAvailableNodes()
}
go func() {
go tickOperation()
for range registrationTick {
go tickOperation()
}
}()
}

// expireAvailableNodes is an aggressive puring method to remove node entries who have skipped
// their keepalive for two times
func expireAvailableNodes() error {
_, err := db.ExecOrchestrator(`
delete
from node_health
where
last_seen_active < now() - interval ? second
`,
registrationPollSeconds*2,
)
return log.Errore(err)
}

func readAvailableNodes() ([]string, error) {
res := []string{}
query := fmt.Sprintf(`
Expand All @@ -80,10 +117,10 @@ func readAvailableNodes() ([]string, error) {
from
node_health
where
last_seen_active > now() - interval 5 minute
last_seen_active > now() - interval %d second
order by
hostname
`)
`, registrationPollSeconds*2)

err := db.QueryOrchestratorRowsMap(query, func(m sqlutils.RowMap) error {
res = append(res, m.GetString("node"))
Expand Down

0 comments on commit 4a2e19a

Please sign in to comment.