Skip to content

Commit ce76aef

Browse files
committed
Retry all servers on RPC call failure
rpcproxy is refactored into serverlist which prioritizes good servers over servers in a remote DC or who have had a failure.
1 parent d49dda4 commit ce76aef

9 files changed

+384
-1899
lines changed

client/client.go

+189-131
Large diffs are not rendered by default.

client/rpcproxy/rpcproxy.go

-779
This file was deleted.

client/rpcproxy/rpcproxy_test.go

-818
This file was deleted.

client/rpcproxy/server_endpoint.go

-84
This file was deleted.

client/rpcproxy/server_endpoint_test.go

-77
This file was deleted.

client/serverlist.go

+107
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
package client
2+
3+
import (
4+
"math/rand"
5+
"net"
6+
"sort"
7+
"strings"
8+
"sync"
9+
)
10+
11+
type serverlist struct {
12+
e endpoints
13+
mu sync.RWMutex
14+
}
15+
16+
func newServerList() *serverlist {
17+
return &serverlist{}
18+
}
19+
20+
// set the server list to a new list. The new list will be shuffled and sorted
21+
// by priority.
22+
func (s *serverlist) set(newe endpoints) {
23+
s.mu.Lock()
24+
s.e = newe
25+
s.mu.Unlock()
26+
}
27+
28+
// all returns a copy of the full server list, shuffled and then sorted by
29+
// priority
30+
func (s *serverlist) all() endpoints {
31+
s.mu.RLock()
32+
out := make(endpoints, len(s.e))
33+
copy(out, s.e)
34+
s.mu.RUnlock()
35+
36+
// Randomize the order
37+
for i, j := range rand.Perm(len(out)) {
38+
out[i], out[j] = out[j], out[i]
39+
}
40+
41+
// Sort by priority
42+
sort.Sort(out)
43+
return out
44+
}
45+
46+
// failed servers get deprioritized
47+
func (s *serverlist) failed(e *endpoint) {
48+
s.mu.Lock()
49+
defer s.mu.Unlock()
50+
for i := 0; i < len(s.e); i++ {
51+
if s.e[i].equal(e) {
52+
e.priority++
53+
return
54+
}
55+
}
56+
}
57+
58+
// good servers get promoted to the highest priority
59+
func (s *serverlist) good(e *endpoint) {
60+
s.mu.Lock()
61+
defer s.mu.Unlock()
62+
for i := 0; i < len(s.e); i++ {
63+
if s.e[i].equal(e) {
64+
e.priority = 0
65+
return
66+
}
67+
}
68+
}
69+
70+
func (e endpoints) Len() int {
71+
return len(e)
72+
}
73+
74+
func (e endpoints) Less(i int, j int) bool {
75+
// Sort only by priority as endpoints should be shuffled and ordered
76+
// only by priority
77+
return e[i].priority < e[j].priority
78+
}
79+
80+
func (e endpoints) Swap(i int, j int) {
81+
e[i], e[j] = e[j], e[i]
82+
}
83+
84+
type endpoints []*endpoint
85+
86+
func (e endpoints) String() string {
87+
names := make([]string, 0, len(e))
88+
for _, endpoint := range e {
89+
names = append(names, endpoint.name)
90+
}
91+
return strings.Join(names, ",")
92+
}
93+
94+
type endpoint struct {
95+
name string
96+
addr net.Addr
97+
98+
// 0 being the highest priority
99+
priority int
100+
}
101+
102+
// equal returns true if the name and addr match between two endpoints.
103+
// Priority is ignored because the same endpoint may be added by discovery and
104+
// heartbeating with different priorities.
105+
func (e *endpoint) equal(o *endpoint) bool {
106+
return e.name == o.name && e.addr == o.addr
107+
}

client/serverlist_test.go

