Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

event stream: fix wildcard namespace bypass #25089

Merged
merged 3 commits into from
Feb 11, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/25089.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:security
event stream: fixes an issue in the event stream API where using a wildcard namespace can allow a user with "read" capabilities on any namespace, the equivalent of "read" on all namespaces
mismithhisler marked this conversation as resolved.
Show resolved Hide resolved
```
49 changes: 42 additions & 7 deletions nomad/event_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ func (e *Event) stream(conn io.ReadWriteCloser) {
}

authErr := e.srv.Authenticate(nil, &args)
if authErr != nil {
handleJsonResultError(structs.ErrPermissionDenied, pointer.Of(int64(403)), encoder)
return
}

// forward to appropriate region
if args.Region != e.srv.config.Region {
Expand All @@ -52,16 +56,27 @@ func (e *Event) stream(conn io.ReadWriteCloser) {
}

e.srv.MeasureRPCRate("event", structs.RateMetricRead, &args)
if authErr != nil {

resolvedACL, err := e.srv.ResolveACL(&args)
if err != nil {
handleJsonResultError(structs.ErrPermissionDenied, pointer.Of(int64(403)), encoder)
return
}

validatedNses, err := e.validateACL(args.Namespace, args.Topics, resolvedACL)
if err != nil {
handleJsonResultError(structs.ErrPermissionDenied, pointer.Of(int64(403)), encoder)
return
}

// Generate the subscription request
subReq := &stream.SubscribeRequest{
Token: args.AuthToken,
Topics: args.Topics,
Index: uint64(args.Index),
Namespace: args.Namespace,
Token: args.AuthToken,
Topics: args.Topics,
Index: uint64(args.Index),
// Namespaces is set once, in the event a users ACL is updated to include
// more NSes, the current event stream will not include the new NSes.
Namespaces: validatedNses,
Authenticate: func() error {
if err := e.srv.Authenticate(nil, &args); err != nil {
return err
Expand All @@ -70,7 +85,8 @@ func (e *Event) stream(conn io.ReadWriteCloser) {
if err != nil {
return err
}
return validateACL(args.Namespace, args.Topics, resolvedACL)
_, err = e.validateACL(args.Namespace, args.Topics, resolvedACL)
return err
},
}

Expand Down Expand Up @@ -225,7 +241,26 @@ func handleJsonResultError(err error, code *int64, encoder *codec.Encoder) {
})
}

func validateACL(namespace string, topics map[structs.Topic][]string, aclObj *acl.ACL) error {
// validateACL handles wildcard namespaces by replacing it with all existing namespaces
// and validates the user has the appropriate ACL to read topics in each one.
func (e *Event) validateACL(namespace string, topics map[structs.Topic][]string, resolvedAcl *acl.ACL) ([]string, error) {
nses := []string{}
if namespace == structs.AllNamespacesSentinel {
ns, _ := e.srv.State().NamespaceNames()
nses = append(nses, ns...)
} else {
nses = append(nses, namespace)
}

for _, ns := range nses {
if err := validateNsOp(ns, topics, resolvedAcl); err != nil {
return nil, err
}
}
return nses, nil
}

func validateNsOp(namespace string, topics map[structs.Topic][]string, aclObj *acl.ACL) error {
for topic := range topics {
switch topic {
case structs.TopicDeployment,
Expand Down
66 changes: 64 additions & 2 deletions nomad/event_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ OUTER:
}
}

func TestEventStream_validateACL(t *testing.T) {
func TestEventStream_validateNsOp(t *testing.T) {
ci.Parallel(t)
require := require.New(t)

Expand Down Expand Up @@ -480,12 +480,74 @@ func TestEventStream_validateACL(t *testing.T) {
testACL, err := acl.NewACL(tc.Management, []*acl.Policy{p})
require.NoError(err)

err = validateACL(tc.Namespace, tc.Topics, testACL)
err = validateNsOp(tc.Namespace, tc.Topics, testACL)
require.Equal(tc.ExpectedErr, err)
})
}
}

func TestEventStream_validateACL(t *testing.T) {
ci.Parallel(t)

s1, _, cleanupS := TestACLServer(t, nil)
defer cleanupS()
testutil.WaitForLeader(t, s1.RPC)

ns1 := mock.Namespace()

err := s1.State().UpsertNamespaces(0, []*structs.Namespace{ns1})
must.NoError(t, err)

testEvent := &Event{srv: s1}

t.Run("single namespace ACL errors on wildcard", func(t *testing.T) {
policy, err := acl.Parse(mock.NamespacePolicy(ns1.Name, "", []string{acl.NamespaceCapabilityReadJob}))
must.NoError(t, err)

// does not contain policy for default NS
testAcl, err := acl.NewACL(false, []*acl.Policy{policy})
must.NoError(t, err)

topics := map[structs.Topic][]string{
structs.TopicJob: {"*"},
}
_, err = testEvent.validateACL("*", topics, testAcl)
must.Error(t, err)
})

t.Run("all namespace ACL succeeds on wildcard", func(t *testing.T) {
policy1, err := acl.Parse(mock.NamespacePolicy("default", "", []string{acl.NamespaceCapabilityReadJob}))
must.NoError(t, err)
policy2, err := acl.Parse(mock.NamespacePolicy(ns1.Name, "", []string{acl.NamespaceCapabilityReadJob}))
must.NoError(t, err)

testAcl, err := acl.NewACL(false, []*acl.Policy{policy1, policy2})
must.NoError(t, err)

topics := map[structs.Topic][]string{
structs.TopicJob: {"*"},
}
nses, err := testEvent.validateACL("*", topics, testAcl)
must.NoError(t, err)
must.Eq(t, nses, []string{"default", ns1.Name})
})

t.Run("single namespace ACL succeeds with correct NS", func(t *testing.T) {
policy, err := acl.Parse(mock.NamespacePolicy("default", "", []string{acl.NamespaceCapabilityReadJob}))
must.NoError(t, err)

testAcl, err := acl.NewACL(false, []*acl.Policy{policy})
must.NoError(t, err)

topics := map[structs.Topic][]string{
structs.TopicJob: {"*"},
}
nses, err := testEvent.validateACL("default", topics, testAcl)
must.NoError(t, err)
must.Eq(t, nses, []string{"default"})
})
}

// TestEventStream_ACL_Update_Close_Stream asserts that an active subscription
// is closed after the token is no longer valid
func TestEventStream_ACL_Update_Close_Stream(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions nomad/fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3676,7 +3676,7 @@ func TestFSM_ACLEvents(t *testing.T) {
Topics: map[structs.Topic][]string{
tc.reqTopic: {"*"},
},
Namespace: "default",
Namespaces: []string{"default"},
}

sub, err := broker.Subscribe(subReq)
Expand Down Expand Up @@ -3730,7 +3730,7 @@ func TestFSM_EventBroker_JobRegisterFSMEvents(t *testing.T) {
Topics: map[structs.Topic][]string{
structs.TopicJob: {"*"},
},
Namespace: "default",
Namespaces: []string{"default"},
}

sub, err := broker.Subscribe(subReq)
Expand Down
2 changes: 1 addition & 1 deletion nomad/state/deployment_events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func EventsForIndex(t *testing.T, s *StateStore, index uint64) []structs.Event {
Topics: map[structs.Topic][]string{
"*": {"*"},
},
Namespace: "default",
Namespaces: []string{"default"},
Index: index,
StartExactlyAtIndex: true,
})
Expand Down
14 changes: 5 additions & 9 deletions nomad/stream/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package stream
import (
"context"
"errors"
"slices"
"sync/atomic"

"github.com/hashicorp/nomad/nomad/structs"
Expand Down Expand Up @@ -48,9 +49,9 @@ type Subscription struct {
}

type SubscribeRequest struct {
Token string
Index uint64
Namespace string
Token string
Index uint64
Namespaces []string

Topics map[structs.Topic][]string

Expand Down Expand Up @@ -130,15 +131,10 @@ func filter(req *SubscribeRequest, events []structs.Event) []structs.Event {

allTopicKeys := req.Topics[structs.TopicAll]

// Return all events if subscribed to all namespaces and all topics
if req.Namespace == "*" && len(allTopicKeys) == 1 && allTopicKeys[0] == string(structs.TopicAll) {
return events
}

var result []structs.Event

for _, event := range events {
if req.Namespace != "*" && event.Namespace != "" && event.Namespace != req.Namespace {
if event.Namespace != "" && !slices.Contains(req.Namespaces, event.Namespace) {
continue
}

Expand Down
Loading