diff --git a/server/consumer.go b/server/consumer.go
index c72b3d860af..71e3e68e7f8 100644
--- a/server/consumer.go
+++ b/server/consumer.go
@@ -718,7 +718,6 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
 
 	mset.mu.RLock()
 	s, jsa, cfg, acc := mset.srv, mset.jsa, mset.cfg, mset.acc
-	retention := cfg.Retention
 	mset.mu.RUnlock()
 
 	// If we do not have the consumer currently assigned to us in cluster mode we will proceed but warn.
@@ -741,7 +740,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
 	// Make sure we have sane defaults. Do so with the JS lock, otherwise a
 	// badly timed meta snapshot can result in a race condition.
 	mset.js.mu.Lock()
-	setConsumerConfigDefaults(config, &mset.cfg, srvLim, selectedLimits)
+	setConsumerConfigDefaults(config, &cfg, srvLim, selectedLimits)
 	mset.js.mu.Unlock()
 
 	if err := checkConsumerCfg(config, srvLim, &cfg, acc, selectedLimits, isRecovering); err != nil {
@@ -787,7 +786,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
 		return nil, NewJSConsumerAlreadyExistsError()
 	}
 	// Check for overlapping subjects if we are a workqueue
-	if mset.cfg.Retention == WorkQueuePolicy {
+	if cfg.Retention == WorkQueuePolicy {
 		subjects := gatherSubjectFilters(config.FilterSubject, config.FilterSubjects)
 		if !mset.partitionUnique(cName, subjects) {
 			return nil, NewJSConsumerWQConsumerNotUniqueError()
@@ -809,7 +808,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
 	// but if not we use the value from account limits, if account limits is more restrictive
 	// than stream config we prefer the account limits to handle cases where account limits are
 	// updated during the lifecycle of the stream
-	maxc := mset.cfg.MaxConsumers
+	maxc := cfg.MaxConsumers
 	if maxc <= 0 || (selectedLimits.MaxConsumers > 0 && selectedLimits.MaxConsumers < maxc) {
 		maxc = selectedLimits.MaxConsumers
 	}
@@ -819,7 +818,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
 	}
 
 	// Check on stream type conflicts with WorkQueues.
-	if mset.cfg.Retention == WorkQueuePolicy && !config.Direct {
+	if cfg.Retention == WorkQueuePolicy && !config.Direct {
 		// Force explicit acks here.
 		if config.AckPolicy != AckExplicit {
 			mset.mu.Unlock()
@@ -877,7 +876,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
 		sfreq:     int32(sampleFreq),
 		maxdc:     uint64(config.MaxDeliver),
 		maxp:      config.MaxAckPending,
-		retention: retention,
+		retention: cfg.Retention,
 		created:   time.Now().UTC(),
 	}
 
@@ -910,17 +909,17 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
 		}
 	}
 	// Create ackMsgs queue now that we have a consumer name
-	o.ackMsgs = newIPQueue[*jsAckMsg](s, fmt.Sprintf("[ACC:%s] consumer '%s' on stream '%s' ackMsgs", accName, o.name, mset.cfg.Name))
+	o.ackMsgs = newIPQueue[*jsAckMsg](s, fmt.Sprintf("[ACC:%s] consumer '%s' on stream '%s' ackMsgs", accName, o.name, cfg.Name))
 
 	// Create our request waiting queue.
 	if o.isPullMode() {
 		o.waiting = newWaitQueue(config.MaxWaiting)
 		// Create our internal queue for next msg requests.
-		o.nextMsgReqs = newIPQueue[*nextMsgReq](s, fmt.Sprintf("[ACC:%s] consumer '%s' on stream '%s' pull requests", accName, o.name, mset.cfg.Name))
+		o.nextMsgReqs = newIPQueue[*nextMsgReq](s, fmt.Sprintf("[ACC:%s] consumer '%s' on stream '%s' pull requests", accName, o.name, cfg.Name))
 	}
 
 	// already under lock, mset.Name() would deadlock
-	o.stream = mset.cfg.Name
+	o.stream = cfg.Name
 	o.ackEventT = JSMetricConsumerAckPre + "." + o.stream + "." + o.name
 	o.nakEventT = JSAdvisoryConsumerMsgNakPre + "." + o.stream + "." + o.name
 	o.deliveryExcEventT = JSAdvisoryConsumerMaxDeliveryExceedPre + "." + o.stream + "." + o.name
@@ -1005,7 +1004,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
 	// that to scanf them back in.
 	// Escape '%' in consumer and stream names, as `pre` is used as a template later
 	// in consumer.ackReply(), resulting in erroneous formatting of the ack subject.
-	mn := strings.ReplaceAll(mset.cfg.Name, "%", "%%")
+	mn := strings.ReplaceAll(cfg.Name, "%", "%%")
 	pre := fmt.Sprintf(jsAckT, mn, strings.ReplaceAll(o.name, "%", "%%"))
 	o.ackReplyT = fmt.Sprintf("%s.%%d.%%d.%%d.%%d.%%d", pre)
 	o.ackSubj = fmt.Sprintf("%s.*.*.*.*.*", pre)
@@ -1017,7 +1016,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
 	if o.isPushMode() {
 		// Check if we are running only 1 replica and that the delivery subject has interest.
 		// Check in place here for interest. Will setup properly in setLeader.
-		if config.replicas(&mset.cfg) == 1 {
+		if config.replicas(&cfg) == 1 {
 			interest := o.acc.sl.HasInterest(o.cfg.DeliverSubject)
 			if !o.hasDeliveryInterest(interest) {
 				// Let the interest come to us eventually, but setup delete timer.
@@ -1227,7 +1226,7 @@ func (o *consumer) setLeader(isLeader bool) {
 		}
 
 		mset.mu.RLock()
-		s, jsa, stream, lseq := mset.srv, mset.jsa, mset.cfg.Name, mset.lseq
+		s, jsa, stream, lseq := mset.srv, mset.jsa, mset.getCfgName(), mset.lseq
 		mset.mu.RUnlock()
 
 		o.mu.Lock()
@@ -1809,6 +1808,8 @@ func (o *consumer) setRateLimit(bps uint64) {
 	// Burst should be set to maximum msg size for this account, etc.
 	var burst int
 
+	// We don't need to take cfgMu's read lock here since this function
+	// is already invoked under mset.mu.RLock(), which supersedes cfgMu.
 	if mset.cfg.MaxMsgSize > 0 {
 		burst = int(mset.cfg.MaxMsgSize)
 	} else if mset.jsa.account.limits.mpay > 0 {
@@ -2551,7 +2552,7 @@ func (o *consumer) checkRedelivered(slseq uint64) {
 	if shouldUpdateState {
 		if err := o.writeStoreStateUnlocked(); err != nil && o.srv != nil && o.mset != nil && !o.closed {
 			s, acc, mset, name := o.srv, o.acc, o.mset, o.name
-			s.Warnf("Consumer '%s > %s > %s' error on write store state from check redelivered: %v", acc, mset.cfg.Name, name, err)
+			s.Warnf("Consumer '%s > %s > %s' error on write store state from check redelivered: %v", acc, mset.getCfgName(), name, err)
 		}
 	}
 }
@@ -2975,16 +2976,21 @@ func (o *consumer) isFiltered() bool {
 		return true
 	}
 
+	// Protect access to mset.cfg with the cfgMu mutex.
+	mset.cfgMu.RLock()
+	msetSubjects := mset.cfg.Subjects
+	mset.cfgMu.RUnlock()
+
 	// `isFiltered` need to be performant, so we do
 	// as any checks as possible to avoid unnecessary work.
 	// Here we avoid iteration over slices if there is only one subject in stream
 	// and one filter for the consumer.
-	if len(mset.cfg.Subjects) == 1 && len(o.subjf) == 1 {
-		return mset.cfg.Subjects[0] != o.subjf[0].subject
+	if len(msetSubjects) == 1 && len(o.subjf) == 1 {
+		return msetSubjects[0] != o.subjf[0].subject
 	}
 
 	// if the list is not equal length, we can return early, as this is filtered.
-	if len(mset.cfg.Subjects) != len(o.subjf) {
+	if len(msetSubjects) != len(o.subjf) {
 		return true
 	}
 
@@ -2996,7 +3002,7 @@ func (o *consumer) isFiltered() bool {
 	for _, val := range o.subjf {
 		cfilters[val.subject] = struct{}{}
 	}
-	for _, val := range mset.cfg.Subjects {
+	for _, val := range msetSubjects {
 		if _, ok := cfilters[val]; !ok {
 			return true
 		}
@@ -4035,10 +4041,10 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
 	inch := o.inch
 	o.mu.Unlock()
 
-	// Grab the stream's retention policy
-	mset.mu.RLock()
-	rp := mset.cfg.Retention
-	mset.mu.RUnlock()
+	// Grab the stream's retention policy and name
+	mset.cfgMu.RLock()
+	stream, rp := mset.cfg.Name, mset.cfg.Retention
+	mset.cfgMu.RUnlock()
 
 	var err error
 
@@ -4101,12 +4107,12 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
 				goto waitForMsgs
 			} else if err == errPartialCache {
 				s.Warnf("Unexpected partial cache error looking up message for consumer '%s > %s > %s'",
-					o.mset.acc, o.mset.cfg.Name, o.cfg.Name)
+					o.mset.acc, stream, o.cfg.Name)
 				goto waitForMsgs
 			} else {
 				s.Errorf("Received an error looking up message for consumer '%s > %s > %s': %v",
-					o.mset.acc, o.mset.cfg.Name, o.cfg.Name, err)
+					o.mset.acc, stream, o.cfg.Name, err)
 				goto waitForMsgs
 			}
 		}
@@ -4791,7 +4797,7 @@ func (o *consumer) checkPending() {
 	if shouldUpdateState {
 		if err := o.writeStoreStateUnlocked(); err != nil && o.srv != nil && o.mset != nil && !o.closed {
 			s, acc, mset, name := o.srv, o.acc, o.mset, o.name
-			s.Warnf("Consumer '%s > %s > %s' error on write store state from check pending: %v", acc, mset.cfg.Name, name, err)
+			s.Warnf("Consumer '%s > %s > %s' error on write store state from check pending: %v", acc, mset.getCfgName(), name, err)
 		}
 	}
 }
@@ -4912,7 +4918,10 @@ func (o *consumer) selectStartingSeqNo() {
 	} else if o.cfg.DeliverPolicy == DeliverLastPerSubject {
 		// If our parent stream is set to max msgs per subject of 1 this is just
 		// a normal consumer at this point. We can avoid any heavy lifting.
-		if o.mset.cfg.MaxMsgsPer == 1 {
+		o.mset.cfgMu.RLock()
+		mmp := o.mset.cfg.MaxMsgsPer
+		o.mset.cfgMu.RUnlock()
+		if mmp == 1 {
 			o.sseq = state.FirstSeq
 		} else {
 			// A threshold for when we switch from get last msg to subjects state.
@@ -5305,6 +5314,7 @@ func (o *consumer) stopWithFlags(dflag, sdflag, doSignal, advisory bool) error {
 	if mset != nil {
 		mset.mu.Lock()
 		mset.removeConsumer(o)
+		// No need for cfgMu's lock since mset.mu.Lock supersedes it.
 		rp = mset.cfg.Retention
 		mset.mu.Unlock()
 	}
diff --git a/server/events_test.go b/server/events_test.go
index eea10dc2e8e..dbe8695247b 100644
--- a/server/events_test.go
+++ b/server/events_test.go
@@ -3607,7 +3607,7 @@ func TestClusterSetupMsgs(t *testing.T) {
 
 	var totalOut int
 	for _, server := range c.servers {
-		totalOut += int(server.outMsgs)
+		totalOut += int(atomic.LoadInt64(&server.outMsgs))
 	}
 	totalExpected := numServers * numServers
 	if totalOut >= totalExpected {
diff --git a/server/stream.go b/server/stream.go
index 08c3b2d9434..1fe6f87c1f8 100644
--- a/server/stream.go
+++ b/server/stream.go
@@ -236,6 +236,7 @@ type stream struct {
 	consumers map[string]*consumer // The consumers for this stream.
 	numFilter int                  // The number of filtered consumers.
 	cfg       StreamConfig         // The stream's config.
+	cfgMu     sync.RWMutex         // Config mutex used to solve some races with consumer code.
 	created   time.Time            // Time the stream was created.
 	stype     StorageType          // The storage type.
 	tier      string               // The tier is the number of replicas for the stream (e.g. "R1" or "R3").
@@ -1978,7 +1979,14 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool)
 	}
 
 	// Now update config and store's version of our config.
+	// Although we are under the stream write lock, we will also assign the new
+	// configuration under the mset.cfgMu lock. This is so that in places where
+	// mset.mu cannot be acquired (like many cases in consumer.go where code
+	// runs under the consumer's lock) but the stream's configuration needs to
+	// be inspected, one can use mset.cfgMu's read lock to do that safely.
+	mset.cfgMu.Lock()
 	mset.cfg = *cfg
+	mset.cfgMu.Unlock()
 
 	// If we're changing retention and haven't errored because of consumer
 	// replicas by now, whip through and update the consumer retention.
@@ -2029,6 +2037,15 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool)
 	return nil
 }
 
+// Small helper to return the Name field from mset.cfg, protected by
+// the mset.cfgMu mutex. This is simply because we have several places
+// in consumer.go where we need it.
+func (mset *stream) getCfgName() string {
+	mset.cfgMu.RLock()
+	defer mset.cfgMu.RUnlock()
+	return mset.cfg.Name
+}
+
 // Purge will remove all messages from the stream and underlying store based on the request.
 func (mset *stream) purge(preq *JSApiStreamPurgeRequest) (purged uint64, err error) {
 	mset.mu.RLock()
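For readers following the locking change, here is a minimal, self-contained Go sketch of the pattern this patch introduces: a dedicated `cfgMu` RWMutex guarding only the stream's config, so code paths that cannot take the main stream lock (as in several consumer.go call sites above) can still read the config safely while an update replaces it. The `stream`/`StreamConfig` types and the `updateConfig` helper below are stripped-down stand-ins for illustration, not the server's actual structures.

```go
// Sketch of the cfgMu pattern: a narrow lock around the config field only.
package main

import (
	"fmt"
	"sync"
)

type StreamConfig struct {
	Name      string
	Retention string
}

type stream struct {
	mu    sync.RWMutex // main stream lock (mset.mu in the patch)
	cfgMu sync.RWMutex // guards cfg only, like mset.cfgMu
	cfg   StreamConfig
}

// updateConfig mirrors updateWithAdvisory: the new config is assigned
// under cfgMu so concurrent readers never observe a torn write.
func (s *stream) updateConfig(cfg StreamConfig) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.cfgMu.Lock()
	s.cfg = cfg
	s.cfgMu.Unlock()
}

// getCfgName mirrors mset.getCfgName: a read-only accessor that only
// needs cfgMu, so it is safe from code that cannot take s.mu.
func (s *stream) getCfgName() string {
	s.cfgMu.RLock()
	defer s.cfgMu.RUnlock()
	return s.cfg.Name
}

func main() {
	s := &stream{cfg: StreamConfig{Name: "ORDERS", Retention: "limits"}}
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		s.updateConfig(StreamConfig{Name: "ORDERS", Retention: "workqueue"})
	}()
	go func() {
		defer wg.Done()
		fmt.Println(s.getCfgName()) // safe concurrent read
	}()
	wg.Wait()
}
```

Lock ordering is the key design point: the writer takes the main lock first and `cfgMu` second, while read-only helpers touch only `cfgMu`, so no path acquires the two locks in opposite orders.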
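The events_test.go hunk swaps a bare read of `server.outMsgs` for `atomic.LoadInt64`. A rough sketch of why, assuming the counter is bumped concurrently elsewhere via `sync/atomic` (the `server` type and goroutine below are illustrative, not the real server code): the race detector flags any plain read of a value that another goroutine writes, even when the writes themselves are atomic.

```go
// Sketch: an int64 counter updated concurrently must also be read atomically.
package main

import (
	"fmt"
	"sync/atomic"
)

// server stands in for the NATS server struct; outMsgs mirrors the
// int64 counter the real server updates from other goroutines.
type server struct {
	outMsgs int64
}

func main() {
	srv := &server{}
	done := make(chan struct{})

	// Writer: simulates route goroutines bumping the counter.
	go func() {
		for i := 0; i < 1000; i++ {
			atomic.AddInt64(&srv.outMsgs, 1)
		}
		close(done)
	}()

	// Reader: like the test's tally, the read must go through
	// atomic.LoadInt64 rather than a bare srv.outMsgs access,
	// or `go test -race` reports a data race.
	for {
		select {
		case <-done:
			fmt.Println("total out:", atomic.LoadInt64(&srv.outMsgs))
			return
		default:
			_ = atomic.LoadInt64(&srv.outMsgs)
		}
	}
}
```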