Skip to content

Commit 64ac9b9

Browse files
committed
Retry all servers on RPC call failure
rpcproxy is refactored into serverlist, which prioritizes good servers over servers that are in a remote DC or that have had a failure. Registration, heartbeating, and alloc status updating will retry faster when new servers are discovered. Consul discovery will be retried more quickly when no servers are available (e.g. on startup or during an outage).
1 parent d49dda4 commit 64ac9b9

9 files changed

+428
-1900
lines changed

client/client.go

+192-132
Large diffs are not rendered by default.

client/rpcproxy/rpcproxy.go

-779
This file was deleted.

client/rpcproxy/rpcproxy_test.go

-818
This file was deleted.

client/rpcproxy/server_endpoint.go

-84
This file was deleted.

client/rpcproxy/server_endpoint_test.go

-77
This file was deleted.

client/serverlist.go

+111
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
package client
2+
3+
import (
4+
"math/rand"
5+
"net"
6+
"sort"
7+
"strings"
8+
"sync"
9+
)
10+
11+
// serverlist is a prioritized randomized list of nomad servers. Users should
12+
// call all() to retrieve the full list, followed by failed(e) on each endpoint
13+
// that's failed and good(e) when a valid endpoint is found.
14+
type serverlist struct {
15+
e endpoints
16+
mu sync.RWMutex
17+
}
18+
19+
func newServerList() *serverlist {
20+
return &serverlist{}
21+
}
22+
23+
// set the server list to a new list. The new list will be shuffled and sorted
24+
// by priority.
25+
func (s *serverlist) set(in endpoints) {
26+
s.mu.Lock()
27+
s.e = in
28+
s.mu.Unlock()
29+
}
30+
31+
// all returns a copy of the full server list, shuffled and then sorted by
32+
// priority
33+
func (s *serverlist) all() endpoints {
34+
s.mu.RLock()
35+
out := make(endpoints, len(s.e))
36+
copy(out, s.e)
37+
s.mu.RUnlock()
38+
39+
// Randomize the order
40+
for i, j := range rand.Perm(len(out)) {
41+
out[i], out[j] = out[j], out[i]
42+
}
43+
44+
// Sort by priority
45+
sort.Sort(out)
46+
return out
47+
}
48+
49+
// failed endpoint will be deprioritized if its still in the list.
50+
func (s *serverlist) failed(e *endpoint) {
51+
s.mu.Lock()
52+
defer s.mu.Unlock()
53+
for _, cur := range s.e {
54+
if cur.equal(e) {
55+
cur.priority++
56+
return
57+
}
58+
}
59+
}
60+
61+
// good endpoint will get promoted to the highest priority if it's still in the
62+
// list.
63+
func (s *serverlist) good(e *endpoint) {
64+
s.mu.Lock()
65+
defer s.mu.Unlock()
66+
for _, cur := range s.e {
67+
if cur.equal(e) {
68+
cur.priority = 0
69+
return
70+
}
71+
}
72+
}
73+
74+
func (e endpoints) Len() int {
75+
return len(e)
76+
}
77+
78+
func (e endpoints) Less(i int, j int) bool {
79+
// Sort only by priority as endpoints should be shuffled and ordered
80+
// only by priority
81+
return e[i].priority < e[j].priority
82+
}
83+
84+
func (e endpoints) Swap(i int, j int) {
85+
e[i], e[j] = e[j], e[i]
86+
}
87+
88+
// endpoints is a list of endpoint pointers. Its Len, Less and Swap methods
// order it by priority when passed to sort.Sort, and its String method joins
// the endpoint names with commas.
type endpoints []*endpoint
89+
90+
func (e endpoints) String() string {
91+
names := make([]string, 0, len(e))
92+
for _, endpoint := range e {
93+
names = append(names, endpoint.name)
94+
}
95+
return strings.Join(names, ",")
96+
}
97+
98+
// endpoint is a single entry in the server list. Two endpoints are considered
// the same by equal() when name and addr match; priority is not part of that
// identity.
type endpoint struct {
	name string
	addr net.Addr

	// priority is the sort key used to order endpoints, with 0 being the
	// highest priority.
	priority int
}
105+
106+
// equal returns true if the name and addr match between two endpoints.
107+
// Priority is ignored because the same endpoint may be added by discovery and
108+
// heartbeating with different priorities.
109+
func (e *endpoint) equal(o *endpoint) bool {
110+
return e.name == o.name && e.addr == o.addr
111+
}

