Skip to content

Commit

Permalink
feat: 支持当北极星服务端全下线并恢复后,等待3TTL后再重启check任务 (#5) (#1008)
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewshan authored Mar 16, 2023
1 parent 9bcda11 commit 71a86a8
Show file tree
Hide file tree
Showing 14 changed files with 514 additions and 146 deletions.
3 changes: 0 additions & 3 deletions apiserver/eurekaserver/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,6 @@ func (h *EurekaServer) registerInstances(
appId = formatWriteName(appId)
// 1. 先转换数据结构
totalInstance := convertEurekaInstance(instance, h.namespace, appId)
if replicated {
totalInstance.Metadata[model.MetadataReplicated] = "true"
}
// 3. 注册实例
resp := h.namingServer.RegisterInstance(ctx, totalInstance)
// 4. 注册成功,则返回
Expand Down
3 changes: 0 additions & 3 deletions common/model/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,4 @@ const (

// MetaKeyBuildRevision build revision for server
MetaKeyBuildRevision = "build-revision"

// MetadataReplicated indicate the instance is replicated from other naming server
MetadataReplicated = "internal-replicated"
)
58 changes: 58 additions & 0 deletions common/redispool/pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/**
* Tencent is pleased to support the open source community by making Polaris available.
*
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package redispool

// Resp ckv任务结果
type Resp struct {
Value string
Err error
Exists bool
Compatible bool
shouldRetry bool
}

// RedisObject 序列化对象
type RedisObject interface {
// Serialize 序列化成字符串
Serialize(compatible bool) string
// Deserialize 反序列为对象
Deserialize(value string, compatible bool) error
}

type Pool interface {
// Start 启动ckv连接池工作
Start()

// Sdd 使用连接池,向redis发起Sdd请求
Sdd(id string, members []string) *Resp

// Srem 使用连接池,向redis发起Srem请求
Srem(id string, members []string) *Resp

// Get 使用连接池,向redis发起Get请求
Get(id string) *Resp

// Set 使用连接池,向redis发起Set请求
Set(id string, redisObj RedisObject) *Resp

// Del 使用连接池,向redis发起Del请求
Del(id string) *Resp

// RecoverTimeSec the time second record when recover
RecoverTimeSec() int64
}
59 changes: 21 additions & 38 deletions common/redispool/redis_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,17 +81,8 @@ func (t Task) String() string {
return fmt.Sprintf("{taskType: %s, id: %s}", typeToCommand[t.taskType], t.id)
}

// Resp ckv任务结果
type Resp struct {
Value string
Err error
Exists bool
Compatible bool
shouldRetry bool
}

// Pool ckv连接池结构体
type Pool struct {
type redisPool struct {
config *Config
ctx context.Context
redisClient redis.UniversalClient
Expand Down Expand Up @@ -123,8 +114,8 @@ func NewRedisClient(config *Config, opts ...Option) redis.UniversalClient {
return redisClient
}

// NewPool init a redis connection pool instance
func NewPool(ctx context.Context, config *Config, statis plugin.Statis, opts ...Option) *Pool {
// NewRedisPool init a redis connection pool instance
func NewRedisPool(ctx context.Context, config *Config, statis plugin.Statis, opts ...Option) Pool {
if config.WriteTimeout == 0 {
config.WriteTimeout = config.MsgTimeout
}
Expand All @@ -134,7 +125,7 @@ func NewPool(ctx context.Context, config *Config, statis plugin.Statis, opts ...
}

redisClient := NewRedisClient(config, opts...)
pool := &Pool{
pool := &redisPool{
config: config,
ctx: ctx,
redisClient: redisClient,
Expand All @@ -150,7 +141,7 @@ func NewPool(ctx context.Context, config *Config, statis plugin.Statis, opts ...
}

// Get 使用连接池,向redis发起Get请求
func (p *Pool) Get(id string) *Resp {
func (p *redisPool) Get(id string) *Resp {
if err := p.checkRedisDead(); err != nil {
return &Resp{Err: err}
}
Expand All @@ -162,7 +153,7 @@ func (p *Pool) Get(id string) *Resp {
}

// Sdd 使用连接池,向redis发起Sdd请求
func (p *Pool) Sdd(id string, members []string) *Resp {
func (p *redisPool) Sdd(id string, members []string) *Resp {
if err := p.checkRedisDead(); err != nil {
return &Resp{Err: err}
}
Expand All @@ -175,7 +166,7 @@ func (p *Pool) Sdd(id string, members []string) *Resp {
}

// Srem 使用连接池,向redis发起Srem请求
func (p *Pool) Srem(id string, members []string) *Resp {
func (p *redisPool) Srem(id string, members []string) *Resp {
if err := p.checkRedisDead(); err != nil {
return &Resp{Err: err}
}
Expand All @@ -187,16 +178,8 @@ func (p *Pool) Srem(id string, members []string) *Resp {
return p.handleTaskWithRetries(task)
}

// RedisObject 序列化对象
type RedisObject interface {
// Serialize 序列化成字符串
Serialize(compatible bool) string
// Deserialize 反序列为对象
Deserialize(value string, compatible bool) error
}

// Set 使用连接池,向redis发起Set请求
func (p *Pool) Set(id string, redisObj RedisObject) *Resp {
func (p *redisPool) Set(id string, redisObj RedisObject) *Resp {
if err := p.checkRedisDead(); err != nil {
return &Resp{Err: err}
}
Expand All @@ -209,7 +192,7 @@ func (p *Pool) Set(id string, redisObj RedisObject) *Resp {
}

// Del 使用连接池,向redis发起Del请求
func (p *Pool) Del(id string) *Resp {
func (p *redisPool) Del(id string) *Resp {
if err := p.checkRedisDead(); err != nil {
return &Resp{Err: err}
}
Expand All @@ -220,29 +203,29 @@ func (p *Pool) Del(id string) *Resp {
return p.handleTaskWithRetries(task)
}

func (p *Pool) checkRedisDead() error {
func (p *redisPool) checkRedisDead() error {
if atomic.LoadUint32(&p.redisDead) == 1 {
return fmt.Errorf("redis %s is dead", p.config.KvAddr)
}
return nil
}

// Start 启动ckv连接池工作
func (p *Pool) Start() {
func (p *redisPool) Start() {
wg := &sync.WaitGroup{}
wg.Add(p.config.Concurrency)
p.startWorkers(wg)
go p.checkRedis(wg)
log.Infof("[RedisPool]redis pool started")
}

func (p *Pool) startWorkers(wg *sync.WaitGroup) {
func (p *redisPool) startWorkers(wg *sync.WaitGroup) {
for i := 0; i < p.config.Concurrency; i++ {
go p.process(wg, i)
}
}

func (p *Pool) process(wg *sync.WaitGroup, idx int) {
func (p *redisPool) process(wg *sync.WaitGroup, idx int) {
log.Infof("[RedisPool]redis worker %d started", idx)
ticker := time.NewTicker(p.config.WaitTime)
piper := p.redisClient.Pipeline()
Expand Down Expand Up @@ -271,7 +254,7 @@ func (p *Pool) process(wg *sync.WaitGroup, idx int) {
}
}

func (p *Pool) handleTasks(tasks []*Task, piper redis.Pipeliner) {
func (p *redisPool) handleTasks(tasks []*Task, piper redis.Pipeliner) {
cmders := make([]redis.Cmder, len(tasks))
for i, task := range tasks {
cmders[i] = p.doHandleTask(task, piper)
Expand Down Expand Up @@ -318,7 +301,7 @@ func sleep(dur time.Duration) {
}

// checkRedis check redis alive
func (p *Pool) checkRedis(wg *sync.WaitGroup) {
func (p *redisPool) checkRedis(wg *sync.WaitGroup) {
ticker := time.NewTicker(redisCheckInterval)
defer ticker.Stop()
for {
Expand Down Expand Up @@ -350,12 +333,12 @@ func (p *Pool) checkRedis(wg *sync.WaitGroup) {
}

// RecoverTimeSec the time second record when recover
func (p *Pool) RecoverTimeSec() int64 {
func (p *redisPool) RecoverTimeSec() int64 {
return atomic.LoadInt64(&p.recoverTimeSec)
}

// doCheckRedis test the connection
func (p *Pool) doCheckRedis() bool {
func (p *redisPool) doCheckRedis() bool {
_, err := p.redisClient.Ping(context.Background()).Result()

return err == nil
Expand All @@ -377,7 +360,7 @@ func nextIndex() int64 {
}

// handleTaskWithRetries 任务重试执行
func (p *Pool) handleTaskWithRetries(task *Task) *Resp {
func (p *redisPool) handleTaskWithRetries(task *Task) *Resp {
var count = 1
if p.config.MaxRetry > 0 {
count += p.config.MaxRetry
Expand All @@ -397,7 +380,7 @@ func (p *Pool) handleTaskWithRetries(task *Task) *Resp {
}

// handleTask 任务处理函数
func (p *Pool) handleTask(task *Task) *Resp {
func (p *redisPool) handleTask(task *Task) *Resp {
var startTime = time.Now()
task.respChan = make(chan *Resp, 1)
idx := int(nextIndex()) % len(p.taskChans)
Expand Down Expand Up @@ -425,7 +408,7 @@ const (
callResultFail = 1
)

func (p *Pool) afterHandleTask(startTime time.Time, command string, task *Task, resp *Resp) {
func (p *redisPool) afterHandleTask(startTime time.Time, command string, task *Task, resp *Resp) {
costDuration := time.Since(startTime)
if costDuration >= maxProcessDuration && task.taskType != Get {
log.Warnf("[RedisPool] too slow to process task %s, "+
Expand All @@ -449,7 +432,7 @@ func (p *Pool) afterHandleTask(startTime time.Time, command string, task *Task,
})
}

func (p *Pool) doHandleTask(task *Task, piper redis.Pipeliner) redis.Cmder {
func (p *redisPool) doHandleTask(task *Task, piper redis.Pipeliner) redis.Cmder {
switch task.taskType {
case Set:
return piper.Set(context.Background(), toRedisKey(task.id, p.config.Compatible), task.value, 0)
Expand Down
4 changes: 4 additions & 0 deletions plugin/healthchecker.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ type HealthChecker interface {
AddToCheck(request *AddCheckRequest) error
// RemoveFromCheck removes the instances from check procedure
RemoveFromCheck(request *AddCheckRequest) error
// Suspend health checker for entire expired duration manually
Suspend()
// SuspendTimeSec get the suspend time in seconds
SuspendTimeSec() int64
// Delete delete the id
Delete(id string) error
}
Expand Down
34 changes: 32 additions & 2 deletions plugin/healthchecker/heartbeatmemory/checker_memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ package heartbeatmemory

import (
"sync"
"sync/atomic"

commonLog "github.com/polarismesh/polaris/common/log"
commontime "github.com/polarismesh/polaris/common/time"
"github.com/polarismesh/polaris/plugin"
)

Expand All @@ -41,7 +43,8 @@ type HeartbeatRecord struct {

// MemoryHealthChecker memory health checker
type MemoryHealthChecker struct {
hbRecords *sync.Map
hbRecords *sync.Map
suspendTimeSec int64
}

// Name return plugin name
Expand Down Expand Up @@ -94,6 +97,18 @@ func (r *MemoryHealthChecker) Query(request *plugin.QueryRequest) (*plugin.Query
}, nil
}

func (r *MemoryHealthChecker) skipCheck(instanceId string, expireDurationSec int64) bool {
suspendTimeSec := r.SuspendTimeSec()
localCurTimeSec := commontime.CurrentMillisecond() / 1000
if suspendTimeSec > 0 && localCurTimeSec >= suspendTimeSec && localCurTimeSec-suspendTimeSec < expireDurationSec {
log.Infof("[Health Check][MemoryCheck]health check redis suspended, "+
"suspendTimeSec is %d, localCurTimeSec is %d, expireDurationSec is %d, instanceId %s",
suspendTimeSec, localCurTimeSec, expireDurationSec, instanceId)
return true
}
return false
}

// Check Report process the instance check
func (r *MemoryHealthChecker) Check(request *plugin.CheckRequest) (*plugin.CheckResponse, error) {
queryResp, err := r.Query(&request.QueryRequest)
Expand All @@ -106,14 +121,18 @@ func (r *MemoryHealthChecker) Check(request *plugin.CheckRequest) (*plugin.Check
}
curTimeSec := request.CurTimeSec()
log.Debugf("[HealthCheck][MemoryCheck]check hb record, cur is %d, last is %d", curTimeSec, lastHeartbeatTime)
if r.skipCheck(request.InstanceId, int64(request.ExpireDurationSec)) {
checkResp.StayUnchanged = true
return checkResp, nil
}
if curTimeSec > lastHeartbeatTime {
if curTimeSec-lastHeartbeatTime >= int64(request.ExpireDurationSec) {
// 心跳超时
checkResp.Healthy = false

if request.Healthy {
log.Infof("[Health Check][MemoryCheck]health check expired, "+
"last hb timestamp is %d, curTimeSec is %d, expireDurationSec is %d instanceId %s",
"last hb timestamp is %d, curTimeSec is %d, expireDurationSec is %d, instanceId %s",
lastHeartbeatTime, curTimeSec, request.ExpireDurationSec, request.InstanceId)
} else {
checkResp.StayUnchanged = true
Expand Down Expand Up @@ -149,6 +168,17 @@ func (r *MemoryHealthChecker) Delete(id string) error {
return nil
}

func (r *MemoryHealthChecker) Suspend() {
curTimeMilli := commontime.CurrentMillisecond() / 1000
log.Infof("[Health Check][MemoryCheck] suspend checker, start time %d", curTimeMilli)
atomic.StoreInt64(&r.suspendTimeSec, curTimeMilli)
}

// SuspendTimeSec get suspend time in seconds
func (r *MemoryHealthChecker) SuspendTimeSec() int64 {
return atomic.LoadInt64(&r.suspendTimeSec)
}

func init() {
d := &MemoryHealthChecker{}
plugin.RegisterPlugin(d.Name(), d)
Expand Down
Loading

0 comments on commit 71a86a8

Please sign in to comment.