[FIXED] When scaling down a stream make sure consumers are adjusted properly. (#5927)

When scaling down a stream, make sure the replica count is correct if
adjusted, and also make sure we do not have orphan consumers.

When we scale down a replicated stream, say R5, consumers with a lower
replica count, say R1, could be placed on peers that are going away. We
need to make sure we properly reassign peers and transfer consumer
state as needed.

Note that the consumer state transfer expects the state to be stable,
so the consumer should be paused.

Signed-off-by: Derek Collison <[email protected]>
derekcollison authored Sep 25, 2024
2 parents 678eed7 + 596b71c commit 4c21aa3
Showing 2 changed files with 190 additions and 4 deletions.
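
The scenario being fixed can be reproduced from a client before looking at the diff. Below is a minimal sketch, assuming a 5-node JetStream cluster reachable at nats.DefaultURL and using the same legacy nats.go JetStreamContext API as the new test; the stream and consumer names are illustrative only.

package main

import (
    "fmt"
    "log"

    "github.com/nats-io/nats.go"
)

func main() {
    // Assumes a local 5-node JetStream cluster.
    nc, err := nats.Connect(nats.DefaultURL)
    if err != nil {
        log.Fatal(err)
    }
    defer nc.Close()

    js, err := nc.JetStream()
    if err != nil {
        log.Fatal(err)
    }

    // R5 stream.
    if _, err := js.AddStream(&nats.StreamConfig{
        Name:     "TEST",
        Subjects: []string{"foo"},
        Replicas: 5,
    }); err != nil {
        log.Fatal(err)
    }

    // R1 durable consumer. Its single peer may be one of the two peers
    // that are removed when the stream scales down from R5 to R3.
    if _, err := js.AddConsumer("TEST", &nats.ConsumerConfig{
        Durable:   "r1",
        AckPolicy: nats.AckExplicitPolicy,
        Replicas:  1,
    }); err != nil {
        log.Fatal(err)
    }

    // Scale the stream down to R3. Before this fix the R1 consumer could
    // be left orphaned on a peer that is no longer part of the stream's group.
    if _, err := js.UpdateStream(&nats.StreamConfig{
        Name:     "TEST",
        Subjects: []string{"foo"},
        Replicas: 3,
    }); err != nil {
        log.Fatal(err)
    }

    ci, err := js.ConsumerInfo("TEST", "r1")
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("r1 is now hosted on %q\n", ci.Cluster.Leader)
}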
69 changes: 65 additions & 4 deletions server/jetstream_cluster.go
@@ -6357,8 +6357,9 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su
     }
 
     if isReplicaChange {
+        isScaleUp := newCfg.Replicas > len(rg.Peers)
         // We are adding new peers here.
-        if newCfg.Replicas > len(rg.Peers) {
+        if isScaleUp {
             // Check that we have the allocation available.
             if err := js.jsClusteredStreamLimitsCheck(acc, newCfg); err != nil {
                 resp.Error = err
@@ -6434,22 +6435,82 @@

         // Need to remap any consumers.
         for _, ca := range osa.consumers {
-            // Ephemerals are R=1, so only auto-remap durables, or R>1, unless stream is interest or workqueue policy.
+            // Legacy ephemerals are R=1 but present as R=0, so only auto-remap named consumers, or if we are downsizing the consumer peers.
+            // If stream is interest or workqueue policy always remaps since they require peer parity with stream.
             numPeers := len(ca.Group.Peers)
-            if ca.Config.Durable != _EMPTY_ || numPeers > 1 || cfg.Retention != LimitsPolicy {
+            isAutoScale := ca.Config.Replicas == 0 && (ca.Config.Durable != _EMPTY_ || ca.Config.Name != _EMPTY_)
+            if isAutoScale || numPeers > len(rg.Peers) || cfg.Retention != LimitsPolicy {
                 cca := ca.copyGroup()
                 // Adjust preferred as needed.
-                if numPeers == 1 && len(rg.Peers) > 1 {
+                if numPeers == 1 && isScaleUp {
                     cca.Group.Preferred = ca.Group.Peers[0]
                 } else {
                     cca.Group.Preferred = _EMPTY_
                 }
                 // Assign new peers.
                 cca.Group.Peers = rg.Peers
+                // If the replicas was not 0 make sure it matches here.
+                if cca.Config.Replicas != 0 {
+                    cca.Config.Replicas = len(rg.Peers)
+                }
                 // We can not propose here before the stream itself so we collect them.
                 consumers = append(consumers, cca)
+
+            } else if !isScaleUp {
+                // We decided to leave this consumer's peer group alone but we are also scaling down.
+                // We need to make sure we do not have any peers that are no longer part of the stream.
+                // Note we handle down scaling of a consumer above if its number of peers were > new stream peers.
+                var needReplace []string
+                for _, rp := range ca.Group.Peers {
+                    // Check if we have an orphaned peer now for this consumer.
+                    if !rg.isMember(rp) {
+                        needReplace = append(needReplace, rp)
+                    }
+                }
+                if len(needReplace) > 0 {
+                    newPeers := copyStrings(rg.Peers)
+                    rand.Shuffle(len(newPeers), func(i, j int) { newPeers[i], newPeers[j] = newPeers[j], newPeers[i] })
+                    // If we had a smaller size than the new peer set, restrict to the same number.
+                    if lp := len(ca.Group.Peers); lp < len(newPeers) {
+                        newPeers = newPeers[:lp]
+                    }
+                    cca := ca.copyGroup()
+                    // Assign new peers.
+                    cca.Group.Peers = newPeers
+                    // If the replicas was not 0 make sure it matches here.
+                    if cca.Config.Replicas != 0 {
+                        cca.Config.Replicas = len(newPeers)
+                    }
+                    // Check if all peers are invalid. This can happen with R1 under-replicated streams that are being scaled down.
+                    if len(needReplace) == len(ca.Group.Peers) {
+                        // We have to transfer state to new peers.
+                        // We will grab our state and attach it to the new assignment.
+                        // TODO(dlc) - In practice we would want to make sure the consumer is paused.
+                        // Need to release js lock.
+                        js.mu.Unlock()
+                        if ci, err := sysRequest[ConsumerInfo](s, clusterConsumerInfoT, acc, osa.Config.Name, ca.Name); err != nil {
+                            s.Warnf("Did not receive consumer info results for '%s > %s > %s' due to: %s", acc, osa.Config.Name, ca.Name, err)
+                        } else if ci != nil {
+                            cca.State = &ConsumerState{
+                                Delivered: SequencePair{
+                                    Consumer: ci.Delivered.Consumer,
+                                    Stream:   ci.Delivered.Stream,
+                                },
+                                AckFloor: SequencePair{
+                                    Consumer: ci.AckFloor.Consumer,
+                                    Stream:   ci.AckFloor.Stream,
+                                },
+                            }
+                        }
+                        // Re-acquire here.
+                        js.mu.Lock()
+                    }
+                    // We can not propose here before the stream itself so we collect them.
+                    consumers = append(consumers, cca)
+                }
+            }
         }
+
     } else if isMoveRequest {
         if len(peerSet) == 0 {
             nrg, err := js.createGroupForStream(ci, newCfg)
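The new else branch above replaces a consumer's peers only when some of them no longer belong to the stream. A standalone sketch of that selection follows, with plain string slices standing in for the server's internal peer and group types; remapConsumerPeers is an illustrative helper, not a server API.

package main

import (
    "fmt"
    "math/rand"
)

// remapConsumerPeers mirrors the idea above: if any of the consumer's peers
// no longer belong to the stream, choose a shuffled subset of the stream's
// peers, restricted to the consumer's original peer count.
func remapConsumerPeers(streamPeers, consumerPeers []string) []string {
    valid := make(map[string]bool, len(streamPeers))
    for _, p := range streamPeers {
        valid[p] = true
    }
    orphaned := false
    for _, p := range consumerPeers {
        if !valid[p] {
            orphaned = true
            break
        }
    }
    // Leave the consumer's peer group alone if nothing is orphaned.
    if !orphaned {
        return consumerPeers
    }
    // Otherwise pick from the stream's peers, shuffled for placement spread,
    // and restrict to the consumer's original replica count.
    newPeers := append([]string(nil), streamPeers...)
    rand.Shuffle(len(newPeers), func(i, j int) { newPeers[i], newPeers[j] = newPeers[j], newPeers[i] })
    if len(consumerPeers) < len(newPeers) {
        newPeers = newPeers[:len(consumerPeers)]
    }
    return newPeers
}

func main() {
    streamPeers := []string{"S1", "S2", "S3"} // stream peer group after scaling R5 -> R3
    consumerPeers := []string{"S5"}           // R1 consumer stranded on a removed peer
    fmt.Println(remapConsumerPeers(streamPeers, consumerPeers))
}

When every peer is orphaned, as with an R1 consumer on a removed node, the server additionally snapshots the consumer's delivered and ack floor sequences and attaches them to the new assignment, as shown in the diff above.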
125 changes: 125 additions & 0 deletions server/jetstream_cluster_4_test.go
@@ -3961,3 +3961,128 @@ func TestJetStreamPendingRequestsInJsz(t *testing.T) {
     require_True(t, m.Stats.JetStream != nil)
     require_NotEqual(t, m.Stats.JetStream.Meta.Pending, 0)
 }
+
+func TestJetStreamConsumerReplicasAfterScale(t *testing.T) {
+    c := createJetStreamClusterExplicit(t, "R5S", 5)
+    defer c.shutdown()
+
+    nc, js := jsClientConnect(t, c.randomNonLeader())
+    defer nc.Close()
+
+    _, err := js.AddStream(&nats.StreamConfig{
+        Name:     "TEST",
+        Subjects: []string{"foo"},
+        Replicas: 5,
+    })
+    require_NoError(t, err)
+
+    // Put some messages in to test consumer state transfer.
+    for i := 0; i < 100; i++ {
+        js.PublishAsync("foo", []byte("ok"))
+    }
+    select {
+    case <-js.PublishAsyncComplete():
+    case <-time.After(5 * time.Second):
+        t.Fatalf("Did not receive completion signal")
+    }
+
+    // Create four different consumers.
+    // Normal where we inherit replicas from parent.
+    ci, err := js.AddConsumer("TEST", &nats.ConsumerConfig{
+        Durable:   "dur",
+        AckPolicy: nats.AckExplicitPolicy,
+    })
+    require_NoError(t, err)
+    require_Equal(t, ci.Config.Replicas, 0)
+    require_Equal(t, len(ci.Cluster.Replicas), 4)
+
+    // Ephemeral
+    ci, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
+        AckPolicy: nats.AckExplicitPolicy,
+    })
+    require_NoError(t, err)
+    require_Equal(t, ci.Config.Replicas, 0) // Legacy ephemeral is 0 here too.
+    require_Equal(t, len(ci.Cluster.Replicas), 0)
+    eName := ci.Name
+
+    // R1
+    ci, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
+        Durable:   "r1",
+        AckPolicy: nats.AckExplicitPolicy,
+        Replicas:  1,
+    })
+    require_NoError(t, err)
+    require_Equal(t, ci.Config.Replicas, 1)
+    require_Equal(t, len(ci.Cluster.Replicas), 0)
+
+    // R3
+    ci, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
+        Name:      "r3",
+        AckPolicy: nats.AckExplicitPolicy,
+        Replicas:  3,
+    })
+    require_NoError(t, err)
+    require_Equal(t, ci.Config.Replicas, 3)
+    require_Equal(t, len(ci.Cluster.Replicas), 2)
+
+    // Now create some state on r1 consumer.
+    sub, err := js.PullSubscribe("foo", "r1")
+    require_NoError(t, err)
+
+    fetch := rand.Intn(99) + 1 // Needs to be at least 1.
+    msgs, err := sub.Fetch(fetch, nats.MaxWait(10*time.Second))
+    require_NoError(t, err)
+    require_Equal(t, len(msgs), fetch)
+    ack := rand.Intn(fetch)
+    for i := 0; i <= ack; i++ {
+        msgs[i].AckSync()
+    }
+    r1ci, err := js.ConsumerInfo("TEST", "r1")
+    require_NoError(t, err)
+    r1ci.Delivered.Last, r1ci.AckFloor.Last = nil, nil
+
+    // Now scale stream to R3.
+    _, err = js.UpdateStream(&nats.StreamConfig{
+        Name:     "TEST",
+        Subjects: []string{"foo"},
+        Replicas: 3,
+    })
+    require_NoError(t, err)
+
+    c.waitOnStreamLeader(globalAccountName, "TEST")
+
+    // Now check each.
+    c.waitOnConsumerLeader(globalAccountName, "TEST", "dur")
+    ci, err = js.ConsumerInfo("TEST", "dur")
+    require_NoError(t, err)
+    require_Equal(t, ci.Config.Replicas, 0)
+    require_Equal(t, len(ci.Cluster.Replicas), 2)
+
+    c.waitOnConsumerLeader(globalAccountName, "TEST", eName)
+    ci, err = js.ConsumerInfo("TEST", eName)
+    require_NoError(t, err)
+    require_Equal(t, ci.Config.Replicas, 0)
+    require_Equal(t, len(ci.Cluster.Replicas), 0)
+
+    c.waitOnConsumerLeader(globalAccountName, "TEST", "r1")
+    ci, err = js.ConsumerInfo("TEST", "r1")
+    require_NoError(t, err)
+    require_Equal(t, ci.Config.Replicas, 1)
+    require_Equal(t, len(ci.Cluster.Replicas), 0)
+    // Now check that state transferred correctly.
+    ci.Delivered.Last, ci.AckFloor.Last = nil, nil
+    if ci.Delivered != r1ci.Delivered {
+        t.Fatalf("Delivered state for R1 incorrect, wanted %+v got %+v",
+            r1ci.Delivered, ci.Delivered)
+    }
+    if ci.AckFloor != r1ci.AckFloor {
+        t.Fatalf("AckFloor state for R1 incorrect, wanted %+v got %+v",
+            r1ci.AckFloor, ci.AckFloor)
+    }
+
+    c.waitOnConsumerLeader(globalAccountName, "TEST", "r3")
+    ci, err = js.ConsumerInfo("TEST", "r3")
+    require_NoError(t, err)
+    require_Equal(t, ci.Config.Replicas, 3)
+    require_Equal(t, len(ci.Cluster.Replicas), 2)
+}
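
A natural follow-on assertion for a test like this is that no consumer peer falls outside the stream's peer set after the scale-down. The helper below is a sketch of such a check; it reuses the test file's existing require_NoError helper and nats.go imports, and the function name is ours, not part of the code above.

func requireConsumerPeersWithinStream(t *testing.T, js nats.JetStreamContext, stream, consumer string) {
    t.Helper()
    si, err := js.StreamInfo(stream)
    require_NoError(t, err)
    ci, err := js.ConsumerInfo(stream, consumer)
    require_NoError(t, err)

    // Collect the stream's current peer names (leader plus replicas).
    valid := map[string]bool{si.Cluster.Leader: true}
    for _, r := range si.Cluster.Replicas {
        valid[r.Name] = true
    }
    // The consumer's leader and replicas must all be stream peers.
    if !valid[ci.Cluster.Leader] {
        t.Fatalf("Consumer leader %q is not part of the stream peer set", ci.Cluster.Leader)
    }
    for _, r := range ci.Cluster.Replicas {
        if !valid[r.Name] {
            t.Fatalf("Consumer replica %q is not part of the stream peer set", r.Name)
        }
    }
}

Calling it for each consumer after the UpdateStream call should catch the orphan case this commit fixes whenever the R1 consumer happened to live on one of the removed peers.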
