Skip to content

Commit

Permalink
Cherry-picks for 2.10.25-RC.3 (#6384)
Browse files Browse the repository at this point in the history
Includes the following:

- #6373
- #6377
- #6379
- #6390
- #6392
- #6387
- #6292

Signed-off-by: Neil Twigg <[email protected]>
  • Loading branch information
neilalexander authored Jan 22, 2025
2 parents 690079b + 8e94003 commit d198cac
Show file tree
Hide file tree
Showing 14 changed files with 707 additions and 59 deletions.
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ language: go
go:
# This should be quoted or use .x, but should not be unquoted.
# Remember that a YAML bare float drops trailing zeroes.
- "1.23.4"
- "1.22.10"
- "1.23.5"
- "1.22.11"

go_import_path: github.com/nats-io/nats-server

Expand Down
29 changes: 18 additions & 11 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1373,6 +1373,8 @@ func (o *consumer) setLeader(isLeader bool) {
// If we were the leader make sure to drain queued up acks.
if wasLeader {
o.ackMsgs.drain()
// Reset amount of acks that need to be processed.
atomic.StoreInt64(&o.awl, 0)
// Also remove any pending replies since we should not be the one to respond at this point.
o.replies = nil
}
Expand Down Expand Up @@ -2860,7 +2862,8 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b
// no-op
if dseq <= o.adflr || sseq <= o.asflr {
o.mu.Unlock()
return ackInPlace
// Return true to let caller respond back to the client.
return true
}
if o.maxp > 0 && len(o.pending) >= o.maxp {
needSignal = true
Expand Down Expand Up @@ -3600,17 +3603,21 @@ func (o *consumer) getNextMsg() (*jsPubMsg, uint64, error) {
}
continue
}
if seq > 0 {
pmsg := getJSPubMsgFromPool()
sm, err := o.mset.store.LoadMsg(seq, &pmsg.StoreMsg)
if sm == nil || err != nil {
pmsg.returnToPool()
pmsg, dc = nil, 0
// Adjust back deliver count.
o.decDeliveryCount(seq)
}
return pmsg, dc, err
pmsg := getJSPubMsgFromPool()
sm, err := o.mset.store.LoadMsg(seq, &pmsg.StoreMsg)
if sm == nil || err != nil {
pmsg.returnToPool()
pmsg, dc = nil, 0
// Adjust back deliver count.
o.decDeliveryCount(seq)
}
// Message was scheduled for redelivery but was removed in the meantime.
if err == ErrStoreMsgNotFound || err == errDeletedMsg {
delete(o.pending, seq)
delete(o.rdc, seq)
continue
}
return pmsg, dc, err
}
}

Expand Down
71 changes: 53 additions & 18 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,10 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim

// Do age checks too, make sure to call in place.
if fs.cfg.MaxAge != 0 {
fs.expireMsgsOnRecover()
err := fs.expireMsgsOnRecover()
if isPermissionError(err) {
return nil, err
}
fs.startAgeChk()
}

Expand Down Expand Up @@ -1978,9 +1981,9 @@ func (fs *fileStore) recoverMsgs() error {
// We will treat this differently in case we have a recovery
// that will expire a lot of messages on startup.
// Should only be called on startup.
func (fs *fileStore) expireMsgsOnRecover() {
func (fs *fileStore) expireMsgsOnRecover() error {
if fs.state.Msgs == 0 {
return
return nil
}

var minAge = time.Now().UnixNano() - int64(fs.cfg.MaxAge)
Expand All @@ -1992,7 +1995,7 @@ func (fs *fileStore) expireMsgsOnRecover() {
// usually taken care of by fs.removeMsgBlock() but we do not call that here.
var last msgId

deleteEmptyBlock := func(mb *msgBlock) {
deleteEmptyBlock := func(mb *msgBlock) error {
// If we are the last keep state to remember first/last sequence.
// Do this part by hand since not deleting one by one.
if mb == fs.lmb {
Expand All @@ -2008,8 +2011,12 @@ func (fs *fileStore) expireMsgsOnRecover() {
}
return true
})
mb.dirtyCloseWithRemove(true)
err := mb.dirtyCloseWithRemove(true)
if isPermissionError(err) {
return err
}
deleted++
return nil
}

for _, mb := range fs.blks {
Expand All @@ -2023,8 +2030,11 @@ func (fs *fileStore) expireMsgsOnRecover() {
if mb.last.ts <= minAge {
purged += mb.msgs
bytes += mb.bytes
deleteEmptyBlock(mb)
err := deleteEmptyBlock(mb)
mb.mu.Unlock()
if isPermissionError(err) {
return err
}
continue
}

Expand Down Expand Up @@ -2148,6 +2158,7 @@ func (fs *fileStore) expireMsgsOnRecover() {
if purged > 0 {
fs.dirty++
}
return nil
}

func copyMsgBlocks(src []*msgBlock) []*msgBlock {
Expand Down Expand Up @@ -3463,6 +3474,9 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) {
dios <- struct{}{}

if err != nil {
if isPermissionError(err) {
return nil, err
}
mb.dirtyCloseWithRemove(true)
return nil, fmt.Errorf("Error creating msg block file: %v", err)
}
Expand Down Expand Up @@ -5469,16 +5483,23 @@ func (mb *msgBlock) recompressOnDiskIfNeeded() error {
<-dios
tmpFD, err := os.OpenFile(tmpFN, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, defaultFilePerms)
dios <- struct{}{}

if err != nil {
return fmt.Errorf("failed to create temporary file: %w", err)
}

errorCleanup := func(err error) error {
tmpFD.Close()
os.Remove(tmpFN)
return err
}

// The original buffer at this point is uncompressed, so we will now compress
// it if needed. Note that if the selected algorithm is NoCompression, the
// Compress function will just return the input buffer unmodified.
cmpBuf, err := alg.Compress(origBuf)
if err != nil {
return fmt.Errorf("failed to compress block: %w", err)
return errorCleanup(fmt.Errorf("failed to compress block: %w", err))
}

// We only need to write out the metadata header if compression is enabled.
Expand All @@ -5496,19 +5517,14 @@ func (mb *msgBlock) recompressOnDiskIfNeeded() error {
if mb.bek != nil && len(cmpBuf) > 0 {
bek, err := genBlockEncryptionKey(mb.fs.fcfg.Cipher, mb.seed, mb.nonce)
if err != nil {
return err
return errorCleanup(err)
}
mb.bek = bek
mb.bek.XORKeyStream(cmpBuf, cmpBuf)
}

// Write the new block data (which might be compressed or encrypted) to the
// temporary file.
errorCleanup := func(err error) error {
tmpFD.Close()
os.Remove(tmpFN)
return err
}
if n, err := tmpFD.Write(cmpBuf); err != nil {
return errorCleanup(fmt.Errorf("failed to write to temporary file: %w", err))
} else if n != len(cmpBuf) {
Expand Down Expand Up @@ -7779,9 +7795,9 @@ func (mb *msgBlock) dirtyClose() {
}

// Should be called with lock held.
func (mb *msgBlock) dirtyCloseWithRemove(remove bool) {
func (mb *msgBlock) dirtyCloseWithRemove(remove bool) error {
if mb == nil {
return
return nil
}
// Stop cache expiration timer.
if mb.ctmr != nil {
Expand All @@ -7803,13 +7819,20 @@ func (mb *msgBlock) dirtyCloseWithRemove(remove bool) {
// Clear any tracking by subject if we are removing.
mb.fss = nil
if mb.mfn != _EMPTY_ {
os.Remove(mb.mfn)
err := os.Remove(mb.mfn)
if isPermissionError(err) {
return err
}
mb.mfn = _EMPTY_
}
if mb.kfn != _EMPTY_ {
os.Remove(mb.kfn)
err := os.Remove(mb.kfn)
if isPermissionError(err) {
return err
}
}
}
return nil
}

// Remove a seq from the fss and select new first.
Expand Down Expand Up @@ -8222,7 +8245,15 @@ func (fs *fileStore) flushStreamStateLoop(qch, done chan struct{}) {
for {
select {
case <-t.C:
fs.writeFullState()
err := fs.writeFullState()
if isPermissionError(err) && fs.srv != nil {
fs.warn("File system permission denied when flushing stream state, disabling JetStream: %v", err)
// Messages in the block cache could be lost in the worst case.
// In clustered mode this is highly unlikely, as a result of replication.
fs.srv.DisableJetStream()
return
}

case <-qch:
return
}
Expand Down Expand Up @@ -8430,7 +8461,11 @@ func (fs *fileStore) _writeFullState(force bool) error {
// Protect with dios.
<-dios
err := os.WriteFile(fn, buf, defaultFilePerms)
// If the file system is not writable, isPermissionError(err) will return true.
dios <- struct{}{}
if isPermissionError(err) {
return err
}

// Update dirty if successful.
if err == nil {
Expand Down
81 changes: 80 additions & 1 deletion server/filestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"errors"
"fmt"
"io"
"io/fs"
"math/bits"
"math/rand"
"os"
Expand Down Expand Up @@ -143,9 +144,9 @@ func TestFileStoreBasics(t *testing.T) {
func TestFileStoreMsgHeaders(t *testing.T) {
testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) {
fs, err := newFileStoreWithCreated(fcfg, StreamConfig{Name: "zzz", Storage: FileStorage}, time.Now(), prf(&fcfg), nil)

require_NoError(t, err)
defer fs.Stop()

subj, hdr, msg := "foo", []byte("name:derek"), []byte("Hello World")
elen := 22 + len(subj) + 4 + len(hdr) + len(msg) + 8
if sz := int(fileStoreMsgSize(subj, hdr, msg)); sz != elen {
Expand Down Expand Up @@ -8194,3 +8195,81 @@ func TestFileStoreNumPendingMulti(t *testing.T) {
}
require_Equal(t, total, checkTotal)
}

// TestFileStoreStoreRawMessageThrowsPermissionErrorIfFSModeReadOnly makes the
// store directory tree read-only (0o555) and then stores messages until
// StoreMsg reports a failure. The failure is expected to be os.ErrPermission.
func TestFileStoreStoreRawMessageThrowsPermissionErrorIfFSModeReadOnly(t *testing.T) {
	cfg := StreamConfig{Name: "zzz", Subjects: []string{"ev.1"}, Storage: FileStorage, MaxAge: 500 * time.Millisecond}
	fs, err := newFileStore(
		FileStoreConfig{StoreDir: t.TempDir()},
		cfg)
	require_NoError(t, err)
	defer fs.Stop()

	msg := bytes.Repeat([]byte("Z"), 1024)
	directory := fs.fcfg.StoreDir

	// Remember the current mode so it can be restored afterwards. A failed
	// Stat would otherwise surface as a nil dereference further down.
	originalInfo, err := os.Stat(directory)
	require_NoError(t, err)

	// Deferred after fs.Stop, so it runs before it: the directory is writable
	// again by the time the store shuts down, even if an assertion below fails.
	defer func() {
		if err := changeDirectoryPermission(directory, originalInfo.Mode()); err != nil {
			t.Errorf("restoring permissions on %q: %v", directory, err)
		}
	}()

	// If the directory cannot be made read-only the rest of the test is
	// meaningless, so fail right away instead of ignoring the result.
	require_NoError(t, changeDirectoryPermission(directory, os.FileMode(0o555)))

	const totalMsgs = 10000
	for i := 0; i < totalMsgs; i++ {
		// Keep storing until the store reports a failure.
		if _, _, err = fs.StoreMsg("ev.1", nil, msg); err != nil {
			break
		}
	}
	require_Error(t, err, os.ErrPermission)
}

// TestFileStoreWriteFullStateThrowsPermissionErrorIfFSModeReadOnly stores a
// batch of messages while the store directory is writable, makes the directory
// tree read-only (0o555) and then expects writeFullState to fail with
// os.ErrPermission.
func TestFileStoreWriteFullStateThrowsPermissionErrorIfFSModeReadOnly(t *testing.T) {
	cfg := StreamConfig{Name: "zzz", Subjects: []string{"ev.1"}, Storage: FileStorage, MaxAge: 500 * time.Millisecond}
	fs, err := newFileStore(
		FileStoreConfig{StoreDir: t.TempDir()},
		cfg)
	require_NoError(t, err)
	defer fs.Stop()

	msg := bytes.Repeat([]byte("Z"), 1024)
	directory := fs.fcfg.StoreDir

	// Remember the current mode so it can be restored afterwards. A failed
	// Stat would otherwise surface as a nil dereference further down.
	originalInfo, err := os.Stat(directory)
	require_NoError(t, err)

	const totalMsgs = 10000
	for i := 0; i < totalMsgs; i++ {
		// The directory is still writable here, so a failure is a setup
		// problem and must not be masked by the writeFullState result below.
		_, _, err = fs.StoreMsg("ev.1", nil, msg)
		require_NoError(t, err)
	}

	// Deferred after fs.Stop, so it runs before it: the directory is writable
	// again by the time the store shuts down, even if an assertion below fails.
	defer func() {
		if err := changeDirectoryPermission(directory, originalInfo.Mode()); err != nil {
			t.Errorf("restoring permissions on %q: %v", directory, err)
		}
	}()

	require_NoError(t, changeDirectoryPermission(directory, os.FileMode(0o555)))

	err = fs.writeFullState()
	require_Error(t, err, os.ErrPermission)
}

// changeDirectoryPermission walks the tree rooted at directory and applies
// mode to every entry it visits, directories and files alike. It stops at the
// first failure and returns it wrapped with the offending path.
func changeDirectoryPermission(directory string, mode fs.FileMode) error {
	chmodEntry := func(path string, info os.FileInfo, walkErr error) error {
		if walkErr != nil {
			return fmt.Errorf("error accessing path %q: %w", path, walkErr)
		}

		chmodErr := os.Chmod(path, mode)
		switch {
		case chmodErr == nil:
			return nil
		case info.IsDir():
			// Only the wording of the error depends on the entry kind.
			return fmt.Errorf("error changing directory permissions for %q: %w", path, chmodErr)
		default:
			return fmt.Errorf("error changing file permissions for %q: %w", path, chmodErr)
		}
	}
	return filepath.Walk(directory, chmodEntry)
}
7 changes: 5 additions & 2 deletions server/ipqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,14 +190,16 @@ func (q *ipQueue[T]) len() int {
}

// Empties the queue and consumes the notification signal if present.
// Returns the number of items that were drained from the queue.
// Note that a reader go routine that has been notified that there is
// something in the queue (reading from queue's `ch`) may then get
// nothing if `drain()` is invoked before the `pop()` or `popOne()`.
func (q *ipQueue[T]) drain() {
func (q *ipQueue[T]) drain() int {
if q == nil {
return
return 0
}
q.Lock()
olen := len(q.elts)
if q.elts != nil {
q.resetAndReturnToPool(&q.elts)
q.elts, q.pos = nil, 0
Expand All @@ -209,6 +211,7 @@ func (q *ipQueue[T]) drain() {
default:
}
q.Unlock()
return olen
}

// Since the length of the queue goes to 0 after a pop(), it is good to
Expand Down
11 changes: 11 additions & 0 deletions server/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -2974,3 +2974,14 @@ func fixCfgMirrorWithDedupWindow(cfg *StreamConfig) {
cfg.Duplicates = 0
}
}

// handleWritePermissionError reacts to a file system permission failure on
// write: when JetStream is currently enabled it logs an error and starts
// DisableJetStream in its own goroutine. It does nothing otherwise.
func (s *Server) handleWritePermissionError() {
	// TODO: Check if we should add s.jetStreamOOSPending in condition.
	if !s.JetStreamEnabled() {
		return
	}

	s.Errorf("File system permission denied while writing, disabling JetStream")

	// The disable call is not waited on.
	go s.DisableJetStream()

	// TODO: Send respective advisory if needed, same as in handleOutOfSpace.
}
Loading

0 comments on commit d198cac

Please sign in to comment.