Skip to content

Commit

Permalink
chore(kafka): tenant prefix codec (#16020)
Browse files Browse the repository at this point in the history
  • Loading branch information
owen-d authored Jan 30, 2025
1 parent 998da11 commit ed3a807
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 11 deletions.
48 changes: 37 additions & 11 deletions pkg/distributor/tenant_topic_tee.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"flag"
"fmt"
"math"
"strconv"
"strings"
"sync"

"github.com/go-kit/log"
Expand Down Expand Up @@ -52,6 +54,35 @@ func ParseStrategy(s string) (Strategy, error) {
}
}

// codec for `<prefix>.<tenant>.<shard>` topics
type TenantPrefixCodec string

func (c TenantPrefixCodec) Encode(tenant string, shard int32) string {
return fmt.Sprintf("%s.%s.%d", c, tenant, shard)
}

func (c TenantPrefixCodec) Decode(s string) (tenant string, shard int32, err error) {
suffix, ok := strings.CutPrefix(s, string(c)+".")
if !ok {
return "", 0, fmt.Errorf("invalid format: %s", s)
}

rem := strings.Split(suffix, ".")

if len(rem) < 2 {
return "", 0, fmt.Errorf("invalid format: %s", s)
}

n, err := strconv.Atoi(rem[len(rem)-1])
if err != nil {
return "", 0, fmt.Errorf("invalid format: %s", s)
}

tenant = strings.Join(rem[:len(rem)-1], ".")

return tenant, int32(n), nil
}

// TenantTopicConfig configures the TenantTopicWriter
type TenantTopicConfig struct {
Enabled bool `yaml:"enabled"`
Expand Down Expand Up @@ -154,8 +185,8 @@ func (r *SimplePartitionResolver) Resolve(_ context.Context, tenant string, tota
// 4. Returns the topic name with partition 0 (since each topic has exactly one partition)
type ShardedPartitionResolver struct {
sync.RWMutex
admin *kadm.Client
topicPrefix string
admin *kadm.Client
codec TenantPrefixCodec

sflight singleflight.Group // for topic creation
// tenantShards maps tenant IDs to their active shards
Expand All @@ -167,7 +198,7 @@ type ShardedPartitionResolver struct {
func NewShardedPartitionResolver(kafkaClient *kgo.Client, topicPrefix string) *ShardedPartitionResolver {
return &ShardedPartitionResolver{
admin: kadm.NewClient(kafkaClient),
topicPrefix: topicPrefix,
codec: TenantPrefixCodec(topicPrefix),
tenantShards: make(map[string]map[int32]struct{}),
}
}
Expand All @@ -187,7 +218,7 @@ func (r *ShardedPartitionResolver) Resolve(ctx context.Context, tenant string, t
_, shardExists := shards[shard]
if shardExists {
r.RUnlock()
return r.topicName(tenant, shard), 0, nil
return r.codec.Encode(tenant, shard), 0, nil
}
}
r.RUnlock()
Expand All @@ -197,13 +228,13 @@ func (r *ShardedPartitionResolver) Resolve(ctx context.Context, tenant string, t
return "", 0, fmt.Errorf("failed to create shard: %w", err)
}

return r.topicName(tenant, shard), 0, nil
return r.codec.Encode(tenant, shard), 0, nil
}

// createShard creates a new topic for the given tenant and shard.
// It handles the case where the topic already exists.
func (r *ShardedPartitionResolver) createShard(ctx context.Context, tenant string, shard int32) error {
topic := r.topicName(tenant, shard)
topic := r.codec.Encode(tenant, shard)
replicationFactor := 2 // TODO: expose RF

_, err, _ := r.sflight.Do(topic, func() (interface{}, error) {
Expand Down Expand Up @@ -235,11 +266,6 @@ func (r *ShardedPartitionResolver) createShard(ctx context.Context, tenant strin
return nil
}

// topicName returns the topic name for a given tenant and shard
func (r *ShardedPartitionResolver) topicName(tenant string, shard int32) string {
return fmt.Sprintf("%s.%s.%d", r.topicPrefix, tenant, shard)
}

// TenantTopicWriter implements the Tee interface by writing logs to Kafka topics
// based on tenant IDs. Each tenant's logs are written to a dedicated topic.
type TenantTopicWriter struct {
Expand Down
92 changes: 92 additions & 0 deletions pkg/distributor/tenant_topic_tee_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package distributor

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestTenantPrefixCodec(t *testing.T) {
tests := []struct {
name string
codec TenantPrefixCodec
tenant string
shard int32
encoded string
expectedErr string
decodeOnly bool // for testing invalid decode cases
}{
{
name: "simple tenant and prefix",
codec: "prefix",
tenant: "tenant1",
shard: 1,
encoded: "prefix.tenant1.1",
},
{
name: "tenant with dots",
codec: "prefix",
tenant: "tenant.with.dots",
shard: 2,
encoded: "prefix.tenant.with.dots.2",
},
{
name: "prefix with dots",
codec: "prefix.with.dots",
tenant: "tenant1",
shard: 3,
encoded: "prefix.with.dots.tenant1.3",
},
{
name: "both tenant and prefix with dots",
codec: "prefix.with.dots",
tenant: "tenant.with.dots",
shard: 4,
encoded: "prefix.with.dots.tenant.with.dots.4",
},
{
name: "invalid format - missing prefix",
codec: "prefix",
encoded: "wrongprefix.tenant1.1",
decodeOnly: true,
expectedErr: "invalid format: wrongprefix.tenant1.1",
},
{
name: "invalid format - missing shard",
codec: "prefix",
encoded: "prefix.tenant1",
decodeOnly: true,
expectedErr: "invalid format: prefix.tenant1",
},
{
name: "invalid format - non-numeric shard",
codec: "prefix",
encoded: "prefix.tenant1.abc",
decodeOnly: true,
expectedErr: "invalid format: prefix.tenant1.abc",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if !tt.decodeOnly {
// Test encode
encoded := tt.codec.Encode(tt.tenant, tt.shard)
require.Equal(t, tt.encoded, encoded)

// Test encode/decode roundtrip
decodedTenant, decodedShard, err := tt.codec.Decode(encoded)
require.NoError(t, err)
require.Equal(t, tt.tenant, decodedTenant)
require.Equal(t, tt.shard, decodedShard)
} else {
// Test decode error cases
tenant, shard, err := tt.codec.Decode(tt.encoded)
require.Error(t, err)
require.Equal(t, tt.expectedErr, err.Error())
require.Equal(t, "", tenant)
require.Equal(t, int32(0), shard)
}
})
}
}

0 comments on commit ed3a807

Please sign in to comment.