Skip to content

Commit

Permalink
refactor: scheduler evaluator (#805)
Browse files Browse the repository at this point in the history
* refactor: scheduler evaluator

Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi committed Jun 28, 2023
1 parent dc2835a commit d01eef6
Show file tree
Hide file tree
Showing 24 changed files with 1,254 additions and 561 deletions.
18 changes: 7 additions & 11 deletions docs/en/deployment/configuration/scheduler.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,13 @@

# scheduler policy configuration
scheduler:
# abtest whether to open ab test
# default: false
abtest: false
# aevaluator specify which evaluator is used when taskID suffix with _A
# only take effect when abtest is true
# default: ""
aevaluator: ""
# bevaluator specify which evaluator is used when taskID suffix with _B
# only take effect when abtest is true
# default: ""
bevaluator: ""
# algorithm configuration to use different scheduling algorithms,
# default configuration supports "default" and "ml"
# "default" is the rule-based scheduling algorithm, "ml" is the machine learning scheduling algorithm
# It also supports user plugin extension, the algorithm value is "plugin",
# and the compiled `d7y-scheduler-plugin-evaluator.so` file is added to
# the dragonfly working directory plugins
algorithm: default
# workerNum is the number of goroutines that perform scheduling tasks
# default:
workerNum: 12
Expand Down
16 changes: 5 additions & 11 deletions docs/zh-CN/deployment/configuration/scheduler.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,11 @@

# scheduler 调度策略配置
scheduler:
# abtest 是否开启ab测试
# default: false
abtest: false
# aevaluator 指定当开启AB测试时,后缀为 _A 的 taskID 使用的 evaluator 计算方法
# 只有当开启了 AB 测试的情况下该配置才生效
# default: ""
aevaluator: ""
# bevaluator 指定当开启AB测试时,后缀为 _B 的 taskID 使用的 evaluator 计算方法
# 只有当开启了 AB 测试的情况下该配置才生效
# default: ""
bevaluator: ""
# algorithm 使用不同调度算法配置,当前默认支持 "default" 和 "ml" 两种类型
# "default" 为基于规则的调度算法, "ml" 为基于机器学习的调度算法
# 也支持用户 plugin 扩展的方式,值为 "plugin"
# 并且在 dragonfly 工作目录 plugins 中添加编译好的 `d7y-scheduler-plugin-evaluator.so` 文件
algorithm: default
# workerNum 执行调度任务处理的 goroutine 数量
# default: 默认机器可用的 CPU 数量
workerNum: 12
Expand Down
5 changes: 3 additions & 2 deletions internal/dfplugin/dfplugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,9 @@ const (
type PluginType string

const (
PluginTypeResource = PluginType("resource")
PluginTypeManager = PluginType("manager")
PluginTypeResource = PluginType("resource")
PluginTypeManager = PluginType("manager")
PluginTypeScheduler = PluginType("scheduler")
)

type PluginInitFunc func(option map[string]string) (plugin interface{}, meta map[string]string, err error)
Expand Down
21 changes: 1 addition & 20 deletions internal/idgen/task_id.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,14 @@
package idgen

import (
"hash/crc32"
"strings"

"d7y.io/dragonfly/v2/pkg/rpc/base"
"d7y.io/dragonfly/v2/pkg/util/digestutils"
"d7y.io/dragonfly/v2/pkg/util/net/urlutils"
)

const (
TwinsASuffix = "_A"
TwinsBSuffix = "_B"
)

// GenerateTaskID generates a taskId.
// TaskID generates a taskId.
// filter is separated by & character.
func TaskID(url string, meta *base.UrlMeta) string {
var data []string
Expand Down Expand Up @@ -58,16 +52,3 @@ func TaskID(url string, meta *base.UrlMeta) string {

return digestutils.Sha256(data...)
}

// GenerateTwinsTaskId used A/B testing
func TwinsTaskID(url string, meta *base.UrlMeta, peerID string) string {
taskID := TaskID(url, meta)

if crc32.ChecksumIEEE([]byte(peerID))&1 == 0 {
taskID += TwinsASuffix
} else {
taskID += TwinsBSuffix
}

return taskID
}
75 changes: 0 additions & 75 deletions internal/idgen/task_id_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,78 +85,3 @@ func TestTaskID(t *testing.T) {
})
}
}

