Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

allow streams api to be filtered like list api #2695

Merged
merged 1 commit into from
Nov 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 20 additions & 3 deletions server/jetstream_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,12 @@ type JSApiStreamNamesResponse struct {

const JSApiStreamNamesResponseType = "io.nats.jetstream.api.v1.stream_names_response"

type JSApiStreamListRequest struct {
ApiPagedRequest
// These are filters that can be applied to the list.
Subject string `json:"subject,omitempty"`
}

// JSApiStreamListResponse list of detailed stream information.
// A nil request is valid and means all streams.
type JSApiStreamListResponse struct {
Expand Down Expand Up @@ -1541,27 +1547,38 @@ func (s *Server) jsStreamListRequest(sub *subscription, c *client, _ *Account, s
}

var offset int
var filter string

if !isEmptyRequest(msg) {
var req JSApiStreamNamesRequest
var req JSApiStreamListRequest
if err := json.Unmarshal(msg, &req); err != nil {
resp.Error = NewJSInvalidJSONError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
offset = req.Offset
if req.Subject != _EMPTY_ {
filter = req.Subject
}
}

// Clustered mode will invoke a scatter and gather.
if s.JetStreamIsClustered() {
// Need to copy these off before sending..
msg = append(msg[:0:0], msg...)
s.startGoRoutine(func() { s.jsClusteredStreamListRequest(acc, ci, offset, subject, reply, msg) })
s.startGoRoutine(func() { s.jsClusteredStreamListRequest(acc, ci, filter, offset, subject, reply, msg) })
return
}

// TODO(dlc) - Maybe hold these results for large results that we expect to be paged.
// TODO(dlc) - If this list is long maybe do this in a Go routine?
msets := acc.streams()
var msets []*stream
if filter == _EMPTY_ {
msets = acc.streams()
} else {
msets = acc.filteredStreams(filter)
}

sort.Slice(msets, func(i, j int) bool {
return strings.Compare(msets[i].cfg.Name, msets[j].cfg.Name) < 0
})
Expand Down
25 changes: 23 additions & 2 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -3836,7 +3836,7 @@ func (s *Server) allPeersOffline(rg *raftGroup) bool {

// This will do a scatter and gather operation for all streams for this account. This is only called from metadata leader.
// This will be running in a separate Go routine.
func (s *Server) jsClusteredStreamListRequest(acc *Account, ci *ClientInfo, offset int, subject, reply string, rmsg []byte) {
func (s *Server) jsClusteredStreamListRequest(acc *Account, ci *ClientInfo, filter string, offset int, subject, reply string, rmsg []byte) {
defer s.grWG.Done()

js, cc := s.getJetStreamCluster()
Expand All @@ -3848,8 +3848,29 @@ func (s *Server) jsClusteredStreamListRequest(acc *Account, ci *ClientInfo, offs

var streams []*streamAssignment
for _, sa := range cc.streams[acc.Name] {
streams = append(streams, sa)
if IsNatsErr(sa.err, JSClusterNotAssignedErr) {
continue
}

if filter != _EMPTY_ {
// These could not have subjects auto-filled in since they are raw and unprocessed.
if len(sa.Config.Subjects) == 0 {
if SubjectsCollide(filter, sa.Config.Name) {
streams = append(streams, sa)
}
} else {
for _, subj := range sa.Config.Subjects {
if SubjectsCollide(filter, subj) {
streams = append(streams, sa)
break
}
}
}
} else {
streams = append(streams, sa)
}
}

// Needs to be sorted for offsets etc.
if len(streams) > 1 {
sort.Slice(streams, func(i, j int) bool {
Expand Down
57 changes: 57 additions & 0 deletions server/jetstream_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9442,6 +9442,63 @@ func TestJetStreamClusterAccountInfoForSystemAccount(t *testing.T) {
}
}

func TestJetStreamListFilter(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()

if config := s.JetStreamConfig(); config != nil {
defer removeDir(t, config.StoreDir)
}

c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

testList := func(t *testing.T, srv *Server, r int) {
nc, js := jsClientConnect(t, srv)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "ONE",
Subjects: []string{"one.>"},
Replicas: r,
})
require_NoError(t, err)

_, err = js.AddStream(&nats.StreamConfig{
Name: "TWO",
Subjects: []string{"two.>"},
Replicas: r,
})
require_NoError(t, err)

resp, err := nc.Request(JSApiStreamList, []byte("{}"), time.Second)
require_NoError(t, err)

list := &JSApiStreamListResponse{}
err = json.Unmarshal(resp.Data, list)
require_NoError(t, err)

if len(list.Streams) != 2 {
t.Fatalf("Expected 2 responses got %d", len(list.Streams))
}

resp, err = nc.Request(JSApiStreamList, []byte(`{"subject":"two.x"}`), time.Second)
require_NoError(t, err)
list = &JSApiStreamListResponse{}
err = json.Unmarshal(resp.Data, list)
require_NoError(t, err)
if len(list.Streams) != 1 {
t.Fatalf("Expected 1 response got %d", len(list.Streams))
}
if list.Streams[0].Config.Name != "TWO" {
t.Fatalf("Expected stream TWO in result got %#v", list.Streams[0])
}
}

t.Run("Single", func(t *testing.T) { testList(t, s, 1) })
t.Run("Clustered", func(t *testing.T) { testList(t, c.randomServer(), 3) })
}

func TestJetStreamConsumerUpdates(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
Expand Down