[FIXED] JetStream: data race between consumer and stream when updated #5820

Merged: 3 commits, Aug 23, 2024
Changes shown below are from 1 commit.
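For context, the shape of the race this patch removes is sketched below: consumer code inspects fields of the stream's config struct (mset.cfg) while holding only the consumer's own lock (taking mset.mu there is not always possible), and a concurrent stream update reassigns the whole struct under mset.mu. The snippet is a trimmed illustration with stand-in types, not actual nats-server code:

package sketch

import "sync"

// Trimmed stand-ins for the real types in server/stream.go.
type StreamConfig struct {
    Name      string
    Retention int
}

type stream struct {
    mu  sync.RWMutex
    cfg StreamConfig
}

// Before this patch: a goroutine reading a config field without holding
// mset.mu races with an update that reassigns the whole struct.
func racyUpdate(mset *stream, newCfg StreamConfig) {
    done := make(chan struct{})
    go func() {
        _ = mset.cfg.Name // unsynchronized read, e.g. from consumer code holding only its own lock
        close(done)
    }()

    mset.mu.Lock()
    mset.cfg = newCfg // write protected only by mset.mu; races with the read above
    mset.mu.Unlock()
    <-done
}

The fix in this PR adds a dedicated cfgMu RWMutex next to mset.cfg plus setCfg()/getCfg()/getCfgName() helpers (see server/stream.go below), and switches the consumer-side readers to use them.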
server/consumer.go (62 changes: 36 additions & 26 deletions)
@@ -717,8 +717,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
}

mset.mu.RLock()
s, jsa, cfg, acc := mset.srv, mset.jsa, mset.cfg, mset.acc
retention := cfg.Retention
s, jsa, cfg, acc := mset.srv, mset.jsa, mset.getCfg(), mset.acc
mset.mu.RUnlock()

// If we do not have the consumer currently assigned to us in cluster mode we will proceed but warn.
@@ -741,7 +740,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
// Make sure we have sane defaults. Do so with the JS lock, otherwise a
// badly timed meta snapshot can result in a race condition.
mset.js.mu.Lock()
setConsumerConfigDefaults(config, &mset.cfg, srvLim, selectedLimits)
setConsumerConfigDefaults(config, &cfg, srvLim, selectedLimits)
mset.js.mu.Unlock()

if err := checkConsumerCfg(config, srvLim, &cfg, acc, selectedLimits, isRecovering); err != nil {
@@ -787,7 +786,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
return nil, NewJSConsumerAlreadyExistsError()
}
// Check for overlapping subjects if we are a workqueue
if mset.cfg.Retention == WorkQueuePolicy {
if cfg.Retention == WorkQueuePolicy {
subjects := gatherSubjectFilters(config.FilterSubject, config.FilterSubjects)
if !mset.partitionUnique(cName, subjects) {
return nil, NewJSConsumerWQConsumerNotUniqueError()
@@ -809,7 +808,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
// but if not, we use the value from account limits. If account limits are more restrictive
// than the stream config, we prefer the account limits to handle cases where account limits are
// updated during the lifecycle of the stream
maxc := mset.cfg.MaxConsumers
maxc := cfg.MaxConsumers
if maxc <= 0 || (selectedLimits.MaxConsumers > 0 && selectedLimits.MaxConsumers < maxc) {
maxc = selectedLimits.MaxConsumers
}
@@ -819,7 +818,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
}

// Check on stream type conflicts with WorkQueues.
if mset.cfg.Retention == WorkQueuePolicy && !config.Direct {
if cfg.Retention == WorkQueuePolicy && !config.Direct {
// Force explicit acks here.
if config.AckPolicy != AckExplicit {
mset.mu.Unlock()
@@ -877,7 +876,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
sfreq: int32(sampleFreq),
maxdc: uint64(config.MaxDeliver),
maxp: config.MaxAckPending,
retention: retention,
retention: cfg.Retention,
created: time.Now().UTC(),
}

@@ -910,17 +909,17 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
}
}
// Create ackMsgs queue now that we have a consumer name
o.ackMsgs = newIPQueue[*jsAckMsg](s, fmt.Sprintf("[ACC:%s] consumer '%s' on stream '%s' ackMsgs", accName, o.name, mset.cfg.Name))
o.ackMsgs = newIPQueue[*jsAckMsg](s, fmt.Sprintf("[ACC:%s] consumer '%s' on stream '%s' ackMsgs", accName, o.name, cfg.Name))

// Create our request waiting queue.
if o.isPullMode() {
o.waiting = newWaitQueue(config.MaxWaiting)
// Create our internal queue for next msg requests.
o.nextMsgReqs = newIPQueue[*nextMsgReq](s, fmt.Sprintf("[ACC:%s] consumer '%s' on stream '%s' pull requests", accName, o.name, mset.cfg.Name))
o.nextMsgReqs = newIPQueue[*nextMsgReq](s, fmt.Sprintf("[ACC:%s] consumer '%s' on stream '%s' pull requests", accName, o.name, cfg.Name))
}

// already under lock, mset.Name() would deadlock
o.stream = mset.cfg.Name
o.stream = cfg.Name
o.ackEventT = JSMetricConsumerAckPre + "." + o.stream + "." + o.name
o.nakEventT = JSAdvisoryConsumerMsgNakPre + "." + o.stream + "." + o.name
o.deliveryExcEventT = JSAdvisoryConsumerMaxDeliveryExceedPre + "." + o.stream + "." + o.name
@@ -1005,7 +1004,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
// that to scanf them back in.
// Escape '%' in consumer and stream names, as `pre` is used as a template later
// in consumer.ackReply(), resulting in erroneous formatting of the ack subject.
mn := strings.ReplaceAll(mset.cfg.Name, "%", "%%")
mn := strings.ReplaceAll(cfg.Name, "%", "%%")
pre := fmt.Sprintf(jsAckT, mn, strings.ReplaceAll(o.name, "%", "%%"))
o.ackReplyT = fmt.Sprintf("%s.%%d.%%d.%%d.%%d.%%d", pre)
o.ackSubj = fmt.Sprintf("%s.*.*.*.*.*", pre)
@@ -1017,7 +1016,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
if o.isPushMode() {
// Check if we are running only 1 replica and that the delivery subject has interest.
// Check in place here for interest. Will setup properly in setLeader.
if config.replicas(&mset.cfg) == 1 {
if config.replicas(&cfg) == 1 {
interest := o.acc.sl.HasInterest(o.cfg.DeliverSubject)
if !o.hasDeliveryInterest(interest) {
// Let the interest come to us eventually, but setup delete timer.
@@ -1227,7 +1226,7 @@ func (o *consumer) setLeader(isLeader bool) {
}

mset.mu.RLock()
s, jsa, stream, lseq := mset.srv, mset.jsa, mset.cfg.Name, mset.lseq
s, jsa, stream, lseq := mset.srv, mset.jsa, mset.getCfgName(), mset.lseq
mset.mu.RUnlock()

o.mu.Lock()
@@ -1809,6 +1808,8 @@ func (o *consumer) setRateLimit(bps uint64) {

// Burst should be set to maximum msg size for this account, etc.
var burst int
// We don't need to get cfgMu's rlock here since this function
// is already invoked under mset.mu.RLock(), which supersedes cfgMu.
if mset.cfg.MaxMsgSize > 0 {
burst = int(mset.cfg.MaxMsgSize)
} else if mset.jsa.account.limits.mpay > 0 {
@@ -2551,7 +2552,7 @@ func (o *consumer) checkRedelivered(slseq uint64) {
if shouldUpdateState {
if err := o.writeStoreStateUnlocked(); err != nil && o.srv != nil && o.mset != nil && !o.closed {
s, acc, mset, name := o.srv, o.acc, o.mset, o.name
s.Warnf("Consumer '%s > %s > %s' error on write store state from check redelivered: %v", acc, mset.cfg.Name, name, err)
s.Warnf("Consumer '%s > %s > %s' error on write store state from check redelivered: %v", acc, mset.getCfgName(), name, err)
}
}
}
@@ -2975,16 +2976,21 @@ func (o *consumer) isFiltered() bool {
return true
}

// Protect access to mset.cfg with the cfgMu mutex.
mset.cfgMu.RLock()
msetSubjects := mset.cfg.Subjects
mset.cfgMu.RUnlock()

// `isFiltered` needs to be performant, so we do
// as many checks as possible to avoid unnecessary work.
// Here we avoid iteration over slices if there is only one subject in stream
// and one filter for the consumer.
if len(mset.cfg.Subjects) == 1 && len(o.subjf) == 1 {
return mset.cfg.Subjects[0] != o.subjf[0].subject
if len(msetSubjects) == 1 && len(o.subjf) == 1 {
return msetSubjects[0] != o.subjf[0].subject
}

// If the lists are not of equal length, we can return early, as this is filtered.
if len(mset.cfg.Subjects) != len(o.subjf) {
if len(msetSubjects) != len(o.subjf) {
return true
}

@@ -2996,7 +3002,7 @@ func (o *consumer) isFiltered() bool {
for _, val := range o.subjf {
cfilters[val.subject] = struct{}{}
}
for _, val := range mset.cfg.Subjects {
for _, val := range msetSubjects {
if _, ok := cfilters[val]; !ok {
return true
}
@@ -4035,10 +4041,10 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
inch := o.inch
o.mu.Unlock()

// Grab the stream's retention policy
mset.mu.RLock()
rp := mset.cfg.Retention
mset.mu.RUnlock()
// Grab the stream's retention policy and name
mset.cfgMu.RLock()
stream, rp := mset.cfg.Name, mset.cfg.Retention
mset.cfgMu.RUnlock()

var err error

@@ -4101,12 +4107,12 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
goto waitForMsgs
} else if err == errPartialCache {
s.Warnf("Unexpected partial cache error looking up message for consumer '%s > %s > %s'",
o.mset.acc, o.mset.cfg.Name, o.cfg.Name)
o.mset.acc, stream, o.cfg.Name)
goto waitForMsgs

} else {
s.Errorf("Received an error looking up message for consumer '%s > %s > %s': %v",
o.mset.acc, o.mset.cfg.Name, o.cfg.Name, err)
o.mset.acc, stream, o.cfg.Name, err)
goto waitForMsgs
}
}
@@ -4791,7 +4797,7 @@ func (o *consumer) checkPending() {
if shouldUpdateState {
if err := o.writeStoreStateUnlocked(); err != nil && o.srv != nil && o.mset != nil && !o.closed {
s, acc, mset, name := o.srv, o.acc, o.mset, o.name
s.Warnf("Consumer '%s > %s > %s' error on write store state from check pending: %v", acc, mset.cfg.Name, name, err)
s.Warnf("Consumer '%s > %s > %s' error on write store state from check pending: %v", acc, mset.getCfgName(), name, err)
}
}
}
@@ -4912,7 +4918,7 @@ func (o *consumer) selectStartingSeqNo() {
} else if o.cfg.DeliverPolicy == DeliverLastPerSubject {
// If our parent stream is set to max msgs per subject of 1 this is just
// a normal consumer at this point. We can avoid any heavy lifting.
if o.mset.cfg.MaxMsgsPer == 1 {
o.mset.cfgMu.RLock()
mmp := o.mset.cfg.MaxMsgsPer
o.mset.cfgMu.RUnlock()
if mmp == 1 {
o.sseq = state.FirstSeq
} else {
// A threshold for when we switch from get last msg to subjects state.
@@ -5305,6 +5314,7 @@ func (o *consumer) stopWithFlags(dflag, sdflag, doSignal, advisory bool) error {
if mset != nil {
mset.mu.Lock()
mset.removeConsumer(o)
// No need for cfgMu's lock since mset.mu.Lock supersedes it.
rp = mset.cfg.Retention
mset.mu.Unlock()
}
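The consumer.go changes above all follow the same reader-side rule: code that already holds mset.mu keeps reading mset.cfg directly (the writer takes mset.mu before cfgMu, per the comments in setRateLimit and stopWithFlags), while code that cannot take mset.mu snapshots the fields it needs under cfgMu.RLock, either through getCfg()/getCfgName() or an explicit lock/unlock pair, and then works only with the local copies. A minimal sketch of that snapshot pattern, written as if inside the server package; the helper name snapshotForConsumer is hypothetical:

// Hypothetical helper illustrating the read pattern isFiltered and
// loopAndGatherMsgs use after this change: copy the needed fields under
// cfgMu.RLock, then work with the copies without holding any stream lock.
func (mset *stream) snapshotForConsumer() (name string, rp RetentionPolicy, subjects []string) {
    mset.cfgMu.RLock()
    defer mset.cfgMu.RUnlock()
    return mset.cfg.Name, mset.cfg.Retention, mset.cfg.Subjects
}

Assuming updates keep installing a freshly built StreamConfig through setCfg() rather than mutating the existing Subjects slice in place, copying the slice header is enough: an already-taken snapshot keeps pointing at data that is no longer written to.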
server/stream.go (36 changes: 35 additions & 1 deletion)
@@ -236,6 +236,7 @@ type stream struct {
consumers map[string]*consumer // The consumers for this stream.
numFilter int // The number of filtered consumers.
cfg StreamConfig // The stream's config.
cfgMu sync.RWMutex // Config mutex used to solve some races with consumer code
created time.Time // Time the stream was created.
stype StorageType // The storage type.
tier string // The tier is the number of replicas for the stream (e.g. "R1" or "R3").
@@ -1978,7 +1979,7 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool)
}

// Now update config and store's version of our config.
mset.cfg = *cfg
mset.setCfg(cfg)

// If we're changing retention and haven't errored because of consumer
// replicas by now, whip through and update the consumer retention.
@@ -2029,6 +2030,39 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool)
return nil
}

// Sets the configuration for this stream.
// This is called under mset.mu.Lock(), but this function will also acquire
// the mset.cfgMu lock. This way, in places where mset.mu cannot be acquired
// (like many cases in consumer.go where code runs under the consumer's lock)
// but the stream's configuration needs to be inspected, the caller can use
// mset.getCfg() to safely obtain a copy of the stream's configuration.
func (mset *stream) setCfg(cfg *StreamConfig) {
mset.cfgMu.Lock()
mset.cfg = *cfg
mset.cfgMu.Unlock()
}

// The stream configuration is a structure. Normally, code that would do
// something like cfg := mset.cfg would get a copy of that structure. However,
// this would race with code that assigns a new configuration to the stream
// (think stream update). The update is done through setCfg(), and therefore,
// to avoid races, callers that need the stream's configuration should call
// this function, or protect the access with the mset.cfgMu read lock.
func (mset *stream) getCfg() StreamConfig {
mset.cfgMu.RLock()
defer mset.cfgMu.RUnlock()
return mset.cfg
}

// Small helper to return the Name field from mset.cfg, protected by
// the mset.cfgMu mutex. This is simply because we have several places
// in consumer.go where we need it.
func (mset *stream) getCfgName() string {
mset.cfgMu.RLock()
defer mset.cfgMu.RUnlock()
return mset.cfg.Name
}

// Purge will remove all messages from the stream and underlying store based on the request.
func (mset *stream) purge(preq *JSApiStreamPurgeRequest) (purged uint64, err error) {
mset.mu.RLock()
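On the write side nothing changes structurally: stream updates still run under mset.mu, but the config swap now goes through setCfg() so that cfgMu also serializes against the consumer-side readers that cannot take mset.mu. A rough sketch of the resulting locking discipline; the function name applyConfigUpdate is made up (in this diff the call site is updateWithAdvisory()), and that function's validation, consumer-retention handling and advisories are omitted:

// Simplified shape of the update path after this patch; only the locking is shown.
func (mset *stream) applyConfigUpdate(cfg *StreamConfig) {
    mset.mu.Lock()
    defer mset.mu.Unlock()
    // The writer holds mset.mu and then cfgMu (inside setCfg). Readers that
    // already run under mset.mu, such as setRateLimit in the consumer.go diff,
    // can therefore keep reading mset.cfg directly without taking cfgMu.
    mset.setCfg(cfg)
}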