client/serverlist_test.go

+117
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
package client
2+
3+
import (
4+
"log"
5+
"os"
6+
"strings"
7+
"testing"
8+
)
9+
10+
func TestServerList(t *testing.T) {
11+
s := newServerList()
12+
13+
// New lists should be empty
14+
if e := s.all(); len(e) != 0 {
15+
t.Fatalf("expected empty list to return an empty list, but received: %+q", e)
16+
}
17+
18+
mklist := func() endpoints {
19+
return endpoints{
20+
&endpoint{"b", nil, 1},
21+
&endpoint{"c", nil, 1},
22+
&endpoint{"g", nil, 2},
23+
&endpoint{"d", nil, 1},
24+
&endpoint{"e", nil, 1},
25+
&endpoint{"f", nil, 1},
26+
&endpoint{"h", nil, 2},
27+
&endpoint{"a", nil, 0},
28+
}
29+
}
30+
s.set(mklist())
31+
32+
orig := mklist()
33+
all := s.all()
34+
if len(all) != len(orig) {
35+
t.Fatalf("expected %d endpoints but only have %d", len(orig), len(all))
36+
}
37+
38+
// Assert list is properly randomized+sorted
39+
for i, pri := range []int{0, 1, 1, 1, 1, 1, 2, 2} {
40+
if all[i].priority != pri {
41+
t.Errorf("expected endpoint %d (%+q) to be priority %d", i, all[i], pri)
42+
}
43+
}
44+
45+
// Subsequent sets should reshuffle (try multiple times as they may
46+
// shuffle in the same order)
47+
tries := 0
48+
max := 3
49+
for ; tries < max; tries++ {
50+
if s.all().String() == s.all().String() {
51+
// eek, matched; try again in case we just got unlucky
52+
continue
53+
}
54+
break
55+
}
56+
if tries == max {
57+
t.Fatalf("after %d attempts servers were still not random reshuffled", tries)
58+
}
59+
60+
// Mark an endpoint as failed enough that it should be at the end of the list
61+
sa := &endpoint{"a", nil, 0}
62+
s.failed(sa)
63+
s.failed(sa)
64+
s.failed(sa)
65+
all2 := s.all()
66+
if len(all2) != len(orig) {
67+
t.Fatalf("marking should not have changed list length")
68+
}
69+
if all2[len(all)-1].name != sa.name {
70+
t.Fatalf("failed endpoint should be at end of list: %+q", all2)
71+
}
72+
73+
// But if the bad endpoint succeeds even once it should be bumped to the top group
74+
s.good(sa)
75+
found := false
76+
for _, e := range s.all() {
77+
if e.name == sa.name {
78+
if e.priority != 0 {
79+
t.Fatalf("server newly marked good should have highest priority")
80+
}
81+
found = true
82+
}
83+
}
84+
if !found {
85+
t.Fatalf("what happened to endpoint A?!")
86+
}
87+
}
88+
89+
// TestClient_ServerList tests client methods that interact with the internal
90+
// nomad server list.
91+
func TestClient_ServerList(t *testing.T) {
92+
// manually create a mostly empty client to avoid spinning up a ton of
93+
// goroutines that complicate testing
94+
client := Client{servers: newServerList(), logger: log.New(os.Stderr, "", log.Ltime|log.Lshortfile)}
95+
96+
if s := client.GetServers(); len(s) != 0 {
97+
t.Fatalf("expected server lit to be empty but found: %+q", s)
98+
}
99+
if err := client.SetServers(nil); err != noServers {
100+
t.Fatalf("expected setting an empty list to return a 'no servers' error but received %v", err)
101+
}
102+
if err := client.SetServers([]string{"not-a-real-domain.fake"}); err == nil {
103+
t.Fatalf("expected setting a bad server to return an error")
104+
}
105+
if err := client.SetServers([]string{"bad.fake", "127.0.0.1:1234", "127.0.0.1"}); err != nil {
106+
t.Fatalf("expected setting at least one good server to succeed but received: %v", err)
107+
}
108+
s := client.GetServers()
109+
if len(s) != 2 {
110+
t.Fatalf("expected 2 servers but received: %+q", s)
111+
}
112+
for _, host := range s {
113+
if !strings.HasPrefix(host, "127.0.0.1:") {
114+
t.Errorf("expected both servers to be localhost and include port but found: %s", host)
115+
}
116+
}
117+
}

0 commit comments

Comments
 (0)