diff --git a/go.mod b/go.mod index 21142d56f5e1..56ca011073e1 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,6 @@ require ( github.com/gorilla/mux v1.7.3 github.com/gorilla/websocket v1.2.0 // indirect github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 - github.com/grpc-ecosystem/grpc-gateway v1.12.1 github.com/juju/ratelimit v1.0.1 github.com/mattn/go-shellwords v1.0.3 github.com/montanaflynn/stats v0.0.0-20151014174947-eeaced052adb @@ -52,5 +51,3 @@ require ( google.golang.org/grpc v1.25.1 gopkg.in/natefinch/lumberjack.v2 v2.0.0 ) - -replace github.com/pingcap/kvproto => github.com/rleungx/kvproto v0.0.0-20200108095827-b74c3984daa5 diff --git a/go.sum b/go.sum index c064aa0f1f0c..8f35a008e457 100644 --- a/go.sum +++ b/go.sum @@ -10,7 +10,6 @@ github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d/go.mod h1:3eOhrU github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= -github.com/antihax/optional v0.0.0-20180407024304-ca021399b1a6/go.mod h1:V8iCPQYkqmusNa815XgQio277wI47sdRh1dUOLdyC6Q= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0 h1:HWo1m869IqiPhD389kmkxeTalrjNbbJTC8LXupb+sl0= @@ -105,6 +104,7 @@ github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903/go.mod h1:cIg4er github.com/golang/groupcache v0.0.0-20181024230925-c65c006176ff h1:kOkM9whyQYodu09SJ6W3NCsHG7crFaJILQ22Gozp3lg= github.com/golang/groupcache v0.0.0-20181024230925-c65c006176ff/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= +github.com/golang/protobuf v0.0.0-20180814211427-aa810b61a9c7/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -131,8 +131,6 @@ github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 h1:Ovs26xHkKqVztRpIrF/92Bcuy github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk= github.com/grpc-ecosystem/grpc-gateway v1.9.5 h1:UImYN5qQ8tuGpGE16ZmjvcTtTw24zw1QAp/SlnNrZhI= github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= -github.com/grpc-ecosystem/grpc-gateway v1.12.1 h1:zCy2xE9ablevUOrUZc3Dl72Dt+ya2FNAvC2yLYMHzi4= -github.com/grpc-ecosystem/grpc-gateway v1.12.1/go.mod h1:8XEsbTttt/W+VvjtQhLACqCisSPWTxCZ7sBRjU6iH9c= github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM= @@ -216,6 +214,9 @@ github.com/pingcap/errors v0.11.5-0.20190809092503-95897b64e011 h1:58naV4XMEqm0h github.com/pingcap/errors v0.11.5-0.20190809092503-95897b64e011/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= github.com/pingcap/failpoint v0.0.0-20191029060244-12f4ac2fd11d h1:F8vp38kTAckN+v8Jlc98uMBvKIzr1a+UhnLyVYn8Q5Q= github.com/pingcap/failpoint v0.0.0-20191029060244-12f4ac2fd11d/go.mod h1:DNS3Qg7bEDhU6EXNHF+XSv/PGznQaMJ5FWvctpm6pQI= +github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= +github.com/pingcap/kvproto v0.0.0-20191213111810-93cb7c623c8b h1:gYXdnlC+ipZzLt9FrGc4iDlCtWvWCncJBVULiJQHzYg= +github.com/pingcap/kvproto v0.0.0-20191213111810-93cb7c623c8b/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9 h1:AJD9pZYm72vMgPcQDww9rkZ1DnWfl0pXV3BOWlkYIjA= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/sysutil v0.0.0-20191216090214-5f9620d22b3b h1:EEyo/SCRswLGuSk+7SB86Ak1p8bS6HL1Mi4Dhyuv6zg= @@ -238,10 +239,7 @@ github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y8 github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.2 h1:6LJUbpNm42llc4HRCuvApCSWB/WfhuNo9K98Q9sNGfs= github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= -github.com/rleungx/kvproto v0.0.0-20200108095827-b74c3984daa5 h1:PyVrXq/3zjxA30YPoK8W14y8TI/M/SlU4YoTL9DTmvw= -github.com/rleungx/kvproto v0.0.0-20200108095827-b74c3984daa5/go.mod h1:74hTtf0KOCW1kGA7+XpuHDpbZxmlPM8z3VQTpTalS0E= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= -github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/sergi/go-diff v1.0.1-0.20180205163309-da645544ed44/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= github.com/shirou/gopsutil v2.19.10+incompatible h1:lA4Pi29JEVIQIgATSeftHSY0rMGI9CLrl2ZvDLiahto= @@ -349,7 +347,6 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7 h1:fHDIZ2oxGnUZRN6WgWFCbYBjH9uqVPRCUVUDhs0wnbA= golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190827160401-ba9fcec4b297/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/net v0.0.0-20191002035440-2ec189313ef0/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553 h1:efeOvDhwQ29Dj3SdAV/MJf8oukgn+8D8WgaCaRMchF8= golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -407,15 +404,14 @@ golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8T google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/genproto v0.0.0-20181004005441-af9cb2a35e7f/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55 h1:gSJIx1SDwno+2ElGhA4+qG2zF97qiUzTM+rQ0klBOcE= google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= -google.golang.org/genproto v0.0.0-20190927181202-20e1ac93f88c h1:hrpEMCZ2O7DR5gC1n2AJGVhrwiEjOi35+jxtIuZpTMo= -google.golang.org/genproto v0.0.0-20190927181202-20e1ac93f88c/go.mod h1:IbNlFCBrqXvoKpeg0TB2l7cyZUmoaFKYIwrEpbDKLA8= +google.golang.org/grpc v0.0.0-20180607172857-7a6a684ca69e/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.23.1 h1:q4XQuHFC6I28BKZpo6IYyb3mNO+l7lSOxRuYTCiDfXk= google.golang.org/grpc v1.23.1/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= -google.golang.org/grpc v1.24.0/go.mod h1:XDChyiUovWa60DnaeDeZmSW86xtLtjtZbwvSiRnRtcA= google.golang.org/grpc v1.25.1 h1:wdKvqQk7IttEw92GoRyKG2IDrUIpgpj6H6m81yfeMW0= google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= gopkg.in/alecthomas/gometalinter.v2 v2.0.12/go.mod h1:NDRytsqEZyolNuAgTzJkZMkSQM7FIKyzVzGhjB/qfYo= @@ -444,7 +440,6 @@ gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bl gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.7 h1:VUgggvou5XRW9mHwD/yXxIYSMtY0zoKQf/v226p2nyo= gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/pkg/keyvisual/service.go b/pkg/keyvisual/service.go index 47edaf9166bc..ebc1d1acd8df 100644 --- a/pkg/keyvisual/service.go +++ b/pkg/keyvisual/service.go @@ -46,7 +46,7 @@ type keyvisualService struct { } // NewKeyvisualService creates a HTTP handler for heatmap service. -func NewKeyvisualService(ctx context.Context, svr *server.Server) (http.Handler, server.APIGroup, func()) { +func NewKeyvisualService(ctx context.Context, svr *server.Server) (http.Handler, server.APIGroup) { mux := http.NewServeMux() k := &keyvisualService{ ServeMux: mux, @@ -62,7 +62,7 @@ func NewKeyvisualService(ctx context.Context, svr *server.Server) (http.Handler, negroni.Wrap(k), ) go k.run() - return handler, defaultRegisterAPIGroupInfo, nil + return handler, defaultRegisterAPIGroupInfo } // Heatmap returns the heatmap data. diff --git a/server/api/config.go b/server/api/config.go index b94999f8abb7..162c4b4c3b76 100644 --- a/server/api/config.go +++ b/server/api/config.go @@ -20,6 +20,7 @@ import ( "net/http" "reflect" + "github.com/BurntSushi/toml" "github.com/pingcap/errcode" "github.com/pingcap/kvproto/pkg/configpb" "github.com/pingcap/log" @@ -48,6 +49,27 @@ func (h *confHandler) Get(w http.ResponseWriter, r *http.Request) { } func (h *confHandler) Post(w http.ResponseWriter, r *http.Request) { + if h.svr.GetConfig().EnableConfigManager { + client := h.svr.GetConfigClient() + if client == nil { + h.rd.JSON(w, http.StatusServiceUnavailable, "no leader") + } + cm := h.svr.GetConfigManager() + m := make(map[string]interface{}) + json.NewDecoder(r.Body).Decode(&m) + entries, err := transToEntries(m) + if err != nil { + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) + return + } + err = redirectUpdateReq(h.svr.Context(), client, cm, entries) + if err != nil { + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) + return + } + h.rd.JSON(w, http.StatusOK, nil) + return + } config := h.svr.GetConfig() data, err := ioutil.ReadAll(r.Body) r.Body.Close() @@ -137,6 +159,27 @@ func (h *confHandler) GetSchedule(w http.ResponseWriter, r *http.Request) { } func (h *confHandler) SetSchedule(w http.ResponseWriter, r *http.Request) { + if h.svr.GetConfig().EnableConfigManager { + client := h.svr.GetConfigClient() + if client == nil { + h.rd.JSON(w, http.StatusServiceUnavailable, "no leader") + } + cm := h.svr.GetConfigManager() + m := make(map[string]interface{}) + json.NewDecoder(r.Body).Decode(&m) + entries, err := transToEntries(m) + if err != nil { + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) + return + } + err = redirectUpdateReq(h.svr.Context(), client, cm, entries) + if err != nil { + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) + return + } + h.rd.JSON(w, http.StatusOK, nil) + return + } config := h.svr.GetScheduleConfig() if err := apiutil.ReadJSONRespondError(h.rd, w, r.Body, &config); err != nil { return @@ -154,6 +197,27 @@ func (h *confHandler) GetReplication(w http.ResponseWriter, r *http.Request) { } func (h *confHandler) SetReplication(w http.ResponseWriter, r *http.Request) { + if h.svr.GetConfig().EnableConfigManager { + client := h.svr.GetConfigClient() + if client == nil { + h.rd.JSON(w, http.StatusServiceUnavailable, "no leader") + } + cm := h.svr.GetConfigManager() + m := make(map[string]interface{}) + json.NewDecoder(r.Body).Decode(&m) + entries, err := transToEntries(m) + if err != nil { + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) + return + } + err = redirectUpdateReq(h.svr.Context(), client, cm, entries) + if err != nil { + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) + return + } + h.rd.JSON(w, http.StatusOK, nil) + return + } config := h.svr.GetReplicationConfig() if err := apiutil.ReadJSONRespondError(h.rd, w, r.Body, &config); err != nil { return @@ -175,6 +239,53 @@ func (h *confHandler) SetLabelProperty(w http.ResponseWriter, r *http.Request) { if err := apiutil.ReadJSONRespondError(h.rd, w, r.Body, &input); err != nil { return } + + if h.svr.GetConfig().EnableConfigManager { + client := h.svr.GetConfigClient() + if client == nil { + h.rd.JSON(w, http.StatusServiceUnavailable, "no leader") + } + cm := h.svr.GetConfigManager() + typ := input["type"] + labelKey, labelValue := input["label-key"], input["label-value"] + cfg := h.svr.GetScheduleOption().LoadLabelPropertyConfig().Clone() + switch input["action"] { + case "set": + for _, l := range cfg[typ] { + if l.Key == labelKey && l.Value == labelValue { + return + } + } + cfg[typ] = append(cfg[typ], config.StoreLabel{Key: labelKey, Value: labelValue}) + case "delete": + oldLabels := cfg[typ] + cfg[typ] = []config.StoreLabel{} + for _, l := range oldLabels { + if l.Key == labelKey && l.Value == labelValue { + continue + } + cfg[typ] = append(cfg[typ], l) + } + if len(cfg[typ]) == 0 { + delete(cfg, typ) + } + default: + err := errors.Errorf("unknown action %v", input["action"]) + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) + return + } + var buf bytes.Buffer + toml.NewEncoder(&buf).Encode(cfg) + entries := []*entry{{key: "label-property", value: buf.String()}} + err := redirectUpdateReq(h.svr.Context(), client, cm, entries) + if err != nil { + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) + return + } + h.rd.JSON(w, http.StatusOK, nil) + return + } + var err error switch input["action"] { case "set": @@ -205,17 +316,27 @@ func (h *confHandler) SetClusterVersion(w http.ResponseWriter, r *http.Request) apiutil.ErrorResp(h.rd, w, errcode.NewInvalidInputErr(errors.New("not set cluster-version"))) return } + + if h.svr.GetConfig().EnableConfigManager { + kind := &configpb.ConfigKind{Kind: &configpb.ConfigKind_Global{Global: &configpb.Global{Component: server.Component}}} + v := &configpb.Version{Global: h.svr.GetConfigManager().GlobalCfgs[server.Component].GetVersion()} + entry := &configpb.ConfigEntry{Name: "cluster-version", Value: version} + client := h.svr.GetConfigClient() + if client == nil { + h.rd.JSON(w, http.StatusServiceUnavailable, "no leader") + } + _, _, err := h.svr.GetConfigClient().Update(h.svr.Context(), v, kind, []*configpb.ConfigEntry{entry}) + if err != nil { + log.Error("update cluster version meet error", zap.Error(err)) + } + h.rd.JSON(w, http.StatusOK, nil) + return + } + err := h.svr.SetClusterVersion(version) if err != nil { apiutil.ErrorResp(h.rd, w, errcode.NewInternalErr(err)) return } - kind := &configpb.ConfigKind{Kind: &configpb.ConfigKind_Global{Global: &configpb.Global{Component: server.Component}}} - v := &configpb.Version{Global: h.svr.GetConfigManager().GlobalCfgs[server.Component].GetVersion()} - entry := &configpb.ConfigEntry{Name: "cluster-version", Value: version} - _, _, err = h.svr.GetConfigClient().Update(h.svr.Context(), v, kind, []*configpb.ConfigEntry{entry}) - if err != nil { - log.Error("update cluster version meet error", zap.Error(err)) - } h.rd.JSON(w, http.StatusOK, nil) } diff --git a/server/api/config_test.go b/server/api/config_test.go index e7786f5e1c57..cbd0448ff470 100644 --- a/server/api/config_test.go +++ b/server/api/config_test.go @@ -32,11 +32,14 @@ type testConfigSuite struct { } func (s *testConfigSuite) SetUpSuite(c *C) { + server.ConfigCheckInterval = 10 * time.Millisecond s.svr, s.cleanup = mustNewServer(c, func(cfg *config.Config) { cfg.EnableConfigManager = true }) mustWaitLeader(c, []*server.Server{s.svr}) addr := s.svr.GetAddr() s.urlPrefix = fmt.Sprintf("%s%s/api/v1", addr, apiPrefix) + // make sure the config client is initialized + time.Sleep(20 * time.Millisecond) } func (s *testConfigSuite) TearDownSuite(c *C) { @@ -71,7 +74,7 @@ func (s *testConfigSuite) TestConfigAll(c *C) { err = postJSON(addr, postData) c.Assert(err, IsNil) - time.Sleep(2 * time.Second) + time.Sleep(20 * time.Millisecond) newCfg := &config.Config{} err = readJSON(addr, newCfg) c.Assert(err, IsNil) @@ -93,7 +96,7 @@ func (s *testConfigSuite) TestConfigSchedule(c *C) { err = postJSON(addr, postData) c.Assert(err, IsNil) - time.Sleep(2 * time.Second) + time.Sleep(20 * time.Millisecond) sc1 := &config.ScheduleConfig{} c.Assert(readJSON(addr, sc1), IsNil) c.Assert(*sc, DeepEquals, *sc1) @@ -120,7 +123,7 @@ func (s *testConfigSuite) TestConfigReplication(c *C) { err = postJSON(addr, postData) c.Assert(err, IsNil) - time.Sleep(2 * time.Second) + time.Sleep(20 * time.Millisecond) rc3 := &config.ReplicationConfig{} err = readJSON(addr, rc3) c.Assert(err, IsNil) @@ -149,7 +152,9 @@ func (s *testConfigSuite) TestConfigLabelProperty(c *C) { for _, cmd := range cmds { err := postJSON(addr, []byte(cmd)) c.Assert(err, IsNil) + time.Sleep(20 * time.Millisecond) } + cfg = loadProperties() c.Assert(cfg, HasLen, 2) c.Assert(cfg["foo"], DeepEquals, []config.StoreLabel{ @@ -165,7 +170,9 @@ func (s *testConfigSuite) TestConfigLabelProperty(c *C) { for _, cmd := range cmds { err := postJSON(addr, []byte(cmd)) c.Assert(err, IsNil) + time.Sleep(20 * time.Millisecond) } + cfg = loadProperties() c.Assert(cfg, HasLen, 1) c.Assert(cfg["foo"], DeepEquals, []config.StoreLabel{{Key: "zone", Value: "cn2"}}) diff --git a/server/api/label_test.go b/server/api/label_test.go index 0bf09db3888e..8a66cc9ca2d3 100644 --- a/server/api/label_test.go +++ b/server/api/label_test.go @@ -108,6 +108,7 @@ func (s *testLabelsStoreSuite) SetUpSuite(c *C) { }, } + server.ConfigCheckInterval = 10 * time.Millisecond s.svr, s.cleanup = mustNewServer(c, func(cfg *config.Config) { cfg.Replication.StrictlyMatchLabel = false cfg.EnableConfigManager = true @@ -121,6 +122,8 @@ func (s *testLabelsStoreSuite) SetUpSuite(c *C) { for _, store := range s.stores { mustPutStore(c, s.svr, store.Id, store.State, store.Labels) } + // make sure the config client is initialized + time.Sleep(20 * time.Millisecond) } func (s *testLabelsStoreSuite) TearDownSuite(c *C) { @@ -187,6 +190,7 @@ type testStrictlyLabelsStoreSuite struct { } func (s *testStrictlyLabelsStoreSuite) SetUpSuite(c *C) { + server.ConfigCheckInterval = 10 * time.Millisecond s.svr, s.cleanup = mustNewServer(c, func(cfg *config.Config) { cfg.Replication.LocationLabels = []string{"zone", "disk"} cfg.Replication.StrictlyMatchLabel = true @@ -198,6 +202,8 @@ func (s *testStrictlyLabelsStoreSuite) SetUpSuite(c *C) { s.urlPrefix = fmt.Sprintf("%s%s/api/v1", addr, apiPrefix) mustBootstrapCluster(c, s.svr) + // make sure the config client is initialized + time.Sleep(20 * time.Millisecond) } func (s *testStrictlyLabelsStoreSuite) TestStoreMatch(c *C) { @@ -282,7 +288,7 @@ func (s *testStrictlyLabelsStoreSuite) TestStoreMatch(c *C) { // enable placement rules. Report no error any more. c.Assert(postJSON(fmt.Sprintf("%s/config", s.urlPrefix), []byte(`{"enable-placement-rules":"true"}`)), IsNil) - time.Sleep(2 * time.Second) + time.Sleep(20 * time.Millisecond) for _, t := range cases { _, err := s.svr.PutStore(context.Background(), &pdpb.PutStoreRequest{ Header: &pdpb.RequestHeader{ClusterId: s.svr.ClusterID()}, diff --git a/server/api/log.go b/server/api/log.go index a2053589e0a0..e5a2094738c8 100644 --- a/server/api/log.go +++ b/server/api/log.go @@ -15,6 +15,7 @@ package api import ( "encoding/json" + "fmt" "io/ioutil" "net/http" @@ -37,6 +38,23 @@ func newlogHandler(svr *server.Server, rd *render.Render) *logHandler { } func (h *logHandler) Handle(w http.ResponseWriter, r *http.Request) { + if h.svr.GetConfig().EnableConfigManager { + client := h.svr.GetConfigClient() + if client == nil { + h.rd.JSON(w, http.StatusServiceUnavailable, "no leader") + } + cm := h.svr.GetConfigManager() + var str string + json.NewDecoder(r.Body).Decode(&str) + entries := []*entry{{key: "log.level", value: fmt.Sprintf("level = \"%v\"", str)}} + err := redirectUpdateReq(h.svr.Context(), client, cm, entries) + if err != nil { + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) + return + } + h.rd.JSON(w, http.StatusOK, nil) + return + } var level string data, err := ioutil.ReadAll(r.Body) r.Body.Close() diff --git a/server/api/middleware.go b/server/api/middleware.go index 17ef1c3d1e40..a774e7f15171 100644 --- a/server/api/middleware.go +++ b/server/api/middleware.go @@ -15,16 +15,11 @@ package api import ( "bytes" - "encoding/json" - "fmt" - "io/ioutil" "net/http" "reflect" "strings" "github.com/BurntSushi/toml" - "github.com/golang/protobuf/jsonpb" - "github.com/pingcap/kvproto/pkg/configpb" "github.com/pingcap/pd/server" "github.com/pingcap/pd/server/cluster" "github.com/pingcap/pd/server/config" @@ -56,109 +51,32 @@ func (m clusterMiddleware) Middleware(h http.Handler) http.Handler { }) } -type configMiddleware struct { - s *server.Server - rd *render.Render -} - -func newConfigMiddleware(s *server.Server) configMiddleware { - return configMiddleware{ - s: s, - rd: render.New(render.Options{IndentJSON: true}), - } -} - type entry struct { key string value string } -func (m configMiddleware) Middleware(next http.Handler) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - req := make(map[string]interface{}) - json.NewDecoder(r.Body).Decode(&req) - mapKeys := reflect.ValueOf(req).MapKeys() - var entries []*entry - for _, k := range mapKeys { - if config.IsDeprecated(k.String()) { - m.rd.JSON(w, http.StatusInternalServerError, errors.New("config item has already been deprecated").Error()) - return - } - itemMap := make(map[string]interface{}) - itemMap[k.String()] = req[k.String()] - var buf bytes.Buffer - if err := toml.NewEncoder(&buf).Encode(itemMap); err != nil { - m.rd.JSON(w, http.StatusInternalServerError, err.Error()) - return - } - value := buf.String() - key := findTag(reflect.TypeOf(&config.Config{}).Elem(), k.String()) - if key == "" { - m.rd.JSON(w, http.StatusInternalServerError, errors.New("config item not found").Error()) - return - } - entries = append(entries, &entry{key, value}) +func transToEntries(req map[string]interface{}) ([]*entry, error) { + mapKeys := reflect.ValueOf(req).MapKeys() + var entries []*entry + for _, k := range mapKeys { + if config.IsDeprecated(k.String()) { + return nil, errors.New("config item has already been deprecated") } - - s, err := newBody(m.s, entries) - if err != nil { - m.rd.JSON(w, http.StatusInternalServerError, err.Error()) - return + itemMap := make(map[string]interface{}) + itemMap[k.String()] = req[k.String()] + var buf bytes.Buffer + if err := toml.NewEncoder(&buf).Encode(itemMap); err != nil { + return nil, err } - r.Body = ioutil.NopCloser(strings.NewReader(s)) - next.ServeHTTP(w, r) - }) -} - -type adminMiddleware struct { - s *server.Server - rd *render.Render -} - -func newAdminMiddleware(s *server.Server) adminMiddleware { - return adminMiddleware{ - s: s, - rd: render.New(render.Options{IndentJSON: true}), - } -} - -func (m adminMiddleware) Middleware(next http.Handler) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - var req string - json.NewDecoder(r.Body).Decode(&req) - entries := []*entry{{key: "log.level", value: fmt.Sprintf("level = \"%v\"", req)}} - s, err := newBody(m.s, entries) - if err != nil { - m.rd.JSON(w, http.StatusInternalServerError, err.Error()) - return + value := buf.String() + key := findTag(reflect.TypeOf(&config.Config{}).Elem(), k.String()) + if key == "" { + return nil, errors.New("config item not found") } - r.Body = ioutil.NopCloser(strings.NewReader(s)) - next.ServeHTTP(w, r) - }) -} - -func newBody(s *server.Server, entries []*entry) (string, error) { - clusterID := s.ClusterID() - var configEntries []*configpb.ConfigEntry - for _, e := range entries { - configEntry := &configpb.ConfigEntry{Name: e.key, Value: e.value} - configEntries = append(configEntries, configEntry) - } - version := s.GetConfigManager().GlobalCfgs[server.Component].GetVersion() - - req := &configpb.UpdateRequest{ - Header: &configpb.Header{ - ClusterId: clusterID, - }, - Version: &configpb.Version{Global: version}, - Kind: &configpb.ConfigKind{ - Kind: &configpb.ConfigKind_Global{Global: &configpb.Global{Component: server.Component}}, - }, - Entries: configEntries, + entries = append(entries, &entry{key, value}) } - - m := jsonpb.Marshaler{} - return m.MarshalToString(req) + return entries, nil } func findTag(t reflect.Type, tag string) string { diff --git a/server/api/router.go b/server/api/router.go index 43b231ce7d9f..32e1a32201ed 100644 --- a/server/api/router.go +++ b/server/api/router.go @@ -14,19 +14,12 @@ package api import ( - "context" - "errors" "net/http" "net/http/pprof" "github.com/gorilla/mux" - "github.com/grpc-ecosystem/grpc-gateway/runtime" - "github.com/pingcap/kvproto/pkg/configpb" - "github.com/pingcap/log" "github.com/pingcap/pd/server" "github.com/unrolled/render" - "go.uber.org/zap" - "google.golang.org/grpc" ) func createStreamingRender() *render.Render { @@ -41,7 +34,7 @@ func createIndentRender() *render.Render { }) } -func createRouter(ctx context.Context, prefix string, svr *server.Server) (*mux.Router, func()) { +func createRouter(prefix string, svr *server.Server) *mux.Router { rd := createIndentRender() rootRouter := mux.NewRouter().PathPrefix(prefix).Subrouter() @@ -72,8 +65,11 @@ func createRouter(ctx context.Context, prefix string, svr *server.Server) (*mux. confHandler := newConfHandler(svr, rd) apiRouter.HandleFunc("/config", confHandler.Get).Methods("GET") + apiRouter.HandleFunc("/config", confHandler.Post).Methods("POST") apiRouter.HandleFunc("/config/schedule", confHandler.GetSchedule).Methods("GET") + apiRouter.HandleFunc("/config/schedule", confHandler.SetSchedule).Methods("POST") apiRouter.HandleFunc("/config/replicate", confHandler.GetReplication).Methods("GET") + apiRouter.HandleFunc("/config/replicate", confHandler.SetReplication).Methods("POST") apiRouter.HandleFunc("/config/label-property", confHandler.GetLabelProperty).Methods("GET") apiRouter.HandleFunc("/config/label-property", confHandler.SetLabelProperty).Methods("POST") apiRouter.HandleFunc("/config/cluster-version", confHandler.GetClusterVersion).Methods("GET") @@ -164,6 +160,7 @@ func createRouter(ctx context.Context, prefix string, svr *server.Server) (*mux. clusterRouter.HandleFunc("/admin/reset-ts", adminHandler.ResetTS).Methods("POST") logHandler := newlogHandler(svr, rd) + apiRouter.HandleFunc("/admin/log", logHandler.Handle).Methods("POST") pluginHandler := newPluginHandler(handler, rd) apiRouter.HandleFunc("/plugin", pluginHandler.LoadPlugin).Methods("POST") @@ -191,40 +188,5 @@ func createRouter(ctx context.Context, prefix string, svr *server.Server) (*mux. // Deprecated rootRouter.HandleFunc("/ping", func(w http.ResponseWriter, r *http.Request) {}).Methods("GET") - if svr.GetConfig().EnableConfigManager { - f := func() { - configRouter := apiRouter.PathPrefix("/config").Methods("POST").Subrouter() - adminRouter := apiRouter.PathPrefix("/admin").Methods("POST").Subrouter() - gwmux := runtime.NewServeMux() - UnaryClientInterceptor := func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { - invoker(ctx, method, req, reply, cc, opts...) - errMsg := reply.(*configpb.UpdateResponse).GetStatus().GetMessage() - if errMsg != "" { - return errors.New(errMsg) - } - return nil - } - opts := []grpc.DialOption{grpc.WithInsecure(), grpc.WithUnaryInterceptor(UnaryClientInterceptor)} - err := configpb.RegisterConfigHandlerFromEndpoint(ctx, gwmux, svr.GetAddr()[7:], opts) - if err != nil { - log.Error("fail to register grpc gateway", zap.Error(err)) - } - - configRouter.Handle("", gwmux).Methods("POST") - configRouter.Handle("/replicate", gwmux).Methods("POST") - configRouter.Handle("/schedule", gwmux).Methods("POST") - - adminRouter.Handle("/log", gwmux).Methods("POST") - - configRouter.Use(newConfigMiddleware(svr).Middleware) - adminRouter.Use(newAdminMiddleware(svr).Middleware) - } - return rootRouter, f - } - apiRouter.HandleFunc("/config", confHandler.Post).Methods("POST") - apiRouter.HandleFunc("/config/schedule", confHandler.SetSchedule).Methods("POST") - apiRouter.HandleFunc("/config/replicate", confHandler.SetReplication).Methods("POST") - - apiRouter.HandleFunc("/admin/log", logHandler.Handle).Methods("POST") - return rootRouter, nil + return rootRouter } diff --git a/server/api/server.go b/server/api/server.go index 92f8dd4cc87f..7a2c792dc193 100644 --- a/server/api/server.go +++ b/server/api/server.go @@ -26,18 +26,17 @@ import ( const apiPrefix = "/pd" // NewHandler creates a HTTP handler for API. -func NewHandler(ctx context.Context, svr *server.Server) (http.Handler, server.APIGroup, func()) { +func NewHandler(ctx context.Context, svr *server.Server) (http.Handler, server.APIGroup) { group := server.APIGroup{ Name: "core", IsCore: true, } router := mux.NewRouter() - r, f := createRouter(ctx, apiPrefix, svr) router.PathPrefix(apiPrefix).Handler(negroni.New( serverapi.NewRuntimeServiceValidator(svr, group), serverapi.NewRedirector(svr), - negroni.Wrap(r)), - ) + negroni.Wrap(createRouter(apiPrefix, svr)), + )) - return router, group, f + return router, group } diff --git a/server/api/store_test.go b/server/api/store_test.go index efcee0587390..24ae6f60f986 100644 --- a/server/api/store_test.go +++ b/server/api/store_test.go @@ -83,7 +83,7 @@ func (s *testStoreSuite) SetUpSuite(c *C) { Version: "2.0.0", }, } - + server.ConfigCheckInterval = 10 * time.Millisecond s.svr, s.cleanup = mustNewServer(c, func(cfg *config.Config) { cfg.EnableConfigManager = true }) mustWaitLeader(c, []*server.Server{s.svr}) @@ -95,6 +95,8 @@ func (s *testStoreSuite) SetUpSuite(c *C) { for _, store := range s.stores { mustPutStore(c, s.svr, store.Id, store.State, nil) } + // make sure the config client is initialized + time.Sleep(20 * time.Millisecond) } func (s *testStoreSuite) TearDownSuite(c *C) { @@ -172,7 +174,7 @@ func (s *testStoreSuite) TestStoreLabel(c *C) { lc, _ := json.Marshal(labelCheck) err = postJSON(s.urlPrefix+"/config", lc) c.Assert(err, IsNil) - time.Sleep(2 * time.Second) + time.Sleep(20 * time.Millisecond) // Test set. labels := map[string]string{"zone": "cn", "host": "local"} b, err := json.Marshal(labels) @@ -183,7 +185,7 @@ func (s *testStoreSuite) TestStoreLabel(c *C) { ll, _ := json.Marshal(locationLabels) err = postJSON(s.urlPrefix+"/config", ll) c.Assert(err, IsNil) - time.Sleep(2 * time.Second) + time.Sleep(20 * time.Millisecond) err = postJSON(url+"/label", b) c.Assert(err, IsNil) @@ -201,7 +203,7 @@ func (s *testStoreSuite) TestStoreLabel(c *C) { err = postJSON(s.urlPrefix+"/config", lc) c.Assert(err, IsNil) - time.Sleep(2 * time.Second) + time.Sleep(20 * time.Millisecond) labels = map[string]string{"zack": "zack1", "Host": "host1"} b, err = json.Marshal(labels) c.Assert(err, IsNil) diff --git a/server/api/util.go b/server/api/util.go index 5c5fd6df8b2e..2964b4a37a18 100644 --- a/server/api/util.go +++ b/server/api/util.go @@ -15,11 +15,16 @@ package api import ( "bytes" + "context" "encoding/json" "io/ioutil" "net/http" "net/url" + "github.com/pingcap/kvproto/pkg/configpb" + pd "github.com/pingcap/pd/client" + "github.com/pingcap/pd/server" + "github.com/pingcap/pd/server/config_manager" "github.com/pkg/errors" ) @@ -115,3 +120,21 @@ func doDelete(url string) (*http.Response, error) { res.Body.Close() return res, nil } + +func redirectUpdateReq(ctx context.Context, client pd.ConfigClient, cm *configmanager.ConfigManager, entries []*entry) error { + var configEntries []*configpb.ConfigEntry + for _, e := range entries { + configEntry := &configpb.ConfigEntry{Name: e.key, Value: e.value} + configEntries = append(configEntries, configEntry) + } + version := &configpb.Version{Global: cm.GlobalCfgs[server.Component].GetVersion()} + kind := &configpb.ConfigKind{Kind: &configpb.ConfigKind_Global{Global: &configpb.Global{Component: server.Component}}} + status, _, err := client.Update(ctx, version, kind, configEntries) + if err != nil { + return err + } + if status.GetCode() != configpb.StatusCode_OK { + return errors.New(status.GetMessage()) + } + return nil +} diff --git a/server/config/config.go b/server/config/config.go index 435f47bfa2a1..1bc00f3b1874 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -165,7 +165,7 @@ func NewConfig() *Config { fs.StringVar(&cfg.Security.KeyPath, "key", "", "Path of file that contains X509 key in PEM format") fs.BoolVar(&cfg.ForceNewCluster, "force-new-cluster", false, "Force to create a new one-member cluster") - fs.BoolVar(&cfg.EnableConfigManager, "enable-config-manager", true, "Enable configuration manager") + fs.BoolVar(&cfg.EnableConfigManager, "enable-config-manager", false, "Enable configuration manager") fs.BoolVar(&cfg.EnableDashboard, "enable-dashboard", true, "Enable Dashboard API and UI on this node") @@ -576,7 +576,7 @@ type ScheduleConfig struct { Schedulers SchedulerConfigs `toml:"schedulers" json:"schedulers-v2"` // json v2 is for the sake of compatible upgrade // Only used to display - SchedulersPayload map[string]string `json:"schedulers,omitempty"` + SchedulersPayload map[string]string `toml:"schedulers-payload" json:"schedulers-payload,omitempty"` // StoreLimitMode can be auto or manual, when set to auto, // PD tries to change the store limit values according to diff --git a/server/config_manager/config_manager.go b/server/config_manager/config_manager.go index 234ffbb38d2b..c7f5a206fd36 100644 --- a/server/config_manager/config_manager.go +++ b/server/config_manager/config_manager.go @@ -95,8 +95,8 @@ func (c *ConfigManager) Reload(storage *core.Storage) error { return err } -// getComponent returns the component from a given component ID. -func (c *ConfigManager) getComponent(id string) string { +// GetComponent returns the component from a given component ID. +func (c *ConfigManager) GetComponent(id string) string { for component, cfgs := range c.LocalCfgs { if _, ok := cfgs[id]; ok { return component @@ -131,7 +131,7 @@ func (c *ConfigManager) GetConfig(version *configpb.Version, component, componen Message: errEncode(err), } } - if versionEqual(cfg.getVersion(), version) { + if versionEqual(cfg.GetVersion(), version) { status = &configpb.Status{Code: configpb.StatusCode_OK} } else { status = &configpb.Status{Code: configpb.StatusCode_WRONG_VERSION} @@ -198,7 +198,7 @@ func (c *ConfigManager) CreateConfig(version *configpb.Version, component, compo func (c *ConfigManager) getLatestVersion(component, componentID string) *configpb.Version { v := &configpb.Version{ Global: c.GlobalCfgs[component].GetVersion(), - Local: c.LocalCfgs[component][componentID].getVersion().GetLocal(), + Local: c.LocalCfgs[component][componentID].GetVersion().GetLocal(), } return v } @@ -312,7 +312,7 @@ func mergeAndUpdateConfig(localCfg *LocalConfig, updateEntries map[string]*Entry } func (c *ConfigManager) updateLocal(componentID string, version *configpb.Version, entries []*configpb.ConfigEntry) (*configpb.Version, *configpb.Status) { - component := c.getComponent(componentID) + component := c.GetComponent(componentID) if component == "" { return &configpb.Version{Global: 0, Local: 0}, &configpb.Status{Code: configpb.StatusCode_COMPONENT_NOT_FOUND} } @@ -324,7 +324,7 @@ func (c *ConfigManager) updateLocal(componentID string, version *configpb.Versio } } if localCfg, ok := c.LocalCfgs[component][componentID]; ok { - localLatestVersion := localCfg.getVersion() + localLatestVersion := localCfg.GetVersion() if !versionEqual(localLatestVersion, version) { return localLatestVersion, &configpb.Status{Code: configpb.StatusCode_WRONG_VERSION} } @@ -339,7 +339,7 @@ func (c *ConfigManager) updateLocal(componentID string, version *configpb.Versio } else { return version, &configpb.Status{Code: configpb.StatusCode_COMPONENT_ID_NOT_FOUND} } - return c.LocalCfgs[component][componentID].getVersion(), &configpb.Status{Code: configpb.StatusCode_OK} + return c.LocalCfgs[component][componentID].GetVersion(), &configpb.Status{Code: configpb.StatusCode_OK} } func (c *ConfigManager) deleteEntry(component, e string) { @@ -375,12 +375,12 @@ func (c *ConfigManager) deleteGlobal(component string, version *configpb.Version } func (c *ConfigManager) deleteLocal(componentID string, version *configpb.Version) *configpb.Status { - component := c.getComponent(componentID) + component := c.GetComponent(componentID) if component == "" { return &configpb.Status{Code: configpb.StatusCode_COMPONENT_NOT_FOUND} } if localCfg, ok := c.LocalCfgs[component][componentID]; ok { - localLatestVersion := localCfg.getVersion() + localLatestVersion := localCfg.GetVersion() if !versionEqual(localLatestVersion, version) { return &configpb.Status{Code: configpb.StatusCode_WRONG_VERSION} } @@ -481,8 +481,8 @@ func (lc *LocalConfig) updateEntry(entry *configpb.ConfigEntry, version *configp entries[entry.GetName()] = NewEntryValue(entry, version) } -// getVersion return the local config version for a component. -func (lc *LocalConfig) getVersion() *configpb.Version { +// GetVersion return the local config version for a component. +func (lc *LocalConfig) GetVersion() *configpb.Version { if lc == nil { return nil } @@ -509,7 +509,7 @@ func update(config map[string]interface{}, configName []string, value string) er if !ok { // TODO: remove it if configName[0] != "schedulers-v2" { - return errors.New("cannot find the config item") + return errors.Errorf("cannot find the config item: %v", configName[0]) } } @@ -527,6 +527,9 @@ func update(config map[string]interface{}, configName []string, value string) er return errors.Errorf("failed to decode value: %v", err.Error()) } container[configName[0]] = value + } else if configName[0] == "label-property" { + config[configName[0]] = container + return nil } v, err := getUpdateValue(config[configName[0]], container[configName[0]]) @@ -541,6 +544,7 @@ func update(config map[string]interface{}, configName []string, value string) er func getUpdateValue(item, updateItem interface{}) (interface{}, error) { var err error var v interface{} + var tmp float64 switch item.(type) { case bool: switch t1 := updateItem.(type) { @@ -554,7 +558,8 @@ func getUpdateValue(item, updateItem interface{}) (interface{}, error) { case int64: switch t1 := updateItem.(type) { case string: - v, err = strconv.ParseInt(updateItem.(string), 10, 64) + tmp, err = strconv.ParseFloat(updateItem.(string), 64) + v = int64(tmp) case float64: v = int64(updateItem.(float64)) case int64: diff --git a/server/config_manager/grpc_service.go b/server/config_manager/grpc_service.go index 9e9bb85f5eff..42c02e933a5f 100644 --- a/server/config_manager/grpc_service.go +++ b/server/config_manager/grpc_service.go @@ -17,7 +17,9 @@ import ( "context" "github.com/pingcap/kvproto/pkg/configpb" + "github.com/pingcap/log" "github.com/pkg/errors" + "go.uber.org/zap" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) @@ -32,6 +34,7 @@ func (c *ConfigManager) Create(ctx context.Context, request *configpb.CreateRequ version, config, status := c.CreateConfig(request.GetVersion(), request.GetComponent(), request.GetComponentId(), request.GetConfig()) if status.GetCode() == configpb.StatusCode_OK { + log.Info("component has registered", zap.String("component", request.GetComponent()), zap.String("component-id", request.GetComponentId())) c.Persist(c.svr.GetStorage()) } diff --git a/server/schedulers/hot_region.go b/server/schedulers/hot_region.go index edba9e66d163..28c8a3047a68 100644 --- a/server/schedulers/hot_region.go +++ b/server/schedulers/hot_region.go @@ -292,6 +292,7 @@ func (h *hotScheduler) addPendingInfluence(op *operator.Operator, srcStore, dstS h.pendingOpInfos[regionID].PushBack(influence) schedulerStatus.WithLabelValues(h.GetName(), "pending_op_infos").Inc() } + func (h *hotScheduler) getPendingInfluence(regionID uint64) *pendingInfluence { if l, ok := h.pendingOpInfos[regionID]; ok { if l.Len() != 0 { diff --git a/server/server.go b/server/server.go index 413cb9507ace..b48670186399 100644 --- a/server/server.go +++ b/server/server.go @@ -144,7 +144,7 @@ type Server struct { } // HandlerBuilder builds a server HTTP handler. -type HandlerBuilder func(context.Context, *Server) (http.Handler, APIGroup, func()) +type HandlerBuilder func(context.Context, *Server) (http.Handler, APIGroup) // APIGroup used to register the api service. type APIGroup struct { @@ -160,31 +160,14 @@ const ( ExtensionsPath = "/pd/apis" ) -type lazyHandler struct { - options []func() - engine *negroni.Negroni -} - -func (lazy *lazyHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - for _, f := range lazy.options { - f() - } - - lazy.engine.ServeHTTP(w, r) -} - -func combineBuilderServerHTTPService(ctx context.Context, svr *Server, apiBuilders ...HandlerBuilder) (http.Handler, error) { +func combineBuilderServerHTTPService(svr *Server, apiBuilders ...HandlerBuilder) (http.Handler, error) { engine := negroni.New() recovery := negroni.NewRecovery() engine.Use(recovery) router := mux.NewRouter() registerMap := make(map[string]struct{}) - var options []func() for _, build := range apiBuilders { - handler, info, f := build(ctx, svr) - if f != nil { - options = append(options, f) - } + handler, info := build(svr.ctx, svr) var pathPrefix string if info.IsCore { pathPrefix = CorePath @@ -210,11 +193,7 @@ func combineBuilderServerHTTPService(ctx context.Context, svr *Server, apiBuilde } } engine.UseHandler(router) - - return &lazyHandler{ - engine: engine, - options: options, - }, nil + return engine, nil } // CreateServer creates the UNINITIALIZED pd server with given configuration. @@ -239,7 +218,7 @@ func CreateServer(ctx context.Context, cfg *config.Config, apiBuilders ...Handle return nil, err } if len(apiBuilders) != 0 { - apiHandler, err := combineBuilderServerHTTPService(ctx, s, apiBuilders...) + apiHandler, err := combineBuilderServerHTTPService(s, apiBuilders...) if err != nil { return nil, err } @@ -845,16 +824,6 @@ func (s *Server) SetLabelProperty(typ, labelKey, labelValue string) error { zap.Error(err)) return err } - var buf bytes.Buffer - cfg := s.scheduleOpt.LoadLabelPropertyConfig() - toml.NewEncoder(&buf).Encode(cfg) - - if s.GetConfig().EnableConfigManager { - status := s.updateConfigManager("label-property", buf.String()) - if status.GetCode() != configpb.StatusCode_OK { - log.Error("failed to update the label property", zap.Error(errors.New(status.GetMessage()))) - } - } log.Info("label property config is updated", zap.Reflect("config", s.scheduleOpt.LoadLabelPropertyConfig())) return nil @@ -874,16 +843,6 @@ func (s *Server) DeleteLabelProperty(typ, labelKey, labelValue string) error { zap.Error(err)) return err } - var buf bytes.Buffer - cfg := s.scheduleOpt.LoadLabelPropertyConfig() - toml.NewEncoder(&buf).Encode(cfg) - - if s.GetConfig().EnableConfigManager { - status := s.updateConfigManager("label-property", buf.String()) - if status.GetCode() != configpb.StatusCode_OK { - log.Error("failed to delete the label property", zap.Error(errors.New(status.GetMessage()))) - } - } log.Info("label property config is deleted", zap.Reflect("config", s.scheduleOpt.LoadLabelPropertyConfig())) return nil @@ -894,6 +853,8 @@ func (s *Server) updateConfigManager(name, value string) *configpb.Status { globalVersion := configManager.GetGlobalConfigs(Component).GetVersion() version := &configpb.Version{Global: globalVersion} entries := []*configpb.ConfigEntry{{Name: name, Value: value}} + configManager.Lock() + defer configManager.Unlock() _, status := configManager.UpdateGlobal(Component, version, entries) return status } @@ -1190,7 +1151,7 @@ func (s *Server) configCheckLoop() { return case <-ticker.C: version := s.GetConfigVersion() - config, err := s.getComponentConfig(ctx, version, addr) + config, err := s.getComponentConfig(ctx, version, compoenntID) if err != nil { log.Error("failed to get config", zap.Error(err)) } diff --git a/server/server_test.go b/server/server_test.go index 10193a7b547e..87e54e6ebc68 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -190,7 +190,7 @@ var _ = Suite(&testServerHandlerSuite{}) type testServerHandlerSuite struct{} func (s *testServerHandlerSuite) TestRegisterServerHandler(c *C) { - mokHandler := func(ctx context.Context, s *Server) (http.Handler, APIGroup, func()) { + mokHandler := func(ctx context.Context, s *Server) (http.Handler, APIGroup) { mux := http.NewServeMux() mux.HandleFunc("/pd/apis/mok/v1/hello", func(w http.ResponseWriter, r *http.Request) { fmt.Fprintln(w, "Hello World") @@ -199,9 +199,8 @@ func (s *testServerHandlerSuite) TestRegisterServerHandler(c *C) { Name: "mok", Version: "v1", } - return mux, info, nil + return mux, info } - cfg := NewTestSingleConfig(c) ctx, cancel := context.WithCancel(context.Background()) svr, err := CreateServer(ctx, cfg, mokHandler) diff --git a/tests/cluster.go b/tests/cluster.go index 7c60cf536638..f159abe39bcf 100644 --- a/tests/cluster.go +++ b/tests/cluster.go @@ -433,6 +433,7 @@ func (c *TestCluster) WaitLeader() string { } for name, num := range counter { if num == running && c.GetServer(name).IsLeader() { + time.Sleep(20 * time.Millisecond) return name } } diff --git a/tests/pdctl/config/config_test.go b/tests/pdctl/config/config_test.go index 7883162662be..0ce88c5a4ebe 100644 --- a/tests/pdctl/config/config_test.go +++ b/tests/pdctl/config/config_test.go @@ -19,6 +19,7 @@ import ( "reflect" "strings" "testing" + "time" "github.com/coreos/go-semver/semver" . "github.com/pingcap/check" @@ -39,6 +40,7 @@ type configTestSuite struct{} func (s *configTestSuite) SetUpSuite(c *C) { server.EnableZap = true + server.ConfigCheckInterval = 10 * time.Millisecond } type testItem struct { @@ -114,6 +116,7 @@ func (s *configTestSuite) TestConfig(c *C) { args2 := []string{"-u", pdAddr, "config", "set", "cluster-version", "2.1.0-rc.5"} _, _, err = pdctl.ExecuteCommandC(cmd, args2...) c.Assert(err, IsNil) + time.Sleep(20 * time.Millisecond) c.Assert(clusterVersion, Not(DeepEquals), svr.GetClusterVersion()) _, output, err = pdctl.ExecuteCommandC(cmd, args1...) c.Assert(err, IsNil) @@ -133,6 +136,7 @@ func (s *configTestSuite) TestConfig(c *C) { args2 = []string{"-u", pdAddr, "config", "set", "label-property", "reject-leader", "zone", "cn"} _, _, err = pdctl.ExecuteCommandC(cmd, args2...) c.Assert(err, IsNil) + time.Sleep(20 * time.Millisecond) c.Assert(labelPropertyCfg, Not(DeepEquals), svr.GetLabelProperty()) _, output, err = pdctl.ExecuteCommandC(cmd, args1...) c.Assert(err, IsNil) @@ -144,6 +148,7 @@ func (s *configTestSuite) TestConfig(c *C) { args3 := []string{"-u", pdAddr, "config", "delete", "label-property", "reject-leader", "zone", "cn"} _, _, err = pdctl.ExecuteCommandC(cmd, args3...) c.Assert(err, IsNil) + time.Sleep(20 * time.Millisecond) c.Assert(labelPropertyCfg, Not(DeepEquals), svr.GetLabelProperty()) _, output, err = pdctl.ExecuteCommandC(cmd, args1...) c.Assert(err, IsNil) diff --git a/tests/pdctl/hot/hot_test.go b/tests/pdctl/hot/hot_test.go index aaabdcf77949..46ab23a37894 100644 --- a/tests/pdctl/hot/hot_test.go +++ b/tests/pdctl/hot/hot_test.go @@ -41,6 +41,7 @@ type hotTestSuite struct{} func (s *hotTestSuite) SetUpSuite(c *C) { server.EnableZap = true + server.ConfigCheckInterval = 10 * time.Millisecond } func (s *hotTestSuite) TestHot(c *C) { diff --git a/tests/pdctl/label/label_test.go b/tests/pdctl/label/label_test.go index 274eb1c4d434..ff6fa0fcbc1c 100644 --- a/tests/pdctl/label/label_test.go +++ b/tests/pdctl/label/label_test.go @@ -18,6 +18,7 @@ import ( "encoding/json" "strings" "testing" + "time" . "github.com/pingcap/check" "github.com/pingcap/kvproto/pkg/metapb" @@ -38,6 +39,7 @@ type labelTestSuite struct{} func (s *labelTestSuite) SetUpSuite(c *C) { server.EnableZap = true + server.ConfigCheckInterval = 10 * time.Millisecond } func (s *labelTestSuite) TestLabel(c *C) { diff --git a/tests/pdctl/log/log_test.go b/tests/pdctl/log/log_test.go index e0406277fad9..4935d09b9404 100644 --- a/tests/pdctl/log/log_test.go +++ b/tests/pdctl/log/log_test.go @@ -35,6 +35,7 @@ type logTestSuite struct{} func (s *logTestSuite) SetUpSuite(c *C) { server.EnableZap = true + server.ConfigCheckInterval = 10 * time.Millisecond } func (s *logTestSuite) TestLog(c *C) { @@ -88,7 +89,7 @@ func (s *logTestSuite) TestLog(c *C) { for _, testCase := range testCases { _, _, err = pdctl.ExecuteCommandC(cmd, testCase.cmd...) c.Assert(err, IsNil) - time.Sleep(2 * time.Second) + time.Sleep(20 * time.Millisecond) c.Assert(svr.GetConfig().Log.Level, Equals, testCase.expect) } } diff --git a/tests/pdctl/operator/operator_test.go b/tests/pdctl/operator/operator_test.go index 0f29f445b805..8d5ad6657377 100644 --- a/tests/pdctl/operator/operator_test.go +++ b/tests/pdctl/operator/operator_test.go @@ -38,6 +38,7 @@ type operatorTestSuite struct{} func (s *operatorTestSuite) SetUpSuite(c *C) { server.EnableZap = true + server.ConfigCheckInterval = 10 * time.Millisecond } func (s *operatorTestSuite) TestOperator(c *C) { @@ -209,7 +210,7 @@ func (s *operatorTestSuite) TestOperator(c *C) { _, _, err = pdctl.ExecuteCommandC(cmd, "config", "set", "enable-placement-rules", "true") c.Assert(err, IsNil) - time.Sleep(2 * time.Second) + time.Sleep(20 * time.Millisecond) _, output, err = pdctl.ExecuteCommandC(cmd, "operator", "add", "transfer-region", "1", "2", "3") c.Assert(err, IsNil) c.Assert(strings.Contains(string(output), "not supported"), IsTrue)