Skip to content

Commit

Permalink
Release v2.10.8 cherry picks (#4874)
Browse files Browse the repository at this point in the history
Includes the following changes:

- #4861
- #4862
- #4870
- #4869
- #4864
- #4868
  • Loading branch information
wallyqs authored Dec 11, 2023
2 parents fa8464d + 53368e9 commit 064b7b2
Show file tree
Hide file tree
Showing 18 changed files with 368 additions and 82 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/MQTT_test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ jobs:
path: src/github.com/nats-io/nats-server

- name: Setup Go
uses: actions/setup-go@v4
uses: actions/setup-go@v5
with:
go-version: ${{matrix.go}}

Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/cov.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ jobs:
path: src/github.com/nats-io/nats-server

- name: Set up Go
uses: actions/setup-go@v4
uses: actions/setup-go@v5
with:
go-version: "1.21.x"

Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/go-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ jobs:
path: src/github.com/nats-io/nats-server

- name: Setup Go
uses: actions/setup-go@v4
uses: actions/setup-go@v5
with:
go-version: ${{matrix.go}}

Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/stale-issues.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:
runs-on: ubuntu-latest

steps:
- uses: actions/stale@v8
- uses: actions/stale@v9
with:
debug-only: true # Set until the behavior is tuned.
days-before-stale: 56 # Mark stale after 8 weeks (56 days) of inactivity
Expand Down
4 changes: 2 additions & 2 deletions server/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2424,10 +2424,10 @@ func TestClientLimits(t *testing.T) {
}
c.applyAccountLimits()
if c.mpay != test.expect {
t.Fatalf("payload %d not as ecpected %d", c.mpay, test.expect)
t.Fatalf("payload %d not as expected %d", c.mpay, test.expect)
}
if c.msubs != test.expect {
t.Fatalf("subscriber %d not as ecpected %d", c.msubs, test.expect)
t.Fatalf("subscriber %d not as expected %d", c.msubs, test.expect)
}
})
}
Expand Down
32 changes: 19 additions & 13 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,12 +386,13 @@ type consumer struct {

// A single subject filter.
type subjectFilter struct {
subject string
nextSeq uint64
currentSeq uint64
pmsg *jsPubMsg
err error
hasWildcard bool
subject string
nextSeq uint64
currentSeq uint64
pmsg *jsPubMsg
err error
hasWildcard bool
tokenizedSubject []string
}

type subjectFilters []*subjectFilter
Expand Down Expand Up @@ -936,8 +937,9 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
subjects := gatherSubjectFilters(o.cfg.FilterSubject, o.cfg.FilterSubjects)
for _, filter := range subjects {
sub := &subjectFilter{
subject: filter,
hasWildcard: subjectHasWildcard(filter),
subject: filter,
hasWildcard: subjectHasWildcard(filter),
tokenizedSubject: tokenizeSubjectIntoSlice(nil, filter),
}
o.subjf = append(o.subjf, sub)
}
Expand Down Expand Up @@ -1858,8 +1860,9 @@ func (o *consumer) updateConfig(cfg *ConsumerConfig) error {
newSubjf := make(subjectFilters, 0, len(newSubjects))
for _, newFilter := range newSubjects {
fs := &subjectFilter{
subject: newFilter,
hasWildcard: subjectHasWildcard(newFilter),
subject: newFilter,
hasWildcard: subjectHasWildcard(newFilter),
tokenizedSubject: tokenizeSubjectIntoSlice(nil, newFilter),
}
// If given subject was present, we will retain its fields values
// so `getNextMgs` can take advantage of already buffered `pmsgs`.
Expand Down Expand Up @@ -3361,9 +3364,10 @@ func (o *consumer) isFilteredMatch(subj string) bool {
}
// It's quicker to first check for non-wildcard filters, then
// iterate again to check for subset match.
// TODO(dlc) at speed might be better to just do a sublist with L2 and/or possibly L1.
tsa := [32]string{}
tts := tokenizeSubjectIntoSlice(tsa[:0], subj)
for _, filter := range o.subjf {
if subjectIsSubsetMatch(subj, filter.subject) {
if isSubsetMatchTokenized(tts, filter.tokenizedSubject) {
return true
}
}
Expand Down Expand Up @@ -3945,8 +3949,10 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
}
} else {
if o.subjf != nil {
tsa := [32]string{}
tts := tokenizeSubjectIntoSlice(tsa[:0], pmsg.subj)
for i, filter := range o.subjf {
if subjectIsSubsetMatch(pmsg.subj, filter.subject) {
if isSubsetMatchTokenized(tts, filter.tokenizedSubject) {
o.subjf[i].currentSeq--
o.subjf[i].nextSeq--
break
Expand Down
32 changes: 16 additions & 16 deletions server/filestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1439,7 +1439,7 @@ func TestFileStoreMeta(t *testing.T) {
oname := "obs22"
obs, err := fs.ConsumerStore(oname, &oconfig)
if err != nil {
t.Fatalf("Unexepected error: %v", err)
t.Fatalf("Unexpected error: %v", err)
}

ometafile := filepath.Join(fcfg.StoreDir, consumerDir, oname, JetStreamMetaFile)
Expand Down Expand Up @@ -1740,11 +1740,11 @@ func TestFileStoreSnapshot(t *testing.T) {
// Create a few consumers.
o1, err := fs.ConsumerStore("o22", &ConsumerConfig{})
if err != nil {
t.Fatalf("Unexepected error: %v", err)
t.Fatalf("Unexpected error: %v", err)
}
o2, err := fs.ConsumerStore("o33", &ConsumerConfig{})
if err != nil {
t.Fatalf("Unexepected error: %v", err)
t.Fatalf("Unexpected error: %v", err)
}
state := &ConsumerState{}
state.Delivered.Consumer = 100
Expand All @@ -1753,13 +1753,13 @@ func TestFileStoreSnapshot(t *testing.T) {
state.AckFloor.Stream = 22

if err := o1.Update(state); err != nil {
t.Fatalf("Unexepected error updating state: %v", err)
t.Fatalf("Unexpected error updating state: %v", err)
}
state.AckFloor.Consumer = 33
state.AckFloor.Stream = 33

if err := o2.Update(state); err != nil {
t.Fatalf("Unexepected error updating state: %v", err)
t.Fatalf("Unexpected error updating state: %v", err)
}

snapshot := func() []byte {
Expand Down Expand Up @@ -1908,7 +1908,7 @@ func TestFileStoreConsumer(t *testing.T) {

o, err := fs.ConsumerStore("obs22", &ConsumerConfig{})
if err != nil {
t.Fatalf("Unexepected error: %v", err)
t.Fatalf("Unexpected error: %v", err)
}
if state, err := o.State(); err != nil || state.Delivered.Consumer != 0 {
t.Fatalf("Unexpected state or error: %v", err)
Expand All @@ -1919,11 +1919,11 @@ func TestFileStoreConsumer(t *testing.T) {
updateAndCheck := func() {
t.Helper()
if err := o.Update(state); err != nil {
t.Fatalf("Unexepected error updating state: %v", err)
t.Fatalf("Unexpected error updating state: %v", err)
}
s2, err := o.State()
if err != nil {
t.Fatalf("Unexepected error getting state: %v", err)
t.Fatalf("Unexpected error getting state: %v", err)
}
if !reflect.DeepEqual(state, s2) {
t.Fatalf("State is not the same: wanted %+v got %+v", state, s2)
Expand Down Expand Up @@ -2435,7 +2435,7 @@ func TestFileStoreConsumerRedeliveredLost(t *testing.T) {
cfg := &ConsumerConfig{AckPolicy: AckExplicit}
o, err := fs.ConsumerStore("o22", cfg)
if err != nil {
t.Fatalf("Unexepected error: %v", err)
t.Fatalf("Unexpected error: %v", err)
}

restartConsumer := func() {
Expand All @@ -2444,12 +2444,12 @@ func TestFileStoreConsumerRedeliveredLost(t *testing.T) {
time.Sleep(20 * time.Millisecond) // Wait for all things to settle.
o, err = fs.ConsumerStore("o22", cfg)
if err != nil {
t.Fatalf("Unexepected error: %v", err)
t.Fatalf("Unexpected error: %v", err)
}
// Make sure we recovered Redelivered.
state, err := o.State()
if err != nil {
t.Fatalf("Unexepected error: %v", err)
t.Fatalf("Unexpected error: %v", err)
}
if state == nil {
t.Fatalf("Did not recover state")
Expand Down Expand Up @@ -2500,7 +2500,7 @@ func TestFileStoreConsumerFlusher(t *testing.T) {

o, err := fs.ConsumerStore("o22", &ConsumerConfig{})
if err != nil {
t.Fatalf("Unexepected error: %v", err)
t.Fatalf("Unexpected error: %v", err)
}
// Get the underlying impl.
oc := o.(*consumerFileStore)
Expand Down Expand Up @@ -2532,7 +2532,7 @@ func TestFileStoreConsumerDeliveredUpdates(t *testing.T) {
// Simple consumer, no ack policy configured.
o, err := fs.ConsumerStore("o22", &ConsumerConfig{})
if err != nil {
t.Fatalf("Unexepected error: %v", err)
t.Fatalf("Unexpected error: %v", err)
}
defer o.Stop()

Expand Down Expand Up @@ -2586,7 +2586,7 @@ func TestFileStoreConsumerDeliveredAndAckUpdates(t *testing.T) {
// Simple consumer, no ack policy configured.
o, err := fs.ConsumerStore("o22", &ConsumerConfig{AckPolicy: AckExplicit})
if err != nil {
t.Fatalf("Unexepected error: %v", err)
t.Fatalf("Unexpected error: %v", err)
}
defer o.Stop()

Expand Down Expand Up @@ -2676,7 +2676,7 @@ func TestFileStoreConsumerDeliveredAndAckUpdates(t *testing.T) {

o, err = fs.ConsumerStore("o22", &ConsumerConfig{AckPolicy: AckExplicit})
if err != nil {
t.Fatalf("Unexepected error: %v", err)
t.Fatalf("Unexpected error: %v", err)
}
defer o.Stop()

Expand Down Expand Up @@ -2778,7 +2778,7 @@ func TestFileStoreConsumerPerf(t *testing.T) {

o, err := fs.ConsumerStore("o22", &ConsumerConfig{AckPolicy: AckExplicit})
if err != nil {
t.Fatalf("Unexepected error: %v", err)
t.Fatalf("Unexpected error: %v", err)
}
// Get the underlying impl.
oc := o.(*consumerFileStore)
Expand Down
37 changes: 17 additions & 20 deletions server/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -2086,7 +2086,7 @@ func (jsa *jsAccount) storageTotals() (uint64, uint64) {
return mem, store
}

func (jsa *jsAccount) limitsExceeded(storeType StorageType, tierName string) (bool, *ApiError) {
func (jsa *jsAccount) limitsExceeded(storeType StorageType, tierName string, replicas int) (bool, *ApiError) {
jsa.usageMu.RLock()
defer jsa.usageMu.RUnlock()

Expand All @@ -2099,20 +2099,25 @@ func (jsa *jsAccount) limitsExceeded(storeType StorageType, tierName string) (bo
// Imply totals of 0
return false, nil
}
r := int64(replicas)
if r < 1 || tierName == _EMPTY_ {
r = 1
}
// Since tiers are flat we need to scale limit up by replicas when checking.
if storeType == MemoryStorage {
totalMem := inUse.total.mem
if selectedLimits.MemoryMaxStreamBytes > 0 && totalMem > selectedLimits.MemoryMaxStreamBytes {
if selectedLimits.MemoryMaxStreamBytes > 0 && totalMem > selectedLimits.MemoryMaxStreamBytes*r {
return true, nil
}
if selectedLimits.MaxMemory >= 0 && totalMem > selectedLimits.MaxMemory {
if selectedLimits.MaxMemory >= 0 && totalMem > selectedLimits.MaxMemory*r {
return true, nil
}
} else {
totalStore := inUse.total.store
if selectedLimits.StoreMaxStreamBytes > 0 && totalStore > selectedLimits.StoreMaxStreamBytes {
if selectedLimits.StoreMaxStreamBytes > 0 && totalStore > selectedLimits.StoreMaxStreamBytes*r {
return true, nil
}
if selectedLimits.MaxStore >= 0 && totalStore > selectedLimits.MaxStore {
if selectedLimits.MaxStore >= 0 && totalStore > selectedLimits.MaxStore*r {
return true, nil
}
}
Expand Down Expand Up @@ -2141,39 +2146,31 @@ func (js *jetStream) checkLimits(selected *JetStreamAccountLimits, config *Strea
}
// stream limit is checked separately on stream create only!
// Check storage, memory or disk.
return js.checkBytesLimits(selected, config.MaxBytes, config.Storage, config.Replicas, checkServer, currentRes, maxBytesOffset)
return js.checkBytesLimits(selected, config.MaxBytes, config.Storage, checkServer, currentRes, maxBytesOffset)
}

// Check if additional bytes will exceed our account limits and optionally the server itself.
// This should account for replicas.
// Read Lock should be held.
func (js *jetStream) checkBytesLimits(selectedLimits *JetStreamAccountLimits, addBytes int64, storage StorageType, replicas int, checkServer bool, currentRes, maxBytesOffset int64) error {
if replicas < 1 {
replicas = 1
}
func (js *jetStream) checkBytesLimits(selectedLimits *JetStreamAccountLimits, addBytes int64, storage StorageType, checkServer bool, currentRes, maxBytesOffset int64) error {
if addBytes < 0 {
addBytes = 1
}
totalBytes := (addBytes * int64(replicas)) + maxBytesOffset
totalBytes := addBytes + maxBytesOffset

switch storage {
case MemoryStorage:
// Account limits defined.
if selectedLimits.MaxMemory >= 0 {
if currentRes+totalBytes > selectedLimits.MaxMemory {
return NewJSMemoryResourcesExceededError()
}
if selectedLimits.MaxMemory >= 0 && currentRes+totalBytes > selectedLimits.MaxMemory {
return NewJSMemoryResourcesExceededError()
}
// Check if this server can handle request.
if checkServer && js.memReserved+addBytes > js.config.MaxMemory {
return NewJSMemoryResourcesExceededError()
}
case FileStorage:
// Account limits defined.
if selectedLimits.MaxStore >= 0 {
if currentRes+totalBytes > selectedLimits.MaxStore {
return NewJSStorageResourcesExceededError()
}
if selectedLimits.MaxStore >= 0 && currentRes+totalBytes > selectedLimits.MaxStore {
return NewJSStorageResourcesExceededError()
}
// Check if this server can handle request.
if checkServer && js.storeReserved+addBytes > js.config.MaxStore {
Expand Down
Loading

0 comments on commit 064b7b2

Please sign in to comment.