This repository has been archived by the owner on Mar 5, 2024. It is now read-only.

Cache old partition states in case of ZooKeeper failure. #65

Open · wants to merge 4 commits into base: master

Changes from 1 commit
serve.go (4 changes: 2 additions & 2 deletions)

@@ -120,12 +120,12 @@ func (vs *version) serveError(w http.ResponseWriter, key string, err error) {
	w.WriteHeader(http.StatusInternalServerError)
}

-func shuffle(vs []string) []string {
+func shuffle(vs []string, disappeared []string) []string {
Contributor:

This will return an arbitrary number of deleted nodes in addition to the live ones, which will change the behavior of proxy subtly. I think we want to try just peers, where dead peers are only considered if there aren't enough live peers to fulfill the replication factor.

	shuffled := make([]string, len(vs))
	perm := rand.Perm(len(vs))
	for i, v := range perm {
		shuffled[v] = vs[i]
	}

-	return shuffled
+	return append(shuffled, disappeared...)
}
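As context for the review comment above, here is a minimal sketch of the suggested alternative: shuffle only the live peers and reach for disappeared peers solely when the replication factor cannot otherwise be met. The helper name shuffleWithFallback and the explicit replication parameter are hypothetical (not part of this PR), and the math/rand import already used by serve.go is assumed.

// shuffleWithFallback shuffles only the live peers, then pads the result with
// disappeared peers just far enough to cover the replication factor.
// Hypothetical sketch; names and signature are illustrative.
func shuffleWithFallback(live, disappeared []string, replication int) []string {
	shuffled := make([]string, len(live))
	for i, v := range rand.Perm(len(live)) {
		shuffled[v] = live[i]
	}

	// Dead peers are a last resort: append only enough of them to reach
	// the replication factor.
	for i := 0; len(shuffled) < replication && i < len(disappeared); i++ {
		shuffled = append(shuffled, disappeared[i])
	}

	return shuffled
}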
sharding/partitions.go (49 changes: 43 additions & 6 deletions)

@@ -33,6 +33,7 @@ type Partitions struct {
	selected map[int]bool
	local map[int]bool
	remote map[int][]string
+	disappeared map[int][]string
	numMissing int
	readyClosed bool
	shouldAdvertise bool
@@ -55,6 +56,7 @@ func WatchPartitions(zkWatcher *zk.Watcher, peers *Peers, db, version string, nu
		replication: replication,
		local: make(map[int]bool),
		remote: make(map[int][]string),
+		disappeared: make(map[int][]string, 1024),
	}

	p.pickLocal()

@@ -69,6 +71,19 @@
	return p
}

+// Dedupelicates elements in a slice of strings.
Contributor:

This should be of the form dedupe deduplicates...

+func dedupe(nodes []string) []string {
+	found := map[string]bool{}
+	dedupedNodes := make([]string, 0, len(nodes))
+	for _, node := range nodes {
+		if !found[node] {
+			found[node] = true
+			dedupedNodes = append(dedupedNodes, node)
+		}
+	}
+	return dedupedNodes
+}

// pickLocal selects which partitions are local by iterating through
// them all, and checking the hashring to see if this peer is one of the
// replicas.
@@ -107,17 +122,20 @@ func (p *Partitions) sync(updates chan []string) {
}

// FindPeers returns the list of peers who have the given partition available.
Contributor:

This docstring is still wrong, and I still think the signature would be better if it just returned a list of peers vaguely prioritized. That way we could trivially add some sort of greylisting for peers that exhibit errors a lot.

-func (p *Partitions) FindPeers(partition int) []string {
-	if p.peers == nil {
-		return nil
-	}

+func (p *Partitions) FindPeers(partition int) ([]string, []string) {
	p.lock.RLock()
	defer p.lock.RUnlock()

+	disappearedPeers := make([]string, 1024)
+	copy(disappearedPeers, p.disappeared[partition])

+	if p.peers == nil {
+		return nil, disappearedPeers
+	}

	peers := make([]string, len(p.remote[partition]))
	copy(peers, p.remote[partition])
-	return peers
+	return peers, disappearedPeers
}

// Update updates the list of local partitions to the given list.
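As a follow-up to the FindPeers review comment above, here is a rough sketch of the single, vaguely prioritized list the reviewer describes, written against this PR's Partitions fields. The method name findPeersPrioritized and its exact shape are illustrative only, not part of the change.

// findPeersPrioritized is a hypothetical alternative to FindPeers that keeps
// a single-slice return value: live peers come first, recently disappeared
// peers are appended at the end so callers only try them once the live peers
// are exhausted. Greylisted (error-prone) peers could be demoted the same way.
func (p *Partitions) findPeersPrioritized(partition int) []string {
	p.lock.RLock()
	defer p.lock.RUnlock()

	peers := make([]string, 0, len(p.remote[partition])+len(p.disappeared[partition]))
	peers = append(peers, p.remote[partition]...)
	peers = append(peers, p.disappeared[partition]...)
	return peers
}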
@@ -228,6 +246,25 @@ func (p *Partitions) updateRemote(nodes []string) {
		}
	}

+	for partitionId, partition := range p.remote {
+		disappearedPeers := make([]string, len(partition))
+		for _, oldPeer := range partition {
+			found := false
+			for _, newPeer := range remote[partitionId] {
+				if newPeer == oldPeer {
+					found = true
+				}
+			}
+			if !found {
+				disappearedPeers = append(disappearedPeers, oldPeer)
+			}
+		}
+		p.disappeared[partitionId] = dedupe(append(disappearedPeers, p.disappeared[partitionId]...))
+		if len(p.disappeared[partitionId]) >= 1024 {
+			p.disappeared[partitionId] = p.disappeared[partitionId][:1024]
+		}
+	}

	p.remote = remote
	p.updateMissing()
}