From e18c85e6ebd165d2ee842cd462ac506423df0b67 Mon Sep 17 00:00:00 2001 From: Jim Scott Date: Thu, 1 Dec 2016 13:39:35 -0800 Subject: [PATCH 1/4] Rebase master onto this branch --- serve.go | 4 ++-- sharding/partitions.go | 49 ++++++++++++++++++++++++++++++++++++------ 2 files changed, 45 insertions(+), 8 deletions(-) diff --git a/serve.go b/serve.go index de09dd6d..d19af787 100644 --- a/serve.go +++ b/serve.go @@ -120,12 +120,12 @@ func (vs *version) serveError(w http.ResponseWriter, key string, err error) { w.WriteHeader(http.StatusInternalServerError) } -func shuffle(vs []string) []string { +func shuffle(vs []string, disappeared []string) []string { shuffled := make([]string, len(vs)) perm := rand.Perm(len(vs)) for i, v := range perm { shuffled[v] = vs[i] } - return shuffled + return append(shuffled, disappeared...) } diff --git a/sharding/partitions.go b/sharding/partitions.go index cc9e3d6c..28ba9851 100644 --- a/sharding/partitions.go +++ b/sharding/partitions.go @@ -33,6 +33,7 @@ type Partitions struct { selected map[int]bool local map[int]bool remote map[int][]string + disappeared map[int][]string numMissing int readyClosed bool shouldAdvertise bool @@ -55,6 +56,7 @@ func WatchPartitions(zkWatcher *zk.Watcher, peers *Peers, db, version string, nu replication: replication, local: make(map[int]bool), remote: make(map[int][]string), + disappeared: make(map[int][]string, 1024), } p.pickLocal() @@ -69,6 +71,19 @@ func WatchPartitions(zkWatcher *zk.Watcher, peers *Peers, db, version string, nu return p } +// Dedupelicates elements in a slice of strings. 
+func dedupe(nodes []string) []string { + found := map[string]bool{} + dedupedNodes := make([]string, 0, len(nodes)) + for _, node := range nodes { + if !found[node] { + found[node] = true + dedupedNodes = append(dedupedNodes, node) + } + } + return dedupedNodes +} + // pickLocal selects which partitions are local by iterating through // them all, and checking the hashring to see if this peer is one of the // replicas. @@ -107,17 +122,20 @@ func (p *Partitions) sync(updates chan []string) { } // FindPeers returns the list of peers who have the given partition available. -func (p *Partitions) FindPeers(partition int) []string { - if p.peers == nil { - return nil - } - +func (p *Partitions) FindPeers(partition int) ([]string, []string) { p.lock.RLock() defer p.lock.RUnlock() + disappearedPeers := make([]string, 1024) + copy(disappearedPeers, p.disappeared[partition]) + + if p.peers == nil { + return nil, disappearedPeers + } + peers := make([]string, len(p.remote[partition])) copy(peers, p.remote[partition]) - return peers + return peers, disappearedPeers } // Update updates the list of local partitions to the given list. 
@@ -228,6 +246,25 @@ func (p *Partitions) updateRemote(nodes []string) { } } + for partitionId, partition := range p.remote { + disappearedPeers := make([]string, len(partition)) + for _, oldPeer := range partition { + found := false + for _, newPeer := range remote[partitionId] { + if newPeer == oldPeer { + found = true + } + } + if !found { + disappearedPeers = append(disappearedPeers, oldPeer) + } + } + p.disappeared[partitionId] = dedupe(append(disappearedPeers, p.disappeared[partitionId]...)) + if len(p.disappeared[partitionId]) >= 1024 { + p.disappeared[partitionId] = p.disappeared[partitionId][:1024] + } + } + p.remote = remote p.updateMissing() } From 55693a3711c1e4725958fe201a216620d77e0921 Mon Sep 17 00:00:00 2001 From: Jim Scott Date: Thu, 1 Dec 2016 16:58:45 -0800 Subject: [PATCH 2/4] Fixed doc strings --- sharding/partitions.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sharding/partitions.go b/sharding/partitions.go index 28ba9851..526592c9 100644 --- a/sharding/partitions.go +++ b/sharding/partitions.go @@ -71,7 +71,7 @@ func WatchPartitions(zkWatcher *zk.Watcher, peers *Peers, db, version string, nu return p } -// Dedupelicates elements in a slice of strings. +// dedupe deduplicates elements in a slice of strings. func dedupe(nodes []string) []string { found := map[string]bool{} dedupedNodes := make([]string, 0, len(nodes)) @@ -121,7 +121,9 @@ func (p *Partitions) sync(updates chan []string) { } } -// FindPeers returns the list of peers who have the given partition available. 
+// FindPeers returns the list of peers who have the given partition available. +// It also returns a list of disappeared peers +// that are no longer in ZooKeeper. func (p *Partitions) FindPeers(partition int) ([]string, []string) { p.lock.RLock() defer p.lock.RUnlock() From c1f627d611404353fbacec97909a7c3c0f3f4a2e Mon Sep 17 00:00:00 2001 From: Jim Scott Date: Fri, 2 Dec 2016 10:02:45 -0800 Subject: [PATCH 3/4] Only try dead nodes if peers are less than replication factor. --- serve.go | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/serve.go b/serve.go index d19af787..d3abb42b 100644 --- a/serve.go +++ b/serve.go @@ -58,7 +58,11 @@ func (vs *version) serveProxied(w http.ResponseWriter, r *http.Request, // Shuffle the peers, so we try them in a random order. // TODO: We don't want to blacklist nodes, but we can weight them lower - peers := shuffle(vs.partitions.FindPeers(partition)) + rawPeers, disapperedNodes := vs.partitions.FindPeers(partition) + if len(rawPeers) < vs.sequins.config.Sharding.Replication { + rawPeers = append(rawPeers, disapperedNodes...) + } + peers := shuffle(rawPeers) if len(peers) == 0 { log.Printf("No peers available for /%s/%s (version %s)", vs.db.name, key, vs.name) w.WriteHeader(http.StatusBadGateway) @@ -70,7 +74,12 @@ func (vs *version) serveProxied(w http.ResponseWriter, r *http.Request, log.Println("Trying alternate partition for pathological key", key) resp.Body.Close() - alternatePeers := shuffle(vs.partitions.FindPeers(alternatePartition)) + rawPeers, disapperedNodes = vs.partitions.FindPeers(alternatePartition) + if len(rawPeers) < vs.sequins.config.Sharding.Replication { + rawPeers = append(rawPeers, disapperedNodes...) 
+ } + + alternatePeers := shuffle(rawPeers) resp, peer, err = vs.proxy(r, alternatePeers) } @@ -120,12 +129,12 @@ func (vs *version) serveError(w http.ResponseWriter, key string, err error) { w.WriteHeader(http.StatusInternalServerError) } -func shuffle(vs []string, disappeared []string) []string { +func shuffle(vs []string) []string { shuffled := make([]string, len(vs)) perm := rand.Perm(len(vs)) for i, v := range perm { shuffled[v] = vs[i] } - return append(shuffled, disappeared...) + return shuffled } From 655a85ef6451e821b31462e090a9fb56fa6d80dd Mon Sep 17 00:00:00 2001 From: Jim Scott Date: Mon, 5 Dec 2016 10:47:10 -0800 Subject: [PATCH 4/4] Add simple tests for dedupe. --- sharding/partitions_test.go | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) create mode 100644 sharding/partitions_test.go diff --git a/sharding/partitions_test.go b/sharding/partitions_test.go new file mode 100644 index 00000000..c1225164 --- /dev/null +++ b/sharding/partitions_test.go @@ -0,0 +1,23 @@ +package sharding + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestDedupeRandom(t *testing.T) { + dupe := []string{"4", "1", "2", "1", "3", "2"} + expected := []string{"4", "1", "2", "3"} + + deduped := dedupe(dupe) + assert.Equal(t, expected, deduped) +} + +func TestDedupe(t *testing.T) { + dupe := []string{"1", "1", "1", "2", "2", "3", "3", "3", "4", "5", "5"} + expected := []string{"1", "2", "3", "4", "5"} + + deduped := dedupe(dupe) + assert.Equal(t, expected, deduped) +}