Skip to content

Commit 1eea499

Browse files
events: fix wildcard namespace handling (#10935)
This fixes a bug in the event stream API where it currently interprets namespace=* as an actual namespace, not a wildcard. When Nomad parses incoming requests, it sets namespace to default if not specified, which means the request namespace will never be an empty string, which is what the event subscription was checking for. This changes the conditional logic to check for a wildcard namespace instead of an empty one. It also updates some event tests to include the default namespace in the subscription to match current behavior. Fixes #10903
1 parent 9ba1a2f commit 1eea499

File tree

7 files changed

+69
-5
lines changed

7 files changed

+69
-5
lines changed

.changelog/10935.txt

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
```release-note:bug
2+
events: Fixed wildcard namespace handling
3+
```

nomad/fsm_test.go

+2
Original file line numberDiff line numberDiff line change
@@ -3378,6 +3378,7 @@ func TestFSM_ACLEvents(t *testing.T) {
33783378
Topics: map[structs.Topic][]string{
33793379
tc.reqTopic: {"*"},
33803380
},
3381+
Namespace: "default",
33813382
}
33823383

33833384
sub, err := broker.Subscribe(subReq)
@@ -3431,6 +3432,7 @@ func TestFSM_EventBroker_JobRegisterFSMEvents(t *testing.T) {
34313432
Topics: map[structs.Topic][]string{
34323433
structs.TopicJob: {"*"},
34333434
},
3435+
Namespace: "default",
34343436
}
34353437

34363438
sub, err := broker.Subscribe(subReq)

nomad/state/deployment_events_test.go

+1
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ func EventsForIndex(t *testing.T, s *StateStore, index uint64) []structs.Event {
9393
Topics: map[structs.Topic][]string{
9494
"*": {"*"},
9595
},
96+
Namespace: "default",
9697
Index: index,
9798
StartExactlyAtIndex: true,
9899
})

nomad/stream/event_broker_test.go

+29-1
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,27 @@ func TestEventBroker_handleACLUpdates_policyupdated(t *testing.T) {
270270
Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}),
271271
},
272272
},
273+
{
274+
desc: "subscribed to evals in all namespaces and removed access",
275+
policyBeforeRules: mock.NamespacePolicy("*", "", []string{acl.NamespaceCapabilityReadJob}),
276+
policyAfterRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}),
277+
shouldUnsubscribe: true,
278+
event: structs.Event{
279+
Topic: structs.TopicEvaluation,
280+
Type: structs.TypeEvalUpdated,
281+
Namespace: "foo",
282+
Payload: structs.EvaluationEvent{
283+
Evaluation: &structs.Evaluation{
284+
ID: "some-id",
285+
},
286+
},
287+
},
288+
policyEvent: structs.Event{
289+
Topic: structs.TopicACLToken,
290+
Type: structs.TypeACLTokenUpserted,
291+
Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}),
292+
},
293+
},
273294
{
274295
desc: "subscribed to deployments and no access change",
275296
policyBeforeRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}),
@@ -467,11 +488,18 @@ func TestEventBroker_handleACLUpdates_policyupdated(t *testing.T) {
467488
publisher, err := NewEventBroker(ctx, aclDelegate, EventBrokerCfg{})
468489
require.NoError(t, err)
469490

491+
var ns string
492+
if tc.event.Namespace != "" {
493+
ns = tc.event.Namespace
494+
} else {
495+
ns = structs.DefaultNamespace
496+
}
497+
470498
sub, err := publisher.SubscribeWithACLCheck(&SubscribeRequest{
471499
Topics: map[structs.Topic][]string{
472500
tc.event.Topic: {"*"},
473501
},
474-
Namespace: structs.DefaultNamespace,
502+
Namespace: ns,
475503
Token: secretID,
476504
})
477505

nomad/stream/subscription.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -123,14 +123,15 @@ func filter(req *SubscribeRequest, events []structs.Event) []structs.Event {
123123

124124
allTopicKeys := req.Topics[structs.TopicAll]
125125

126-
if req.Namespace == "" && len(allTopicKeys) == 1 && allTopicKeys[0] == string(structs.TopicAll) {
126+
// Return all events if subscribed to all namespaces and all topics
127+
if req.Namespace == "*" && len(allTopicKeys) == 1 && allTopicKeys[0] == string(structs.TopicAll) {
127128
return events
128129
}
129130

130131
var result []structs.Event
131132

132133
for _, event := range events {
133-
if req.Namespace != "" && event.Namespace != "" && event.Namespace != req.Namespace {
134+
if req.Namespace != "*" && event.Namespace != "" && event.Namespace != req.Namespace {
134135
continue
135136
}
136137

nomad/stream/subscription_test.go

+23
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,29 @@ func TestFilter_Namespace(t *testing.T) {
147147
require.Equal(t, 2, cap(actual))
148148
}
149149

150+
func TestFilter_NamespaceAll(t *testing.T) {
151+
events := make([]structs.Event, 0, 5)
152+
events = append(events,
153+
structs.Event{Topic: "Test", Key: "One", Namespace: "foo"},
154+
structs.Event{Topic: "Test", Key: "Two", Namespace: "bar"},
155+
structs.Event{Topic: "Test", Key: "Three", Namespace: "default"},
156+
)
157+
158+
req := &SubscribeRequest{
159+
Topics: map[structs.Topic][]string{
160+
"*": {"*"},
161+
},
162+
Namespace: "*",
163+
}
164+
actual := filter(req, events)
165+
expected := []structs.Event{
166+
{Topic: "Test", Key: "One", Namespace: "foo"},
167+
{Topic: "Test", Key: "Two", Namespace: "bar"},
168+
{Topic: "Test", Key: "Three", Namespace: "default"},
169+
}
170+
require.Equal(t, expected, actual)
171+
}
172+
150173
func TestFilter_FilterKeys(t *testing.T) {
151174
events := make([]structs.Event, 0, 5)
152175
events = append(events, structs.Event{Topic: "Test", Key: "One", FilterKeys: []string{"extra-key"}}, structs.Event{Topic: "Test", Key: "Two"}, structs.Event{Topic: "Test", Key: "Two"})

website/content/api-docs/events.mdx

+8-2
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,8 @@ by default, requiring a management token.
4747

4848
- `namespace` `(string: "default")` - Specifies the target namespace to filter
4949
on. Specifying `*` includes all namespaces for event types that support
50-
namespaces.
50+
namespaces. If you specify all namespaces (`*`) you'll either need a management
51+
token, or an ACL Policy that explicitly applies to all namespaces (`*`).
5152

5253
- `topic` `(topic:filter_key: "*:*")` - Specifies a topic to subscribe to and
5354
filter on. The default is to subscribe to all topics. Multiple topics may be
@@ -100,10 +101,15 @@ by default, requiring a management token.
100101
### Sample Request
101102

102103
```shell-session
103-
# Subscribe to all events and topics
104+
# Subscribe to all events and topics in the default namespace
104105
$ curl -s -v -N http://127.0.0.1:4646/v1/event/stream
105106
```
106107

108+
```shell-session
109+
# Subscribe to all events and topics in all namespaces
110+
$ curl -s -v -N http://127.0.0.1:4646/v1/event/stream?namespace=*
111+
```
112+
107113
```shell-session
108114
# Start at index 100 and subscribe to all Evaluation events
109115
$ curl -s -v -N http://127.0.0.1:4646/v1/event/stream?index=100&topic=Evaluation

0 commit comments

Comments
 (0)