+80
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
package client
2+
3+
import "testing"
4+
5+
func TestServerList(t *testing.T) {
6+
s := newServerList()
7+
8+
// New lists should be empty
9+
if e := s.get(); e != nil {
10+
t.Fatalf("expected empty list to return nil, but received: %v", e)
11+
}
12+
if e := s.all(); len(e) != 0 {
13+
t.Fatalf("expected empty list to return an empty list, but received: %+q", e)
14+
}
15+
16+
mklist := func() endpoints {
17+
return endpoints{
18+
&endpoint{"b", nil, 1},
19+
&endpoint{"c", nil, 1},
20+
&endpoint{"g", nil, 2},
21+
&endpoint{"d", nil, 1},
22+
&endpoint{"e", nil, 1},
23+
&endpoint{"f", nil, 1},
24+
&endpoint{"h", nil, 2},
25+
&endpoint{"a", nil, 0},
26+
}
27+
}
28+
s.set(mklist())
29+
30+
orig := mklist()
31+
all := s.all()
32+
if len(all) != len(orig) {
33+
t.Fatalf("expected %d endpoints but only have %d", len(orig), len(all))
34+
}
35+
36+
// Assert list is properly randomized+sorted
37+
for i, pri := range []int{0, 1, 1, 1, 1, 1, 2, 2} {
38+
if all[i].priority != pri {
39+
t.Errorf("expected endpoint %d (%+q) to be priority %d", i, all[i], pri)
40+
}
41+
}
42+
43+
// Subsequent sets should reshuffle (try multiple times as they may
44+
// shuffle in the same order)
45+
tries := 0
46+
max := 3
47+
for ; tries < max; tries++ {
48+
s.set(mklist())
49+
// First entry should always be the same
50+
if e := s.get(); *e != *all[0] {
51+
t.Fatalf("on try %d get returned the wrong endpoint: %+q", tries, e)
52+
}
53+
54+
all2 := s.all()
55+
if all.String() == all2.String() {
56+
// eek, matched; try again in case we just got unlucky
57+
continue
58+
}
59+
break
60+
}
61+
if tries == max {
62+
t.Fatalf("after %d attempts servers were still not random reshuffled", tries)
63+
}
64+
65+
// Mark should rotate list items in place
66+
s.mark(&endpoint{"a", nil, 0})
67+
all3 := s.all()
68+
if s.get().name == "a" || all3[len(all3)-1].name != "a" {
69+
t.Fatalf("endpoint a shold have been rotated to end")
70+
}
71+
if len(all3) != len(all) {
72+
t.Fatalf("marking should not have changed list length")
73+
}
74+
75+
// Marking a non-existant endpoint should do nothing
76+
s.mark(&endpoint{})
77+
if s.all().String() != all3.String() {
78+
t.Fatalf("marking a non-existant endpoint alterd the list")
79+
}
80+
}

command/agent/agent_endpoint.go

+6-7
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ func (s *HTTPServer) listServers(resp http.ResponseWriter, req *http.Request) (i
139139
return nil, CodedError(501, ErrInvalidMethod)
140140
}
141141

142-
peers := s.agent.client.RPCProxy().ServerRPCAddrs()
142+
peers := s.agent.client.GetServers()
143143
return peers, nil
144144
}
145145

@@ -156,12 +156,11 @@ func (s *HTTPServer) updateServers(resp http.ResponseWriter, req *http.Request)
156156
}
157157

158158
// Set the servers list into the client
159-
for _, server := range servers {
160-
s.agent.logger.Printf("[TRACE] Adding server %s to the client's primary server list", server)
161-
se := client.AddPrimaryServerToRPCProxy(server)
162-
if se == nil {
163-
s.agent.logger.Printf("[ERR] Attempt to add server %q to client failed", server)
164-
}
159+
s.agent.logger.Printf("[TRACE] Adding servers %+q to the client's primary server list", servers)
160+
if err := client.SetServers(servers); err != nil {
161+
s.agent.logger.Printf("[ERR] Attempt to add servers %q to client failed: %v", servers, err)
162+
//TODO is this the right error to return?
163+
return nil, CodedError(400, err.Error())
165164
}
166165
return nil, nil
167166
}

nomad/pool.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import (
1212

1313
"github.com/hashicorp/consul/tlsutil"
1414
"github.com/hashicorp/net-rpc-msgpackrpc"
15-
"github.com/hashicorp/nomad/client/rpcproxy"
1615
"github.com/hashicorp/yamux"
1716
)
1817

@@ -376,9 +375,9 @@ func (p *ConnPool) RPC(region string, addr net.Addr, version int, method string,
376375

377376
// PingNomadServer sends a Status.Ping message to the specified server and
378377
// returns true if healthy, false if an error occurred
379-
func (p *ConnPool) PingNomadServer(region string, apiMajorVersion int, s *rpcproxy.ServerEndpoint) (bool, error) {
378+
func (p *ConnPool) PingNomadServer(region string, apiMajorVersion int, s net.Addr) (bool, error) {
380379
// Get a usable client
381-
conn, sc, err := p.getClient(region, s.Addr, apiMajorVersion)
380+
conn, sc, err := p.getClient(region, s, apiMajorVersion)
382381
if err != nil {
383382
return false, err
384383
}

0 commit comments

Comments
 (0)