// TenantPrefixCodec encodes and decodes topic names of the form
// `<prefix>.<tenant>.<shard>`, where the codec's own string value is the
// prefix. Both the prefix and the tenant may contain dots: the prefix is
// matched literally and the shard is always the final dot-separated segment,
// so everything in between is treated as the tenant.
type TenantPrefixCodec string

// Encode returns the topic name for the given tenant and shard.
func (c TenantPrefixCodec) Encode(tenant string, shard int32) string {
	return string(c) + "." + tenant + "." + strconv.FormatInt(int64(shard), 10)
}

// Decode is the inverse of Encode. It returns the tenant and shard contained
// in s, or an "invalid format" error when s does not start with the codec's
// prefix followed by a dot, has no shard segment, or has a shard segment that
// is not a base-10 integer within the int32 range. On error the returned
// tenant is empty and the shard is zero.
func (c TenantPrefixCodec) Decode(s string) (tenant string, shard int32, err error) {
	suffix, ok := strings.CutPrefix(s, string(c)+".")
	if !ok {
		return "", 0, fmt.Errorf("invalid format: %s", s)
	}

	// The shard is the last segment; cutting at the final dot keeps any dots
	// that belong to the tenant intact.
	sep := strings.LastIndexByte(suffix, '.')
	if sep < 0 {
		return "", 0, fmt.Errorf("invalid format: %s", s)
	}

	// Parse with an explicit 32-bit size so that out-of-range shards are
	// rejected instead of silently wrapping when narrowed to int32.
	n, err := strconv.ParseInt(suffix[sep+1:], 10, 32)
	if err != nil {
		return "", 0, fmt.Errorf("invalid format: %s", s)
	}

	return suffix[:sep], int32(n), nil
}
Returns the topic name with partition 0 (since each topic has exactly one partition) type ShardedPartitionResolver struct { sync.RWMutex - admin *kadm.Client - topicPrefix string + admin *kadm.Client + codec TenantPrefixCodec sflight singleflight.Group // for topic creation // tenantShards maps tenant IDs to their active shards @@ -167,7 +198,7 @@ type ShardedPartitionResolver struct { func NewShardedPartitionResolver(kafkaClient *kgo.Client, topicPrefix string) *ShardedPartitionResolver { return &ShardedPartitionResolver{ admin: kadm.NewClient(kafkaClient), - topicPrefix: topicPrefix, + codec: TenantPrefixCodec(topicPrefix), tenantShards: make(map[string]map[int32]struct{}), } } @@ -187,7 +218,7 @@ func (r *ShardedPartitionResolver) Resolve(ctx context.Context, tenant string, t _, shardExists := shards[shard] if shardExists { r.RUnlock() - return r.topicName(tenant, shard), 0, nil + return r.codec.Encode(tenant, shard), 0, nil } } r.RUnlock() @@ -197,13 +228,13 @@ func (r *ShardedPartitionResolver) Resolve(ctx context.Context, tenant string, t return "", 0, fmt.Errorf("failed to create shard: %w", err) } - return r.topicName(tenant, shard), 0, nil + return r.codec.Encode(tenant, shard), 0, nil } // createShard creates a new topic for the given tenant and shard. // It handles the case where the topic already exists. 
func (r *ShardedPartitionResolver) createShard(ctx context.Context, tenant string, shard int32) error { - topic := r.topicName(tenant, shard) + topic := r.codec.Encode(tenant, shard) replicationFactor := 2 // TODO: expose RF _, err, _ := r.sflight.Do(topic, func() (interface{}, error) { @@ -235,11 +266,6 @@ func (r *ShardedPartitionResolver) createShard(ctx context.Context, tenant strin return nil } -// topicName returns the topic name for a given tenant and shard -func (r *ShardedPartitionResolver) topicName(tenant string, shard int32) string { - return fmt.Sprintf("%s.%s.%d", r.topicPrefix, tenant, shard) -} - // TenantTopicWriter implements the Tee interface by writing logs to Kafka topics // based on tenant IDs. Each tenant's logs are written to a dedicated topic. type TenantTopicWriter struct { diff --git a/pkg/distributor/tenant_topic_tee_test.go b/pkg/distributor/tenant_topic_tee_test.go new file mode 100644 index 0000000000000..d82179e7f0156 --- /dev/null +++ b/pkg/distributor/tenant_topic_tee_test.go @@ -0,0 +1,92 @@ +package distributor + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestTenantPrefixCodec(t *testing.T) { + tests := []struct { + name string + codec TenantPrefixCodec + tenant string + shard int32 + encoded string + expectedErr string + decodeOnly bool // for testing invalid decode cases + }{ + { + name: "simple tenant and prefix", + codec: "prefix", + tenant: "tenant1", + shard: 1, + encoded: "prefix.tenant1.1", + }, + { + name: "tenant with dots", + codec: "prefix", + tenant: "tenant.with.dots", + shard: 2, + encoded: "prefix.tenant.with.dots.2", + }, + { + name: "prefix with dots", + codec: "prefix.with.dots", + tenant: "tenant1", + shard: 3, + encoded: "prefix.with.dots.tenant1.3", + }, + { + name: "both tenant and prefix with dots", + codec: "prefix.with.dots", + tenant: "tenant.with.dots", + shard: 4, + encoded: "prefix.with.dots.tenant.with.dots.4", + }, + { + name: "invalid format - missing prefix", + 
codec: "prefix", + encoded: "wrongprefix.tenant1.1", + decodeOnly: true, + expectedErr: "invalid format: wrongprefix.tenant1.1", + }, + { + name: "invalid format - missing shard", + codec: "prefix", + encoded: "prefix.tenant1", + decodeOnly: true, + expectedErr: "invalid format: prefix.tenant1", + }, + { + name: "invalid format - non-numeric shard", + codec: "prefix", + encoded: "prefix.tenant1.abc", + decodeOnly: true, + expectedErr: "invalid format: prefix.tenant1.abc", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if !tt.decodeOnly { + // Test encode + encoded := tt.codec.Encode(tt.tenant, tt.shard) + require.Equal(t, tt.encoded, encoded) + + // Test encode/decode roundtrip + decodedTenant, decodedShard, err := tt.codec.Decode(encoded) + require.NoError(t, err) + require.Equal(t, tt.tenant, decodedTenant) + require.Equal(t, tt.shard, decodedShard) + } else { + // Test decode error cases + tenant, shard, err := tt.codec.Decode(tt.encoded) + require.Error(t, err) + require.Equal(t, tt.expectedErr, err.Error()) + require.Equal(t, "", tenant) + require.Equal(t, int32(0), shard) + } + }) + } +}