Skip to content

Commit

Permalink
allow streams api to be filtered like list api
Browse files Browse the repository at this point in the history
Signed-off-by: R.I.Pienaar <[email protected]>
  • Loading branch information
ripienaar committed Nov 17, 2021
1 parent 70cd512 commit c14f889
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 5 deletions.
23 changes: 20 additions & 3 deletions server/jetstream_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,12 @@ type JSApiStreamNamesResponse struct {

const JSApiStreamNamesResponseType = "io.nats.jetstream.api.v1.stream_names_response"

type JSApiStreamListRequest struct {
ApiPagedRequest
// These are filters that can be applied to the list.
Subject string `json:"subject,omitempty"`
}

// JSApiStreamListResponse list of detailed stream information.
// A nil request is valid and means all streams.
type JSApiStreamListResponse struct {
Expand Down Expand Up @@ -1541,27 +1547,38 @@ func (s *Server) jsStreamListRequest(sub *subscription, c *client, _ *Account, s
}

var offset int
var filter string

if !isEmptyRequest(msg) {
var req JSApiStreamNamesRequest
var req JSApiStreamListRequest
if err := json.Unmarshal(msg, &req); err != nil {
resp.Error = NewJSInvalidJSONError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
offset = req.Offset
if req.Subject != _EMPTY_ {
filter = req.Subject
}
}

// Clustered mode will invoke a scatter and gather.
if s.JetStreamIsClustered() {
// Need to copy these off before sending..
msg = append(msg[:0:0], msg...)
s.startGoRoutine(func() { s.jsClusteredStreamListRequest(acc, ci, offset, subject, reply, msg) })
s.startGoRoutine(func() { s.jsClusteredStreamListRequest(acc, ci, filter, offset, subject, reply, msg) })
return
}

// TODO(dlc) - Maybe hold these results for large results that we expect to be paged.
// TODO(dlc) - If this list is long maybe do this in a Go routine?
msets := acc.streams()
var msets []*stream
if filter == _EMPTY_ {
msets = acc.streams()
} else {
msets = acc.filteredStreams(filter)
}

sort.Slice(msets, func(i, j int) bool {
return strings.Compare(msets[i].cfg.Name, msets[j].cfg.Name) < 0
})
Expand Down
25 changes: 23 additions & 2 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -3836,7 +3836,7 @@ func (s *Server) allPeersOffline(rg *raftGroup) bool {

// This will do a scatter and gather operation for all streams for this account. This is only called from metadata leader.
// This will be running in a separate Go routine.
func (s *Server) jsClusteredStreamListRequest(acc *Account, ci *ClientInfo, offset int, subject, reply string, rmsg []byte) {
func (s *Server) jsClusteredStreamListRequest(acc *Account, ci *ClientInfo, filter string, offset int, subject, reply string, rmsg []byte) {
defer s.grWG.Done()

js, cc := s.getJetStreamCluster()
Expand All @@ -3848,8 +3848,29 @@ func (s *Server) jsClusteredStreamListRequest(acc *Account, ci *ClientInfo, offs

var streams []*streamAssignment
for _, sa := range cc.streams[acc.Name] {
streams = append(streams, sa)
if IsNatsErr(sa.err, JSClusterNotAssignedErr) {
continue
}

if filter != _EMPTY_ {
// These could not have subjects auto-filled in since they are raw and unprocessed.
if len(sa.Config.Subjects) == 0 {
if SubjectsCollide(filter, sa.Config.Name) {
streams = append(streams, sa)
}
} else {
for _, subj := range sa.Config.Subjects {
if SubjectsCollide(filter, subj) {
streams = append(streams, sa)
break
}
}
}
} else {
streams = append(streams, sa)
}
}

// Needs to be sorted for offsets etc.
if len(streams) > 1 {
sort.Slice(streams, func(i, j int) bool {
Expand Down
46 changes: 46 additions & 0 deletions server/jetstream_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9442,6 +9442,52 @@ func TestJetStreamClusterAccountInfoForSystemAccount(t *testing.T) {
}
}

func TestJetClusterStreamListFilter(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

// Client based API
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "ONE",
Subjects: []string{"one.>"},
Replicas: 3,
})
require_NoError(t, err)

_, err = js.AddStream(&nats.StreamConfig{
Name: "TWO",
Subjects: []string{"two.>"},
Replicas: 3,
})
require_NoError(t, err)

resp, err := nc.Request(JSApiStreamList, []byte("{}"), time.Second)
require_NoError(t, err)

list := &JSApiStreamListResponse{}
err = json.Unmarshal(resp.Data, list)
require_NoError(t, err)

if len(list.Streams) != 2 {
t.Fatalf("Expected 2 responses got %d", len(list.Streams))
}

resp, err = nc.Request(JSApiStreamList, []byte(`{"subject":"two.x"}`), time.Second)
require_NoError(t, err)
list = &JSApiStreamListResponse{}
err = json.Unmarshal(resp.Data, list)
require_NoError(t, err)
if len(list.Streams) != 1 {
t.Fatalf("Expected 1 response got %d", len(list.Streams))
}
if list.Streams[0].Config.Name != "TWO" {
t.Fatalf("Expected stream TWO in result got %#v", list.Streams[0])
}
}

func TestJetStreamConsumerUpdates(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
Expand Down
43 changes: 43 additions & 0 deletions server/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13493,6 +13493,49 @@ func TestJetStreamPurgeAndFilteredConsumers(t *testing.T) {
}
}

func TestJetStreamListFilter(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()

nc, js := jsClientConnect(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "ONE",
Subjects: []string{"one.>"},
})
require_NoError(t, err)

_, err = js.AddStream(&nats.StreamConfig{
Name: "TWO",
Subjects: []string{"two.>"},
})
require_NoError(t, err)

resp, err := nc.Request(JSApiStreamList, []byte("{}"), time.Second)
require_NoError(t, err)

list := &JSApiStreamListResponse{}
err = json.Unmarshal(resp.Data, list)
require_NoError(t, err)

if len(list.Streams) != 2 {
t.Fatalf("Expected 2 responses got %d", len(list.Streams))
}

resp, err = nc.Request(JSApiStreamList, []byte(`{"subject":"two.x"}`), time.Second)
require_NoError(t, err)
list = &JSApiStreamListResponse{}
err = json.Unmarshal(resp.Data, list)
require_NoError(t, err)
if len(list.Streams) != 1 {
t.Fatalf("Expected 1 response got %d", len(list.Streams))
}
if list.Streams[0].Config.Name != "TWO" {
t.Fatalf("Expected stream TWO in result got %#v", list.Streams[0])
}
}

// Issue #2662
func TestJetStreamLargeExpiresAndServerRestart(t *testing.T) {
s := RunBasicJetStreamServer()
Expand Down

0 comments on commit c14f889

Please sign in to comment.