func TestTwinsTaskID(t *testing.T) {
tests := []struct {
name string
url string
meta *base.UrlMeta
peerID string
expect func(t *testing.T, d interface{})
}{
{
name: "generate taskID with url",
url: "https://example.com",
meta: &base.UrlMeta{},
peerID: "foo",
expect: func(t *testing.T, d interface{}) {
assert := assert.New(t)
assert.Equal("100680ad546ce6a577f42f52df33b4cfdca756859e664b8d7de329b150d09ce9_B", d)
},
},
{
name: "generate taskID with meta",
url: "https://example.com",
meta: &base.UrlMeta{
Range: "foo",
Digest: "bar",
},
peerID: "foo",
expect: func(t *testing.T, d interface{}) {
assert := assert.New(t)
assert.Equal("aeee0e0a2a0c75130582641353c539aaf9011a0088b31347f7588e70e449a3e0_B", d)
},
},
{
name: "generate taskID with filter",
url: "https://example.com?foo=foo&bar=bar",
meta: &base.UrlMeta{
Filter: "foo&bar",
},
peerID: "foo",
expect: func(t *testing.T, d interface{}) {
assert := assert.New(t)
assert.Equal("100680ad546ce6a577f42f52df33b4cfdca756859e664b8d7de329b150d09ce9_B", d)
},
},
{
name: "generate taskID with tag",
url: "https://example.com",
meta: &base.UrlMeta{
Tag: "foo",
},
peerID: "foo",
expect: func(t *testing.T, d interface{}) {
assert := assert.New(t)
assert.Equal("2773851c628744fb7933003195db436ce397c1722920696c4274ff804d86920b_B", d)
},
},
{
name: "generate twinsA taskID",
url: "https://example.com",
meta: &base.UrlMeta{},
peerID: "bar",
expect: func(t *testing.T, d interface{}) {
assert := assert.New(t)
assert.Equal("100680ad546ce6a577f42f52df33b4cfdca756859e664b8d7de329b150d09ce9_A", d)
},
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
data := TwinsTaskID(tc.url, tc.meta, tc.peerID)
tc.expect(t, data)
})
}
}
23 changes: 23 additions & 0 deletions pkg/util/mathutils/math_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,20 @@
package mathutils

import (
"math"
"strconv"
)

const float64EqualityThreshold = 1e-9

func MaxInt(a, b int) int {
if a < b {
return a
}

return b
}

func MaxInt32(a, b int32) int32 {
if a > b {
return a
Expand All @@ -29,6 +40,14 @@ func MaxInt32(a, b int32) int32 {
return b
}

func MinInt(a, b int) int {
if a < b {
return a
}

return b
}

func MinInt32(a, b int32) int32 {
if a < b {
return a
Expand Down Expand Up @@ -68,3 +87,7 @@ func IsInteger(value string) bool {

return false
}

func EqualFloat64(a, b float64) bool {
return math.Abs(a-b) <= float64EqualityThreshold
}
8 changes: 2 additions & 6 deletions scheduler/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,7 @@ type Config struct {
func New() *Config {
return &Config{
Scheduler: &SchedulerConfig{
ABTest: false,
AEvaluator: "",
BEvaluator: "",
Algorithm: "default",
WorkerNum: runtime.GOMAXPROCS(0),
BackSourceCount: 3,
AccessWindow: 3 * time.Minute,
Expand Down Expand Up @@ -179,9 +177,7 @@ type DynConfig struct {
}

type SchedulerConfig struct {
ABTest bool `yaml:"abtest" mapstructure:"abtest"`
AEvaluator string `yaml:"aevaluator" mapstructure:"aevaluator"`
BEvaluator string `yaml:"bevaluator" mapstructure:"bevaluator"`
Algorithm string `yaml:"algorithm" mapstructure:"algorithm"`
WorkerNum int `yaml:"workerNum" mapstructure:"workerNum"`
BackSourceCount int32 `yaml:"backSourceCount" mapstructure:"backSourceCount"`
// AccessWindow should less than CDN task expireTime
Expand Down
6 changes: 2 additions & 4 deletions scheduler/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,8 @@ func TestSchedulerConfig_Load(t *testing.T) {
CDNDirPath: "tmp",
},
Scheduler: &SchedulerConfig{
ABTest: true,
AEvaluator: "a-evaluator",
BEvaluator: "b-evaluator",
WorkerNum: 8,
Algorithm: "default",
WorkerNum: 8,
},
Server: &ServerConfig{
IP: "127.0.0.1",
Expand Down
4 changes: 1 addition & 3 deletions scheduler/config/testdata/scheduler.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,11 @@ dynconfig:
cdnDirPath: tmp

scheduler:
abtest: true
aevaluator: "a-evaluator"
bevaluator: "b-evaluator"
workerNum: 8
workerJobPoolSize: 10000
senderNum: 10
senderJobPoolSize: 10000
algorithm: default

server:
ip: "127.0.0.1"
Expand Down
Loading

0 comments on commit d01eef6

Please sign in to comment.