Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed Apr 15, 2020
1 parent b43b0f4 commit ac3450c
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 18 deletions.
55 changes: 47 additions & 8 deletions pkg/component/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func NewManager() *Manager {
func (c *Manager) GetComponentAddrs(component string) []string {
c.RLock()
defer c.RUnlock()
var addresses []string
addresses := []string{}
if ca, ok := c.Addresses[component]; ok {
addresses = append(addresses, ca...)
}
Expand All @@ -52,15 +52,21 @@ func (c *Manager) GetComponentAddrs(component string) []string {
func (c *Manager) GetAllComponentAddrs() map[string][]string {
c.RLock()
defer c.RUnlock()
return c.Addresses
n := make(map[string][]string)
for k, v := range c.Addresses {
b := make([]string, len(v))
copy(b, v)
n[k] = b
}
return n
}

// GetComponent returns the component from a given component ID.
func (c *Manager) GetComponent(addr string) string {
c.RLock()
defer c.RUnlock()
for component, ca := range c.Addresses {
if contains(ca, addr) {
if exist, _ := contains(ca, addr); exist {
return component
}
}
Expand All @@ -81,7 +87,7 @@ func (c *Manager) Register(component, addr string) error {
}

ca, ok := c.Addresses[component]
if ok && contains(ca, addr) {
if exist, _ := contains(ca, addr); ok && exist {
log.Info("address has already been registered", zap.String("component", component), zap.String("address", addr))
return fmt.Errorf("component %s address %s has already been registered", component, addr)
}
Expand All @@ -92,12 +98,45 @@ func (c *Manager) Register(component, addr string) error {
return nil
}

func contains(slice []string, item string) bool {
for _, s := range slice {
// UnRegister is used for unregistering a component with an address from PD.
func (c *Manager) UnRegister(component, addr string) error {
c.Lock()
defer c.Unlock()

str := strings.Split(addr, ":")
if len(str) != 0 {
ip := net.ParseIP(str[0])
if ip == nil {
return fmt.Errorf("failed to parse address %s of component %s", addr, component)
}
}

ca, ok := c.Addresses[component]
if !ok {
return fmt.Errorf("component %s not found", component)
}

if exist, idx := contains(ca, addr); exist {
ca = append(ca[:idx], ca[idx+1:]...)
log.Info("address has successfully been unregistered", zap.String("component", component), zap.String("address", addr))
if len(ca) == 0 {
delete(c.Addresses, component)
return nil
}

c.Addresses[component] = ca
return nil
}

return fmt.Errorf("address %s not found", addr)
}

func contains(slice []string, item string) (bool, int) {
for i, s := range slice {
if s == item {
return true
return true, i
}
}

return false
return false, 0
}
15 changes: 13 additions & 2 deletions pkg/component/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,28 @@ func (s *testManagerSuite) TestManager(c *C) {
// register repeatedly
c.Assert(strings.Contains(m.Register("c1", "127.0.0.1:2").Error(), "already"), IsTrue)
c.Assert(m.Register("c2", "127.0.0.1:3"), IsNil)
// register illegal address
c.Assert(m.Register("c2", "abcde"), NotNil)

// get all addresses
all := map[string][]string{
"c1": {"127.0.0.1:1", "127.0.0.1:2"},
"c2": {"127.0.0.1:3"},
}
c.Assert(m.GetAllComponentAddrs(), DeepEquals, all)

// get the specific component addresses
c.Assert(m.GetComponentAddrs("c1"), DeepEquals, all["c1"])
c.Assert(m.GetComponentAddrs("c2"), DeepEquals, all["c2"])

// get the component from the address
c.Assert(m.GetComponent("127.0.0.1:1"), Equals, "c1")
c.Assert(m.GetComponent("127.0.0.1:2"), Equals, "c1")
c.Assert(m.GetComponent("127.0.0.1:3"), Equals, "c2")

// unregister address
c.Assert(m.UnRegister("c1", "127.0.0.1:1"), IsNil)
c.Assert(m.GetComponentAddrs("c1"), DeepEquals, []string{"127.0.0.1:2"})
c.Assert(m.UnRegister("c1", "127.0.0.1:2"), IsNil)
c.Assert(m.GetComponentAddrs("c1"), DeepEquals, []string{})
all = map[string][]string{"c2": {"127.0.0.1:3"}}
c.Assert(m.GetAllComponentAddrs(), DeepEquals, all)
}
24 changes: 22 additions & 2 deletions server/api/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,9 @@ func newComponentHandler(svr *server.Server, rd *render.Render) *componentHandle
// @Summary Register component address.
// @Produce json
// @Success 200 {string} string
// @Failure 400 {string} string "PD server failed to proceed the request."
// @Router /component/register [post]
// @Failure 400 {string} string "The input is invalid."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /component [post]
func (h *componentHandler) Register(w http.ResponseWriter, r *http.Request) {
input := make(map[string]string)
if err := apiutil.ReadJSONRespondError(h.rd, w, r.Body, &input); err != nil {
Expand All @@ -69,6 +70,25 @@ func (h *componentHandler) Register(w http.ResponseWriter, r *http.Request) {
h.rd.JSON(w, http.StatusOK, nil)
}

// @Tags component
// @Summary Unregister component address.
// @Produce json
// @Success 200 {string} string
// @Failure 400 {string} string "The input is invalid."
// @Router /component [delete]
func (h *componentHandler) UnRegister(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
component := vars["component"]
addr := vars["addr"]
m := h.svr.GetComponentManager()
err := m.UnRegister(component, addr)
if err != nil {
h.rd.JSON(w, http.StatusBadRequest, err.Error())
return
}
h.rd.JSON(w, http.StatusOK, nil)
}

// @Tags component
// @Summary List all component addresses
// @Produce json
Expand Down
38 changes: 33 additions & 5 deletions server/api/component_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ func (s *testComponentSuite) TearDownSuite(c *C) {
s.cleanup()
}

func (s *testComponentSuite) TestRegister(c *C) {
func (s *testComponentSuite) TestComponent(c *C) {
// register not happen
addr := fmt.Sprintf("%s/component", s.urlPrefix)
output := make(map[string][]string)
err := readJSON(addr, &output)
Expand All @@ -55,7 +56,7 @@ func (s *testComponentSuite) TestRegister(c *C) {
c.Assert(strings.Contains(err.Error(), "404"), IsTrue)
c.Assert(len(output1), Equals, 0)

addr2 := fmt.Sprintf("%s/component/register", s.urlPrefix)
// register 2 c1 and 1 c2
reqs := []map[string]string{
{"component": "c1", "addr": "127.0.0.1:1"},
{"component": "c1", "addr": "127.0.0.1:2"},
Expand All @@ -64,10 +65,11 @@ func (s *testComponentSuite) TestRegister(c *C) {
for _, req := range reqs {
postData, err := json.Marshal(req)
c.Assert(err, IsNil)
err = postJSON(addr2, postData)
err = postJSON(addr, postData)
c.Assert(err, IsNil)
}

// get all addresses
expected := map[string][]string{
"c1": {"127.0.0.1:1", "127.0.0.1:2"},
"c2": {"127.0.0.1:3"},
Expand All @@ -78,16 +80,42 @@ func (s *testComponentSuite) TestRegister(c *C) {
c.Assert(err, IsNil)
c.Assert(output, DeepEquals, expected)

// get the specific component addresses
expected1 := []string{"127.0.0.1:1", "127.0.0.1:2"}
var output2 []string
err = readJSON(addr1, &output2)
c.Assert(err, IsNil)
c.Assert(output2, DeepEquals, expected1)

addr3 := fmt.Sprintf("%s/component/c2", s.urlPrefix)
addr2 := fmt.Sprintf("%s/component/c2", s.urlPrefix)
expected2 := []string{"127.0.0.1:3"}
var output3 []string
err = readJSON(addr3, &output3)
err = readJSON(addr2, &output3)
c.Assert(err, IsNil)
c.Assert(output3, DeepEquals, expected2)

// unregister address
addr3 := fmt.Sprintf("%s/component/c1/127.0.0.1:1", s.urlPrefix)
res, err := doDelete(addr3)
c.Assert(err, IsNil)
c.Assert(res.StatusCode, Equals, 200)

expected3 := map[string][]string{
"c1": {"127.0.0.1:2"},
"c2": {"127.0.0.1:3"},
}
output = make(map[string][]string)
err = readJSON(addr, &output)
c.Assert(err, IsNil)
c.Assert(output, DeepEquals, expected3)

addr4 := fmt.Sprintf("%s/component/c1/127.0.0.1:2", s.urlPrefix)
res, err = doDelete(addr4)
c.Assert(err, IsNil)
c.Assert(res.StatusCode, Equals, 200)
expected4 := map[string][]string{"c2": {"127.0.0.1:3"}}
output = make(map[string][]string)
err = readJSON(addr, &output)
c.Assert(err, IsNil)
c.Assert(output, DeepEquals, expected4)
}
3 changes: 2 additions & 1 deletion server/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,8 @@ func createRouter(ctx context.Context, prefix string, svr *server.Server) *mux.R
clusterRouter.HandleFunc("/replication_mode/status", replicationModeHandler.GetStatus)

componentHandler := newComponentHandler(svr, rd)
apiRouter.HandleFunc("/component/register", componentHandler.Register).Methods("POST")
apiRouter.HandleFunc("/component", componentHandler.Register).Methods("POST")
apiRouter.HandleFunc("/component/{component}/{addr}", componentHandler.UnRegister).Methods("DELETE")
apiRouter.HandleFunc("/component", componentHandler.GetAllAddress).Methods("GET")
apiRouter.HandleFunc("/component/{type}", componentHandler.GetAddress).Methods("GET")

Expand Down

0 comments on commit ac3450c

Please sign in to comment.