[IMPROVED] Check stream state performance #5963

Merged · 5 commits · Oct 8, 2024
71 changes: 39 additions & 32 deletions server/consumer.go
@@ -326,6 +326,7 @@ type consumer struct {
dseq uint64 // delivered consumer sequence
adflr uint64 // ack delivery floor
asflr uint64 // ack store floor
chkflr uint64 // our check floor, interest streams only.
npc int64 // Num Pending Count
npf uint64 // Num Pending Floor Sequence
dsubj string
@@ -3033,28 +3034,6 @@ func (o *consumer) isFiltered() bool {
return false
}

// Check if we would have matched and needed an ack for this store seq.
// This is called for interest based retention streams to remove messages.
func (o *consumer) matchAck(sseq uint64) bool {
o.mu.RLock()
defer o.mu.RUnlock()

// Check if we are filtered, and if so check if this is even applicable to us.
if o.isFiltered() {
if o.mset == nil {
return false
}
var svp StoreMsg
if _, err := o.mset.store.LoadMsg(sseq, &svp); err != nil {
return false
}
if !o.isFilteredMatch(svp.subj) {
return false
}
}
return true
}

// Check if we need an ack for this store seq.
// This is called for interest based retention streams to remove messages.
func (o *consumer) needAck(sseq uint64, subj string) bool {
@@ -5637,16 +5616,24 @@ func (o *consumer) isMonitorRunning() bool {
var errAckFloorHigherThanLastSeq = errors.New("consumer ack floor is higher than streams last sequence")

// If we are a consumer of an interest or workqueue policy stream, process that state and make sure consistent.
func (o *consumer) checkStateForInterestStream() error {
func (o *consumer) checkStateForInterestStream(ss *StreamState) error {
o.mu.RLock()
// See if we need to process this update if our parent stream is not a limits policy stream.
mset := o.mset
shouldProcessState := mset != nil && o.retention != LimitsPolicy
if o.closed || !shouldProcessState || o.store == nil {
if o.closed || !shouldProcessState || o.store == nil || ss == nil {
o.mu.RUnlock()
return nil
}
store := mset.store
state, err := o.store.State()

filters, subjf, filter := o.filters, o.subjf, _EMPTY_
var wc bool
if filters == nil && subjf != nil {
filter, wc = subjf[0].subject, subjf[0].hasWildcard
}
chkfloor := o.chkflr
o.mu.RUnlock()

if err != nil {
@@ -5659,26 +5646,46 @@ func (o *consumer) checkStateForInterestStream() error {
return nil
}

// We should make sure to update the acks.
var ss StreamState
mset.store.FastState(&ss)

// Check if the underlying stream's last sequence is less than our floor.
// This can happen if the stream has been reset and has not caught up yet.
if asflr > ss.LastSeq {
return errAckFloorHigherThanLastSeq
}

for seq := ss.FirstSeq; asflr > 0 && seq <= asflr; seq++ {
if o.matchAck(seq) {
var smv StoreMsg
var seq, nseq uint64
// Start at first stream seq or a previous check floor, whichever is higher.
// Note this will really help for interest retention; with WQ the loadNextMsg
// gets us a long way already since it will skip deleted msgs not for our filter.
fseq := ss.FirstSeq
if chkfloor > fseq {
fseq = chkfloor
}

for seq = fseq; asflr > 0 && seq <= asflr; seq++ {
if filters != nil {
_, nseq, err = store.LoadNextMsgMulti(filters, seq, &smv)
} else {
_, nseq, err = store.LoadNextMsg(filter, wc, seq, &smv)
}
// If we advanced the sequence, update our seq. This can happen on no error and on EOF.
if nseq > seq {
seq = nseq
}
// Only ack though if no error and seq <= ack floor.
if err == nil && seq <= asflr {
mset.ackMsg(o, seq)
}
}

o.mu.RLock()
o.mu.Lock()
// Update our check floor.
if seq > o.chkflr {
o.chkflr = seq
}
// See if we need to process this update if our parent stream is not a limits policy stream.
state, _ = o.store.State()
o.mu.RUnlock()
o.mu.Unlock()

// If we have pending, we will need to walk through to delivered in case we missed any of those acks as well.
if state != nil && len(state.Pending) > 0 && state.AckFloor.Stream > 0 {
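The signature change above is the performance fix in this file: previously each consumer called FastState on the stream store itself (the removed lines above); now the caller computes the stream state once and passes it in, and the consumer resumes scanning from its saved check floor (chkflr) instead of from the stream's first sequence. A rough sketch of the resulting call-site shape; the names here are illustrative, since the actual call sites are outside this diff:

```go
// Hypothetical call-site shape after the signature change: compute the
// stream state once with FastState, then share it across all consumers.
var ss StreamState
mset.store.FastState(&ss)
for _, o := range consumers { // consumers: illustrative slice of *consumer
	// Returns errAckFloorHigherThanLastSeq if the stream was reset and
	// its last sequence still trails the consumer's ack floor.
	if err := o.checkStateForInterestStream(&ss); err != nil {
		return err
	}
}
```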
30 changes: 19 additions & 11 deletions server/filestore.go
@@ -2586,7 +2586,7 @@ func (fs *fileStore) numFilteredPendingNoLast(filter string, ss *SimpleState) {

// Optimized way for getting all num pending matching a filter subject.
// Optionally look up last sequence. Sometimes do not need last and this avoids cost.
// Lock should be held.
// Read lock should be held.
func (fs *fileStore) numFilteredPendingWithLast(filter string, last bool, ss *SimpleState) {
isAll := filter == _EMPTY_ || filter == fwcs

@@ -2653,17 +2653,25 @@ func (fs *fileStore) numFilteredPendingWithLast(filter string, last bool, ss *SimpleState) {
}
}
// Update fblk since fblk was outdated.
if !wc {
if info, ok := fs.psim.Find(stringToBytes(filter)); ok {
info.fblk = i
}
} else {
fs.psim.Match(stringToBytes(filter), func(subj []byte, psi *psi) {
if i > psi.fblk {
psi.fblk = i
// We only require the read lock here, as that is desirable,
// so we need to do this in a go routine to acquire the write lock.
go func() {
Member: Do we need to guard against accidentally creating multiple of these goroutines for the same store?

Member Author: I don't think so, but I was thinking the same thing. We essentially have three options.

  1. Promote to a write lock at call sites, but I want LoadNextMsg() to be able to operate in parallel.
  2. Do not do any fixups to stale fblks in this function.
  3. The proposal above (we could modify it to funnel through a single Go routine, but this should not be common).

Member: I'm wondering if a simple CompareAndSwap and a deferred clear would be enough to ensure only one fixup runs at a time for a given store. I'd be worried that we could end up with multiple of these doing the same work at the same time, which could compound the issue.

Member Author: It could be for a different set of PSIM entries though, so a simple boolean state would not suffice IMO.

Member: Good point, we could go with this for now and keep an eye on the goroutines. It may not be an issue.

Member Author: I think the worst case is that we could duplicate work, but not invalidate state. Hence the checks to only move it forward.
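A minimal sketch of the CompareAndSwap guard discussed in this thread, as an editor's illustration of the alternative rather than code from the PR. Here fixupRunning is a hypothetical atomic.Bool field added to fileStore:

```go
// Hypothetical guard ensuring only one fixup goroutine runs per store
// at a time. fixupRunning is an assumed atomic.Bool field on fileStore.
if fs.fixupRunning.CompareAndSwap(false, true) {
	go func() {
		defer fs.fixupRunning.Store(false)
		fs.mu.Lock()
		defer fs.mu.Unlock()
		// ... apply the same forward-only fblk updates as below ...
	}()
}
```

As the thread notes, a concurrent caller may have computed fixups for a different set of PSIM entries, and this guard would silently drop them; the merged code instead tolerates duplicate goroutines and stays correct because updates only ever move fblk forward.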

fs.mu.Lock()
defer fs.mu.Unlock()
if !wc {
if info, ok := fs.psim.Find(stringToBytes(filter)); ok {
if i > info.fblk {
info.fblk = i
}
}
})
}
} else {
fs.psim.Match(stringToBytes(filter), func(subj []byte, psi *psi) {
if i > psi.fblk {
psi.fblk = i
}
})
}
}()
}
// Now gather last sequence if asked to do so.
if last {
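The updated lock comment above is the heart of this change: the scan itself now only ever needs the read lock, so callers such as LoadNextMsg can proceed in parallel, and any stale-fblk correction is handed off to the goroutine. A short sketch of the calling convention, assuming the caller takes the read lock as the comment requires:

```go
// Caller holds only the read lock for the scan; the spawned goroutine
// applies any fblk correction later under the write lock.
fs.mu.RLock()
var ss SimpleState
fs.numFilteredPendingWithLast("foo.*", true, &ss)
fs.mu.RUnlock()
// ss.Msgs, ss.First and ss.Last now describe pending msgs for the filter.
```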
109 changes: 72 additions & 37 deletions server/filestore_test.go
@@ -7143,14 +7143,21 @@ func TestFileStoreFilteredPendingPSIMFirstBlockUpdate(t *testing.T) {
require_Equal(t, ss.First, 1002)
require_Equal(t, ss.Last, 1003)

// Check psi was updated.
fs.mu.RLock()
psi, ok = fs.psim.Find([]byte("foo.baz"))
fs.mu.RUnlock()
require_True(t, ok)
require_Equal(t, psi.total, 2)
require_Equal(t, psi.fblk, 84)
require_Equal(t, psi.lblk, 84)
// Check psi was updated. This is now done in a separate go routine
// to acquire the write lock.
checkFor(t, time.Second, 100*time.Millisecond, func() error {
fs.mu.RLock()
psi, ok = fs.psim.Find([]byte("foo.baz"))
total, fblk, lblk := psi.total, psi.fblk, psi.lblk
fs.mu.RUnlock()
require_True(t, ok)
require_Equal(t, total, 2)
require_Equal(t, lblk, 84)
if fblk != 84 {
return fmt.Errorf("fblk should be 84, still %d", fblk)
}
return nil
})
}

@@ -7187,19 +7194,21 @@ func TestFileStoreWildcardFilteredPendingPSIMFirstBlockUpdate(t *testing.T) {
require_Equal(t, fs.numMsgBlocks(), 92)
fs.mu.RLock()
psi, ok := fs.psim.Find([]byte("foo.22.baz"))
total, fblk, lblk := psi.total, psi.fblk, psi.lblk
fs.mu.RUnlock()
require_True(t, ok)
require_Equal(t, psi.total, 2)
require_Equal(t, psi.fblk, 1)
require_Equal(t, psi.lblk, 92)
require_Equal(t, total, 2)
require_Equal(t, fblk, 1)
require_Equal(t, lblk, 92)

fs.mu.RLock()
psi, ok = fs.psim.Find([]byte("foo.22.bar"))
total, fblk, lblk = psi.total, psi.fblk, psi.lblk
fs.mu.RUnlock()
require_True(t, ok)
require_Equal(t, psi.total, 2)
require_Equal(t, psi.fblk, 1)
require_Equal(t, psi.lblk, 92)
require_Equal(t, total, 2)
require_Equal(t, fblk, 1)
require_Equal(t, lblk, 92)

// Now make sure that a call to numFilteredPending, which will initially walk all blocks if starting from seq 1, updates psi.
var ss SimpleState
Expand All @@ -7211,21 +7220,33 @@ func TestFileStoreWildcardFilteredPendingPSIMFirstBlockUpdate(t *testing.T) {
require_Equal(t, ss.Last, 1006)

// Check both psi were updated.
fs.mu.RLock()
psi, ok = fs.psim.Find([]byte("foo.22.baz"))
fs.mu.RUnlock()
require_True(t, ok)
require_Equal(t, psi.total, 2)
require_Equal(t, psi.fblk, 92)
require_Equal(t, psi.lblk, 92)
checkFor(t, time.Second, 100*time.Millisecond, func() error {
fs.mu.RLock()
psi, ok = fs.psim.Find([]byte("foo.22.baz"))
total, fblk, lblk = psi.total, psi.fblk, psi.lblk
fs.mu.RUnlock()
require_True(t, ok)
require_Equal(t, total, 2)
require_Equal(t, lblk, 92)
if fblk != 92 {
return fmt.Errorf("fblk should be 92, still %d", fblk)
}
return nil
})

fs.mu.RLock()
psi, ok = fs.psim.Find([]byte("foo.22.bar"))
fs.mu.RUnlock()
require_True(t, ok)
require_Equal(t, psi.total, 2)
require_Equal(t, psi.fblk, 92)
require_Equal(t, psi.lblk, 92)
checkFor(t, time.Second, 100*time.Millisecond, func() error {
fs.mu.RLock()
psi, ok = fs.psim.Find([]byte("foo.22.bar"))
total, fblk, lblk = psi.total, psi.fblk, psi.lblk
fs.mu.RUnlock()
require_True(t, ok)
require_Equal(t, total, 2)
require_Equal(t, lblk, 92)
if fblk != 92 {
return fmt.Errorf("fblk should be 92, still %d", fblk)
}
return nil
})
}

// Make sure if we only miss by one for fblk that we still update it.
Expand All @@ -7248,10 +7269,14 @@ func TestFileStoreFilteredPendingPSIMFirstBlockUpdateNextBlock(t *testing.T) {
fetch := func(subj string) *psi {
t.Helper()
fs.mu.RLock()
var info psi
psi, ok := fs.psim.Find([]byte(subj))
if ok && psi != nil {
info = *psi
}
fs.mu.RUnlock()
require_True(t, ok)
return psi
return &info
}

psi := fetch("foo.22.bar")
@@ -7274,10 +7299,15 @@ func TestFileStoreFilteredPendingPSIMFirstBlockUpdateNextBlock(t *testing.T) {
require_Equal(t, ss.Last, 7)

// Now make sure that we properly updated the psim entry.
psi = fetch("foo.22.bar")
require_Equal(t, psi.total, 3)
require_Equal(t, psi.fblk, 2)
require_Equal(t, psi.lblk, 4)
checkFor(t, time.Second, 100*time.Millisecond, func() error {
psi = fetch("foo.22.bar")
require_Equal(t, psi.total, 3)
require_Equal(t, psi.lblk, 4)
if psi.fblk != 2 {
return fmt.Errorf("fblk should be 2, still %d", psi.fblk)
}
return nil
})

// Now make sure that wildcard calls also update blks.
// First remove first "foo.22.baz" which will remove first block.
Expand All @@ -7300,10 +7330,15 @@ func TestFileStoreFilteredPendingPSIMFirstBlockUpdateNextBlock(t *testing.T) {
require_Equal(t, ss.First, 4)
require_Equal(t, ss.Last, 8)

psi = fetch("foo.22.baz")
require_Equal(t, psi.total, 3)
require_Equal(t, psi.fblk, 2)
require_Equal(t, psi.lblk, 4)
checkFor(t, time.Second, 100*time.Millisecond, func() error {
psi = fetch("foo.22.baz")
require_Equal(t, psi.total, 3)
require_Equal(t, psi.lblk, 4)
if psi.fblk != 2 {
return fmt.Errorf("fblk should be 2, still %d", psi.fblk)
}
return nil
})
}

func TestFileStoreLargeSparseMsgsDoNotLoadAfterLast(t *testing.T) {