From e18a2781969fb783f40b2088cfdc7769988d7f4a Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 9 Sep 2021 11:32:52 -0700 Subject: [PATCH 1/2] Added ability to check if no message exists as a test to store the message. Signed-off-by: Derek Collison --- server/jetstream_test.go | 49 ++++++++++++++++++++++++++++++++++++++++ server/stream.go | 12 ++++++---- 2 files changed, 57 insertions(+), 4 deletions(-) diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 39ad4c836a7..cdfd1acd17f 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -12973,6 +12973,55 @@ func TestJetStreamPerSubjectPending(t *testing.T) { } } +func TestJetStreamPublishExpectNoMsg(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + config := s.JetStreamConfig() + if config != nil { + defer removeDir(t, config.StoreDir) + } + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "KV", + Subjects: []string{"KV.>"}, + MaxMsgsPerSubject: 5, + }) + if err != nil { + t.Fatalf("add stream failed: %s", err) + } + + if _, err = js.Publish("KV.22", []byte("hello world")); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // This should succeed. + m := nats.NewMsg("KV.33") + m.Header.Set(JSExpectedLastSubjSeq, "0") + if _, err := js.PublishMsg(m); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // This should fail. + m = nats.NewMsg("KV.22") + m.Header.Set(JSExpectedLastSubjSeq, "0") + if _, err := js.PublishMsg(m); err == nil { + t.Fatalf("Expected error: %v", err) + } + + if err := js.PurgeStream("KV"); err != nil { + t.Fatalf("Unexpected purge error: %v", err) + } + + // This should succeed now. + if _, err := js.PublishMsg(m); err != nil { + t.Fatalf("Unexpected error: %v", err) + } +} + /////////////////////////////////////////////////////////////////////////// // Simple JetStream Benchmarks /////////////////////////////////////////////////////////////////////////// diff --git a/server/stream.go b/server/stream.go index fee0615c185..da1cc17b90b 100644 --- a/server/stream.go +++ b/server/stream.go @@ -2465,12 +2465,12 @@ func getExpectedLastSeq(hdr []byte) uint64 { } // Fast lookup of expected stream sequence per subject. -func getExpectedLastSeqPerSubject(hdr []byte) uint64 { +func getExpectedLastSeqPerSubject(hdr []byte) (uint64, bool) { bseq := getHeader(JSExpectedLastSubjSeq, hdr) if len(bseq) == 0 { - return 0 + return 0, false } - return uint64(parseInt64(bseq)) + return uint64(parseInt64(bseq)), true } // Lock should be held. @@ -2686,9 +2686,13 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, return fmt.Errorf("last msgid mismatch: %q vs %q", lmsgId, last) } // Expected last sequence per subject. - if seq := getExpectedLastSeqPerSubject(hdr); seq > 0 { + if seq, exists := getExpectedLastSeqPerSubject(hdr); exists { // TODO(dlc) - We could make a new store func that does this all in one. _, lseq, _, _, _, err := mset.store.LoadLastMsg(subject) + // If seq passed in is zero that signals we expect no msg to be present. + if err == ErrStoreMsgNotFound && seq == 0 { + lseq, err = 0, nil + } if err != nil || lseq != seq { mset.clfs++ mset.mu.Unlock() From c56c5acd644efdef9719c52a901f48bad958a185 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 9 Sep 2021 12:23:00 -0700 Subject: [PATCH 2/2] Only supply expected last header is seq != 0 Signed-off-by: Derek Collison --- server/mqtt.go | 26 +++++++++++++++----------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/server/mqtt.go b/server/mqtt.go index 1b62bbcbc44..9f6e9d43773 100644 --- a/server/mqtt.go +++ b/server/mqtt.go @@ -2139,17 +2139,21 @@ func (sess *mqttSession) save() error { seq := sess.seq sess.mu.Unlock() - bb := bytes.Buffer{} - bb.WriteString(hdrLine) - bb.WriteString(JSExpectedLastSubjSeq) - bb.WriteString(":") - bb.WriteString(strconv.FormatInt(int64(seq), 10)) - bb.WriteString(CR_LF) - bb.WriteString(CR_LF) - hdr := bb.Len() - bb.Write(b) - - resp, err := sess.jsa.storeMsgWithKind(mqttJSASessPersist, subject, hdr, bb.Bytes()) + var hdr int + if seq != 0 { + bb := bytes.Buffer{} + bb.WriteString(hdrLine) + bb.WriteString(JSExpectedLastSubjSeq) + bb.WriteString(":") + bb.WriteString(strconv.FormatInt(int64(seq), 10)) + bb.WriteString(CR_LF) + bb.WriteString(CR_LF) + hdr = bb.Len() + bb.Write(b) + b = bb.Bytes() + } + + resp, err := sess.jsa.storeMsgWithKind(mqttJSASessPersist, subject, hdr, b) if err != nil { return err }