diff --git a/server/jetstream_api.go b/server/jetstream_api.go index c6252c3087d..d605b98aaea 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -351,6 +351,12 @@ type JSApiStreamNamesResponse struct { const JSApiStreamNamesResponseType = "io.nats.jetstream.api.v1.stream_names_response" +type JSApiStreamListRequest struct { + ApiPagedRequest + // These are filters that can be applied to the list. + Subject string `json:"subject,omitempty"` +} + // JSApiStreamListResponse list of detailed stream information. // A nil request is valid and means all streams. type JSApiStreamListResponse struct { @@ -1541,27 +1547,38 @@ func (s *Server) jsStreamListRequest(sub *subscription, c *client, _ *Account, s } var offset int + var filter string + if !isEmptyRequest(msg) { - var req JSApiStreamNamesRequest + var req JSApiStreamListRequest if err := json.Unmarshal(msg, &req); err != nil { resp.Error = NewJSInvalidJSONError() s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) return } offset = req.Offset + if req.Subject != _EMPTY_ { + filter = req.Subject + } } // Clustered mode will invoke a scatter and gather. if s.JetStreamIsClustered() { // Need to copy these off before sending.. msg = append(msg[:0:0], msg...) - s.startGoRoutine(func() { s.jsClusteredStreamListRequest(acc, ci, offset, subject, reply, msg) }) + s.startGoRoutine(func() { s.jsClusteredStreamListRequest(acc, ci, filter, offset, subject, reply, msg) }) return } // TODO(dlc) - Maybe hold these results for large results that we expect to be paged. // TODO(dlc) - If this list is long maybe do this in a Go routine? - msets := acc.streams() + var msets []*stream + if filter == _EMPTY_ { + msets = acc.streams() + } else { + msets = acc.filteredStreams(filter) + } + sort.Slice(msets, func(i, j int) bool { return strings.Compare(msets[i].cfg.Name, msets[j].cfg.Name) < 0 }) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 4939af574e3..be52b0190b6 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -3836,7 +3836,7 @@ func (s *Server) allPeersOffline(rg *raftGroup) bool { // This will do a scatter and gather operation for all streams for this account. This is only called from metadata leader. // This will be running in a separate Go routine. -func (s *Server) jsClusteredStreamListRequest(acc *Account, ci *ClientInfo, offset int, subject, reply string, rmsg []byte) { +func (s *Server) jsClusteredStreamListRequest(acc *Account, ci *ClientInfo, filter string, offset int, subject, reply string, rmsg []byte) { defer s.grWG.Done() js, cc := s.getJetStreamCluster() @@ -3848,8 +3848,29 @@ func (s *Server) jsClusteredStreamListRequest(acc *Account, ci *ClientInfo, offs var streams []*streamAssignment for _, sa := range cc.streams[acc.Name] { - streams = append(streams, sa) + if IsNatsErr(sa.err, JSClusterNotAssignedErr) { + continue + } + + if filter != _EMPTY_ { + // These could not have subjects auto-filled in since they are raw and unprocessed. + if len(sa.Config.Subjects) == 0 { + if SubjectsCollide(filter, sa.Config.Name) { + streams = append(streams, sa) + } + } else { + for _, subj := range sa.Config.Subjects { + if SubjectsCollide(filter, subj) { + streams = append(streams, sa) + break + } + } + } + } else { + streams = append(streams, sa) + } } + // Needs to be sorted for offsets etc. if len(streams) > 1 { sort.Slice(streams, func(i, j int) bool { diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index 5ff1b9a1091..817ca34251d 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -9442,6 +9442,63 @@ func TestJetStreamClusterAccountInfoForSystemAccount(t *testing.T) { } } +func TestJetStreamListFilter(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + if config := s.JetStreamConfig(); config != nil { + defer removeDir(t, config.StoreDir) + } + + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + testList := func(t *testing.T, srv *Server, r int) { + nc, js := jsClientConnect(t, srv) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "ONE", + Subjects: []string{"one.>"}, + Replicas: r, + }) + require_NoError(t, err) + + _, err = js.AddStream(&nats.StreamConfig{ + Name: "TWO", + Subjects: []string{"two.>"}, + Replicas: r, + }) + require_NoError(t, err) + + resp, err := nc.Request(JSApiStreamList, []byte("{}"), time.Second) + require_NoError(t, err) + + list := &JSApiStreamListResponse{} + err = json.Unmarshal(resp.Data, list) + require_NoError(t, err) + + if len(list.Streams) != 2 { + t.Fatalf("Expected 2 responses got %d", len(list.Streams)) + } + + resp, err = nc.Request(JSApiStreamList, []byte(`{"subject":"two.x"}`), time.Second) + require_NoError(t, err) + list = &JSApiStreamListResponse{} + err = json.Unmarshal(resp.Data, list) + require_NoError(t, err) + if len(list.Streams) != 1 { + t.Fatalf("Expected 1 response got %d", len(list.Streams)) + } + if list.Streams[0].Config.Name != "TWO" { + t.Fatalf("Expected stream TWO in result got %#v", list.Streams[0]) + } + } + + t.Run("Single", func(t *testing.T) { testList(t, s, 1) }) + t.Run("Clustered", func(t *testing.T) { testList(t, c.randomServer(), 3) }) +} + func TestJetStreamConsumerUpdates(t *testing.T) { s := RunBasicJetStreamServer() defer s.Shutdown()