Skip to content

Commit

Permalink
kafka: Add support for creating ACLs
Browse files Browse the repository at this point in the history
Allows creating ACLs using the Kafka Manager API.

Signed-off-by: Marc Lopez Rubio <[email protected]>
  • Loading branch information
marclop committed Feb 25, 2025
1 parent 07cf390 commit 92de2e3
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 0 deletions.
15 changes: 15 additions & 0 deletions kafka/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,3 +326,18 @@ func (m *Manager) MonitorConsumerLag(topicConsumers []apmqueue.TopicConsumer) (m
assignmentMetric,
)
}

// CreateACLs creates the specified ACLs in the Kafka cluster.
func (m *Manager) CreateACLs(ctx context.Context, acls *kadm.ACLBuilder) error {
res, err := m.adminClient.CreateACLs(ctx, acls)
if err != nil {
return fmt.Errorf("failed to create ACLs: %w", err)
}
var errs []error
for _, r := range res {
if r.Err != nil {
errs = append(errs, r.Err)
}
}
return errors.Join(errs...)
}
75 changes: 75 additions & 0 deletions kafka/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kfake"
"github.com/twmb/franz-go/pkg/kmsg"
Expand Down Expand Up @@ -628,6 +629,80 @@ func TestManagerMetrics(t *testing.T) {
assert.Equal(t, "GatherMetrics", spans[0].Name)
}

func TestManagerCreateACLs(t *testing.T) {
t.Run("Success", func(t *testing.T) {
cluster, commonConfig := newFakeCluster(t)
// Test successful ACL creation
cluster.ControlKey(kmsg.CreateACLs.Int16(), func(req kmsg.Request) (kmsg.Response, error, bool) {
return &kmsg.CreateACLsResponse{
Version: req.GetVersion(),
Results: []kmsg.CreateACLsResponseResult{
{}, {}, // Empty result means success
},
}, nil, true
})
m, err := NewManager(ManagerConfig{CommonConfig: commonConfig})
require.NoError(t, err)
t.Cleanup(func() { m.Close() })

acls := kadm.NewACLs().
Allow("User:*").
Topics("topic").
Operations(kadm.OpRead, kadm.OpWrite). // More specific operations instead of OpAll
ResourcePatternType(kadm.ACLPatternPrefixed)

// `kfake` does not support the ApiVersions API, so we need to
// manually add the CreateACLs API to the ApiVersions response.
cluster.ControlKey(kmsg.ApiVersions.Int16(), func(req kmsg.Request) (kmsg.Response, error, bool) {
return &kmsg.ApiVersionsResponse{
Version: req.GetVersion(),
ApiKeys: []kmsg.ApiVersionsResponseApiKey{
{ApiKey: kmsg.CreateACLs.Int16(), MaxVersion: 3},
},
}, nil, true
})

err = m.CreateACLs(context.Background(), acls)
assert.NoError(t, err)
})
t.Run("Partial Failure", func(t *testing.T) {
cluster, commonConfig := newFakeCluster(t)
respErr := kerr.InvalidPrincipalType
// Test unsuccessful ACL creation
cluster.ControlKey(kmsg.CreateACLs.Int16(), func(req kmsg.Request) (kmsg.Response, error, bool) {
return &kmsg.CreateACLsResponse{
Version: req.GetVersion(),
Results: []kmsg.CreateACLsResponseResult{
{ErrorCode: respErr.Code, ErrorMessage: &respErr.Message},
},
}, nil, true
})
m, err := NewManager(ManagerConfig{CommonConfig: commonConfig})
require.NoError(t, err)
t.Cleanup(func() { m.Close() })

acls := kadm.NewACLs().
Allow("User:*").
Topics("topic").
Operations(kadm.OpWrite). // More specific operations instead of OpAll
ResourcePatternType(kadm.ACLPatternPrefixed)

// `kfake` does not support the ApiVersions API, so we need to
// manually add the CreateACLs API to the ApiVersions response.
cluster.ControlKey(kmsg.ApiVersions.Int16(), func(req kmsg.Request) (kmsg.Response, error, bool) {
return &kmsg.ApiVersionsResponse{
Version: req.GetVersion(),
ApiKeys: []kmsg.ApiVersionsResponseApiKey{
{ApiKey: kmsg.CreateACLs.Int16(), MaxVersion: 3},
},
}, nil, true
})

err = m.CreateACLs(context.Background(), acls)
assert.EqualError(t, err, respErr.Error())
})
}

func newFakeCluster(t testing.TB) (*kfake.Cluster, CommonConfig) {
cluster, err := kfake.NewCluster(
// Just one broker to simplify dealing with sharded requests.
Expand Down

0 comments on commit 92de2e3

Please sign in to comment.