Skip to content

Commit

Permalink
Merge branch 'master' into hot_cpu
Browse files Browse the repository at this point in the history
  • Loading branch information
lhy1024 authored Jan 9, 2023
2 parents e5e7edf + 3fba508 commit 966495e
Show file tree
Hide file tree
Showing 59 changed files with 1,171 additions and 265 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ require (
github.com/pingcap/errcode v0.3.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce
github.com/pingcap/kvproto v0.0.0-20221221093947-0a9b14f1fc26
github.com/pingcap/kvproto v0.0.0-20230105060948-64890fa4f6c1
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/pingcap/sysutil v0.0.0-20211208032423-041a72e5860d
github.com/pingcap/tidb-dashboard v0.0.0-20221201151320-ea3ee6971f2e
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -365,8 +365,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue
github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce h1:Y1kCxlCtlPTMtVcOkjUcuQKh+YrluSo7+7YMCQSzy30=
github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce/go.mod h1:w4PEZ5y16LeofeeGwdgZB4ddv9bLyDuIX+ljstgKZyk=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20221221093947-0a9b14f1fc26 h1:Tw4afZ2Tyr8iT8Oln6/szMjh5IDs+GtlnLsDo/Y2HEE=
github.com/pingcap/kvproto v0.0.0-20221221093947-0a9b14f1fc26/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/kvproto v0.0.0-20230105060948-64890fa4f6c1 h1:jw4NjEiCleRJPPpHM7K6l8OKzOjnZAj62eKteCAY6ro=
github.com/pingcap/kvproto v0.0.0-20230105060948-64890fa4f6c1/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
Expand Down
2 changes: 1 addition & 1 deletion pkg/mcs/docs.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// Package msc used to implement the core logic of the external services which rely on the PD banckend provider.
// Package msc used to implement the core logic of the external services which rely on the PD backend provider.
package msc
4 changes: 2 additions & 2 deletions pkg/mcs/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type RegistrableService interface {
}

// ServiceRegistry is a map that stores all registered grpc services.
// It implements the `Serviceregistry` interface.
// It implements the `ServiceRegistry` interface.
type ServiceRegistry struct {
builders map[string]ServiceBuilder
services map[string]RegistrableService
Expand Down Expand Up @@ -90,7 +90,7 @@ func (r ServiceRegistry) RegisterService(name string, service ServiceBuilder) {
}

func init() {
server.NewServiceregistry = func() server.Serviceregistry {
server.NewServiceRegistry = func() server.ServiceRegistry {
return ServerServiceRegistry
}
}
2 changes: 1 addition & 1 deletion pkg/mcs/resource_manager/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func NewService(srv *rmserver.Service) *Service {
func (s *Service) RegisterRouter() {
configEndpoint := s.baseEndpoint.Group("/config")
configEndpoint.POST("/group", s.postResourceGroup)
configEndpoint.PUT("group", s.putResourceGroup)
configEndpoint.PUT("/group", s.putResourceGroup)
configEndpoint.GET("/group/:name", s.getResourceGroup)
configEndpoint.GET("/groups", s.getResourceGroupList)
configEndpoint.DELETE("/group/:name", s.deleteResourceGroup)
Expand Down
4 changes: 2 additions & 2 deletions pkg/mcs/resource_manager/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ func (s *Service) RegisterGRPCService(g *grpc.Server) {

// RegisterRESTHandler registers the service to REST server.
func (s *Service) RegisterRESTHandler(userDefineHandlers map[string]http.Handler) {
hander, group := SetUpRestHandler(s)
server.RegisterUserDefinedHandlers(userDefineHandlers, &group, hander)
handler, group := SetUpRestHandler(s)
server.RegisterUserDefinedHandlers(userDefineHandlers, &group, handler)
}

// GetManager returns the resource manager.
Expand Down
2 changes: 1 addition & 1 deletion pkg/mcs/resource_manager/server/install/install.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"github.com/tikv/pd/pkg/mcs/registry"
rm_server "github.com/tikv/pd/pkg/mcs/resource_manager/server"

// init API gorup
// init API group
_ "github.com/tikv/pd/pkg/mcs/resource_manager/server/apis/v1"
)

Expand Down
6 changes: 3 additions & 3 deletions pkg/mcs/resource_manager/server/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (m *Manager) AddResourceGroup(group *ResourceGroup) error {
return err
}
m.Lock()
if err := m.storage().SaveResourceGroup(group.Name, group); err != nil {
if err := group.persistSettings(m.storage()); err != nil {
return err
}
m.groups[group.Name] = group
Expand All @@ -87,11 +87,11 @@ func (m *Manager) ModifyResourceGroup(group *rmpb.ResourceGroup) error {
return errors.New("not exists the group")
}
newGroup := curGroup.Copy()
err := newGroup.PatchSettings(group.GetSettings())
err := newGroup.PatchSettings(group)
if err != nil {
return err
}
if m.storage().SaveResourceGroup(group.Name, newGroup); err != nil {
if err := newGroup.persistSettings(m.storage()); err != nil {
return err
}
m.groups[group.Name] = newGroup
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,21 @@ package server

import (
"encoding/json"
"path"
"sync"

"github.com/pingcap/errors"
rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
"github.com/pingcap/log"
"github.com/tikv/pd/server/storage"
"go.uber.org/zap"
)

const (
// groupSettingsPathPrefix is the prefix of the resource group path to store group settings.
groupSettingsPathPrefix = "/settings"
)

// ResourceGroup is the definition of a resource group, for REST API.
type ResourceGroup struct {
sync.RWMutex
Expand Down Expand Up @@ -81,7 +88,7 @@ func (rg *ResourceGroup) CheckAndInit() error {
if len(rg.Name) == 0 || len(rg.Name) > 32 {
return errors.New("invalid resource group name, the length should be in [1,32]")
}
if rg.Mode != rmpb.GroupMode_RUMode && rg.Mode != rmpb.GroupMode_NativeMode {
if rg.Mode != rmpb.GroupMode_RUMode && rg.Mode != rmpb.GroupMode_RawMode {
return errors.New("invalid resource group mode")
}
if rg.Mode == rmpb.GroupMode_RUMode {
Expand All @@ -92,12 +99,12 @@ func (rg *ResourceGroup) CheckAndInit() error {
return errors.New("invalid resource group settings, RU mode should not set resource settings")
}
}
if rg.Mode == rmpb.GroupMode_NativeMode {
if rg.Mode == rmpb.GroupMode_RawMode {
if rg.ResourceSettings == nil {
rg.ResourceSettings = &NativeResourceSettings{}
}
if rg.RUSettings != nil {
return errors.New("invalid resource group settings, native mode should not set RU settings")
return errors.New("invalid resource group settings, raw mode should not set RU settings")
}
}
return nil
Expand All @@ -106,26 +113,26 @@ func (rg *ResourceGroup) CheckAndInit() error {
// PatchSettings patches the resource group settings.
// Only used to patch the resource group when updating.
// Note: the tokens is the delta value to patch.
func (rg *ResourceGroup) PatchSettings(groupSettings *rmpb.GroupSettings) error {
func (rg *ResourceGroup) PatchSettings(metaGroup *rmpb.ResourceGroup) error {
rg.Lock()
defer rg.Unlock()
if groupSettings.GetMode() != rg.Mode {
if metaGroup.GetMode() != rg.Mode {
return errors.New("only support reconfigure in same mode, maybe you should delete and create a new one")
}
switch rg.Mode {
case rmpb.GroupMode_RUMode:
if groupSettings.GetRUSettings() == nil {
if metaGroup.GetRUSettings() == nil {
return errors.New("invalid resource group settings, RU mode should set RU settings")
}
rg.RUSettings.RRU.patch(groupSettings.GetRUSettings().GetRRU())
rg.RUSettings.WRU.patch(groupSettings.GetRUSettings().GetWRU())
case rmpb.GroupMode_NativeMode:
if groupSettings.GetResourceSettings() == nil {
return errors.New("invalid resource group settings, native mode should set resource settings")
rg.RUSettings.RRU.patch(metaGroup.GetRUSettings().GetRRU())
rg.RUSettings.WRU.patch(metaGroup.GetRUSettings().GetWRU())
case rmpb.GroupMode_RawMode:
if metaGroup.GetResourceSettings() == nil {
return errors.New("invalid resource group settings, raw mode should set resource settings")
}
rg.ResourceSettings.CPU.patch(groupSettings.GetResourceSettings().GetCpu())
rg.ResourceSettings.IOReadBandwidth.patch(groupSettings.GetResourceSettings().GetIoRead())
rg.ResourceSettings.IOWriteBandwidth.patch(groupSettings.GetResourceSettings().GetIoWrite())
rg.ResourceSettings.CPU.patch(metaGroup.GetResourceSettings().GetCpu())
rg.ResourceSettings.IOReadBandwidth.patch(metaGroup.GetResourceSettings().GetIoRead())
rg.ResourceSettings.IOWriteBandwidth.patch(metaGroup.GetResourceSettings().GetIoWrite())
}
log.Info("patch resource group settings", zap.String("name", rg.Name), zap.String("settings", rg.String()))
return nil
Expand All @@ -140,11 +147,11 @@ func FromProtoResourceGroup(group *rmpb.ResourceGroup) *ResourceGroup {

rg := &ResourceGroup{
Name: group.Name,
Mode: group.Settings.Mode,
Mode: group.Mode,
}
switch group.GetSettings().GetMode() {
switch group.GetMode() {
case rmpb.GroupMode_RUMode:
if settings := group.GetSettings().GetRUSettings(); settings != nil {
if settings := group.GetRUSettings(); settings != nil {
ruSettings = &RequestUnitSettings{
RRU: GroupTokenBucket{
TokenBucket: settings.GetRRU(),
Expand All @@ -155,8 +162,8 @@ func FromProtoResourceGroup(group *rmpb.ResourceGroup) *ResourceGroup {
}
rg.RUSettings = ruSettings
}
case rmpb.GroupMode_NativeMode:
if settings := group.GetSettings().GetResourceSettings(); settings != nil {
case rmpb.GroupMode_RawMode:
if settings := group.GetResourceSettings(); settings != nil {
resourceSettings = &NativeResourceSettings{
CPU: GroupTokenBucket{
TokenBucket: settings.GetCpu(),
Expand All @@ -182,28 +189,31 @@ func (rg *ResourceGroup) IntoProtoResourceGroup() *rmpb.ResourceGroup {
case rmpb.GroupMode_RUMode: // RU mode
group := &rmpb.ResourceGroup{
Name: rg.Name,
Settings: &rmpb.GroupSettings{
Mode: rmpb.GroupMode_RUMode,
RUSettings: &rmpb.GroupRequestUnitSettings{
RRU: rg.RUSettings.RRU.TokenBucket,
WRU: rg.RUSettings.WRU.TokenBucket,
},
Mode: rmpb.GroupMode_RUMode,
RUSettings: &rmpb.GroupRequestUnitSettings{
RRU: rg.RUSettings.RRU.TokenBucket,
WRU: rg.RUSettings.WRU.TokenBucket,
},
}
return group
case rmpb.GroupMode_NativeMode: // Native mode
case rmpb.GroupMode_RawMode: // Raw mode
group := &rmpb.ResourceGroup{
Name: rg.Name,
Settings: &rmpb.GroupSettings{
Mode: rmpb.GroupMode_NativeMode,
ResourceSettings: &rmpb.GroupResourceSettings{
Cpu: rg.ResourceSettings.CPU.TokenBucket,
IoRead: rg.ResourceSettings.IOReadBandwidth.TokenBucket,
IoWrite: rg.ResourceSettings.IOWriteBandwidth.TokenBucket,
},
Mode: rmpb.GroupMode_RawMode,
ResourceSettings: &rmpb.GroupResourceSettings{
Cpu: rg.ResourceSettings.CPU.TokenBucket,
IoRead: rg.ResourceSettings.IOReadBandwidth.TokenBucket,
IoWrite: rg.ResourceSettings.IOWriteBandwidth.TokenBucket,
},
}
return group
}
return nil
}

// persistSettings persists the resource group settings.
// TODO: persist the state of the group separately.
func (rg *ResourceGroup) persistSettings(storage storage.Storage) error {
metaGroup := rg.IntoProtoResourceGroup()
return storage.SaveResourceGroup(path.Join(groupSettingsPathPrefix, rg.Name), metaGroup)
}
70 changes: 70 additions & 0 deletions pkg/mcs/resource_manager/server/resource_group_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package server

import (
"encoding/json"
"testing"

rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
"github.com/stretchr/testify/require"
)

func TestPatchResourceGroup(t *testing.T) {
re := require.New(t)
rg1 := &ResourceGroup{Name: "test", Mode: rmpb.GroupMode_RUMode}
err := rg1.CheckAndInit()
re.NoError(err)
testCaseRU := []struct {
patchJSONString string
expectJSONString string
}{
{`{"name":"test", "mode":1, "r_u_settings": {"r_r_u":{"settings":{"fill_rate": 200000}}}}`,
`{"name":"test","mode":1,"r_u_settings":{"rru":{"token_bucket":{"settings":{"fill_rate":200000}},"initialized":false},"wru":{"initialized":false}}}`},
{`{"name":"test", "mode":1, "r_u_settings": {"w_r_u":{"settings":{"fill_rate": 200000}}}}`,
`{"name":"test","mode":1,"r_u_settings":{"rru":{"initialized":false},"wru":{"token_bucket":{"settings":{"fill_rate":200000}},"initialized":false}}}`},
{`{"name":"test", "mode":1, "r_u_settings": {"w_r_u":{"settings":{"fill_rate": 200000, "burst": 100000}}}}`,
`{"name":"test","mode":1,"r_u_settings":{"rru":{"initialized":false},"wru":{"token_bucket":{"settings":{"fill_rate":200000}},"initialized":false}}}`},
{`{"name":"test", "mode":1, "r_u_settings": {"r_r_u":{"settings":{"fill_rate": 200000, "burst": 100000}}}}`,
`{"name":"test","mode":1,"r_u_settings":{"rru":{"token_bucket":{"settings":{"fill_rate":200000}},"initialized":false},"wru":{"initialized":false}}}`},
{`{"name":"test", "mode":1, "r_u_settings": {"r_r_u":{"settings":{"fill_rate": 200000, "burst": 100000}}, "w_r_u":{"settings":{"fill_rate": 200000}}}}`,
`{"name":"test","mode":1,"r_u_settings":{"rru":{"token_bucket":{"settings":{"fill_rate":200000}},"initialized":false},"wru":{"token_bucket":{"settings":{"fill_rate":200000}},"initialized":false}}}`},
}

for _, ca := range testCaseRU {
rg := rg1.Copy()
patch := &rmpb.ResourceGroup{}
err := json.Unmarshal([]byte(ca.patchJSONString), patch)
re.NoError(err)
err = rg.PatchSettings(patch)
re.NoError(err)
res, err := json.Marshal(rg)
re.NoError(err)
re.Equal(ca.expectJSONString, string(res))
}

rg2 := &ResourceGroup{Name: "test", Mode: rmpb.GroupMode_RawMode}
err = rg2.CheckAndInit()
re.NoError(err)
testCaseResource := []struct {
patchJSONString string
expectJSONString string
}{
{`{"name":"test", "mode":2, "resource_settings": {"cpu":{"settings":{"fill_rate": 200000}}}}`,
`{"name":"test","mode":2,"resource_settings":{"cpu":{"token_bucket":{"settings":{"fill_rate":200000}},"initialized":false},"io_read_bandwidth":{"initialized":false},"io_write_bandwidth":{"initialized":false}}}`},
{`{"name":"test", "mode":2, "resource_settings": {"io_read":{"settings":{"fill_rate": 200000}}}}`,
`{"name":"test","mode":2,"resource_settings":{"cpu":{"initialized":false},"io_read_bandwidth":{"token_bucket":{"settings":{"fill_rate":200000}},"initialized":false},"io_write_bandwidth":{"initialized":false}}}`},
{`{"name":"test", "mode":2, "resource_settings": {"io_write":{"settings":{"fill_rate": 200000}}}}`,
`{"name":"test","mode":2,"resource_settings":{"cpu":{"initialized":false},"io_read_bandwidth":{"initialized":false},"io_write_bandwidth":{"token_bucket":{"settings":{"fill_rate":200000}},"initialized":false}}}`},
}

for _, ca := range testCaseResource {
rg := rg2.Copy()
patch := &rmpb.ResourceGroup{}
err := json.Unmarshal([]byte(ca.patchJSONString), patch)
re.NoError(err)
err = rg.PatchSettings(patch)
re.NoError(err)
res, err := json.Marshal(rg)
re.NoError(err)
re.Equal(ca.expectJSONString, string(res))
}
}
10 changes: 7 additions & 3 deletions pkg/mcs/resource_manager/server/token_bukets.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,20 @@ func (t *GroupTokenBucket) patch(settings *rmpb.TokenBucket) {
// Update updates the token bucket.
func (t *GroupTokenBucket) Update(now time.Time) {
if !t.Initialized {
t.Settings.Fillrate = defaultRefillRate
t.Tokens = defaultInitialTokens
if t.Settings.FillRate == 0 {
t.Settings.FillRate = defaultRefillRate
}
if t.Tokens < defaultInitialTokens {
t.Tokens = defaultInitialTokens
}
t.LastUpdate = &now
t.Initialized = true
return
}

delta := now.Sub(*t.LastUpdate)
if delta > 0 {
t.Tokens += float64(t.Settings.Fillrate) * delta.Seconds()
t.Tokens += float64(t.Settings.FillRate) * delta.Seconds()
t.LastUpdate = &now
}
}
Expand Down
Loading

0 comments on commit 966495e

Please sign in to comment.