Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add RawProduce API #1233

Merged
merged 32 commits into from
Dec 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
0b71c86
add raw produce api
apacker Oct 18, 2023
3d5054f
rename raw request/response types
apacker Oct 19, 2023
24394b1
add RawProduce api key
apacker Oct 19, 2023
578964c
revert rename of raw request/response structs
apacker Oct 19, 2023
8c2592d
actually return the right api key
apacker Oct 19, 2023
402fd2c
fix response api key
apacker Oct 19, 2023
df55dc2
change raw produce api name
apacker Oct 19, 2023
47882d1
catch raw record set write error
apacker Oct 19, 2023
c59e1e6
Revert "catch raw record set write error"
apacker Oct 19, 2023
487f4d5
update error messages in RawProduce
apacker Oct 19, 2023
26090fc
remove errant copy/pasta
apacker Oct 19, 2023
937e102
debug raw recordset writeto
apacker Oct 19, 2023
bcb0b5b
debug log
apacker Oct 19, 2023
1c20bd5
debug log
apacker Oct 19, 2023
9638d6b
debug log
apacker Oct 19, 2023
425c836
remove debug logging
apacker Oct 19, 2023
90ab5bf
map raw produce to api key 0
apacker Oct 19, 2023
5fd9815
remove raw response types
apacker Oct 20, 2023
06e6277
fix tests, add rrs.ReadFrom implementation
apacker Oct 24, 2023
81ce343
remove api key for rawproduce
apacker Nov 13, 2023
70149d2
dont init raw request
apacker Nov 13, 2023
81a74ba
handle nil records
apacker Nov 13, 2023
bb6dd7d
add ability to register overridden types
apacker Nov 13, 2023
12aec5d
clean up, fix rawproduce tests
apacker Nov 15, 2023
7b4314d
goimports
apacker Nov 15, 2023
350881e
remove raw recordset version and request prepare
apacker Nov 15, 2023
58dec7a
skip testing on kafka 0.10.x
apacker Nov 15, 2023
ab4a0a2
update docs, skip another test
apacker Nov 15, 2023
6337df1
update docs
apacker Nov 16, 2023
6c3c820
doc nit
apacker Nov 16, 2023
3215259
Update protocol/protocol.go
apacker Nov 30, 2023
e9051b4
fix override response key
apacker Nov 30, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions protocol/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,37 @@ func Register(req, res Message) {
}
}

// OverrideTypeMessage is an interface implemented by messages that want to override the standard
// request/response types for a given API.
type OverrideTypeMessage interface {
TypeKey() OverrideTypeKey
}

type OverrideTypeKey int16

const (
RawProduceOverride OverrideTypeKey = 0
)

var overrideApiTypes [numApis]map[OverrideTypeKey]apiType

func RegisterOverride(req, res Message, key OverrideTypeKey) {
k1 := req.ApiKey()
k2 := res.ApiKey()

if k1 != k2 {
panic(fmt.Sprintf("[%T/%T]: request and response API keys mismatch: %d != %d", req, res, k1, k2))
}

if overrideApiTypes[k1] == nil {
overrideApiTypes[k1] = make(map[OverrideTypeKey]apiType)
}
overrideApiTypes[k1][key] = apiType{
requests: typesOf(req),
responses: typesOf(res),
}
}

func typesOf(v interface{}) []messageType {
return makeTypes(reflect.TypeOf(v).Elem())
}
Expand Down
8 changes: 8 additions & 0 deletions protocol/prototest/reflect.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package prototest

import (
"bytes"
"errors"
"io"
"reflect"
Expand Down Expand Up @@ -49,6 +50,13 @@ func loadValue(v reflect.Value) (reset func()) {
}
resetFunc()
resets = append(resets, resetFunc)
case io.Reader:
buf, _ := io.ReadAll(x)
resetFunc := func() {
f.Set(reflect.ValueOf(bytes.NewBuffer(buf)))
}
resetFunc()
resets = append(resets, resetFunc)
}
})

Expand Down
33 changes: 33 additions & 0 deletions protocol/prototest/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,39 @@ func TestRequest(t *testing.T, version int16, msg protocol.Message) {
})
}

// TestRequestWithOverride validates requests that have an overridden type. For requests with type overrides, we
// double-serialize the request to ensure the resulting encoding of the overridden and original type are identical.
func TestRequestWithOverride(t *testing.T, version int16, msg protocol.Message) {
reset := load(msg)

t.Run(fmt.Sprintf("v%d", version), func(t *testing.T) {
b1 := &bytes.Buffer{}

if err := protocol.WriteRequest(b1, version, 1234, "me", msg); err != nil {
t.Fatal(err)
}

reset()
t.Logf("\n%s\n", hex.Dump(b1.Bytes()))

_, _, _, req, err := protocol.ReadRequest(b1)
if err != nil {
t.Fatal(err)
}

b2 := &bytes.Buffer{}
if err := protocol.WriteRequest(b2, version, 1234, "me", req); err != nil {
t.Fatal(err)
}

if !deepEqual(b1, b2) {
t.Errorf("request message mismatch:")
t.Logf("expected: %+v", hex.Dump(b1.Bytes()))
t.Logf("found: %+v", hex.Dump(b2.Bytes()))
}
})
}

func BenchmarkRequest(b *testing.B, version int16, msg protocol.Message) {
reset := load(msg)

Expand Down
91 changes: 91 additions & 0 deletions protocol/rawproduce/rawproduce.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package rawproduce

import (
"fmt"

"github.com/segmentio/kafka-go/protocol"
"github.com/segmentio/kafka-go/protocol/produce"
)

func init() {
// Register a type override so that raw produce requests will be encoded with the correct type.
req := &Request{}
protocol.RegisterOverride(req, &produce.Response{}, req.TypeKey())
}

type Request struct {
TransactionalID string `kafka:"min=v3,max=v8,nullable"`
Acks int16 `kafka:"min=v0,max=v8"`
Timeout int32 `kafka:"min=v0,max=v8"`
Topics []RequestTopic `kafka:"min=v0,max=v8"`
}

func (r *Request) ApiKey() protocol.ApiKey { return protocol.Produce }

func (r *Request) TypeKey() protocol.OverrideTypeKey { return protocol.RawProduceOverride }

func (r *Request) Broker(cluster protocol.Cluster) (protocol.Broker, error) {
broker := protocol.Broker{ID: -1}

for i := range r.Topics {
t := &r.Topics[i]

topic, ok := cluster.Topics[t.Topic]
if !ok {
return broker, NewError(protocol.NewErrNoTopic(t.Topic))
}

for j := range t.Partitions {
p := &t.Partitions[j]

partition, ok := topic.Partitions[p.Partition]
if !ok {
return broker, NewError(protocol.NewErrNoPartition(t.Topic, p.Partition))
}

if b, ok := cluster.Brokers[partition.Leader]; !ok {
return broker, NewError(protocol.NewErrNoLeader(t.Topic, p.Partition))
} else if broker.ID < 0 {
broker = b
} else if b.ID != broker.ID {
return broker, NewError(fmt.Errorf("mismatching leaders (%d!=%d)", b.ID, broker.ID))
}
}
}

return broker, nil
}

func (r *Request) HasResponse() bool {
return r.Acks != 0
}

type RequestTopic struct {
Topic string `kafka:"min=v0,max=v8"`
Partitions []RequestPartition `kafka:"min=v0,max=v8"`
}

type RequestPartition struct {
Partition int32 `kafka:"min=v0,max=v8"`
RecordSet protocol.RawRecordSet `kafka:"min=v0,max=v8"`
}

var (
_ protocol.BrokerMessage = (*Request)(nil)
)

type Error struct {
Err error
}

func NewError(err error) *Error {
return &Error{Err: err}
}

func (e *Error) Error() string {
return fmt.Sprintf("fetch request error: %v", e.Err)
}

func (e *Error) Unwrap() error {
return e.Err
}
201 changes: 201 additions & 0 deletions protocol/rawproduce/rawproduce_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
package rawproduce_test

import (
"bytes"
"testing"
"time"

"github.com/segmentio/kafka-go/protocol"
"github.com/segmentio/kafka-go/protocol/prototest"
"github.com/segmentio/kafka-go/protocol/rawproduce"
)

const (
v0 = 0
v3 = 3
v5 = 5
)

func TestRawProduceRequest(t *testing.T) {
t0 := time.Now().Truncate(time.Millisecond)
t1 := t0.Add(1 * time.Millisecond)
t2 := t0.Add(2 * time.Millisecond)

prototest.TestRequestWithOverride(t, v0, &rawproduce.Request{
Acks: 1,
Timeout: 500,
Topics: []rawproduce.RequestTopic{
{
Topic: "topic-1",
Partitions: []rawproduce.RequestPartition{
{
Partition: 0,
RecordSet: NewRawRecordSet(protocol.NewRecordReader(
protocol.Record{Offset: 0, Time: t0, Key: nil, Value: nil},
), 1, 0),
},
{
Partition: 1,
RecordSet: NewRawRecordSet(protocol.NewRecordReader(
protocol.Record{Offset: 0, Time: t0, Key: nil, Value: prototest.String("msg-0")},
protocol.Record{Offset: 1, Time: t1, Key: nil, Value: prototest.String("msg-1")},
protocol.Record{Offset: 2, Time: t2, Key: prototest.Bytes([]byte{1}), Value: prototest.String("msg-2")},
), 1, 0),
},
},
},

{
Topic: "topic-2",
Partitions: []rawproduce.RequestPartition{
{
Partition: 0,
RecordSet: NewRawRecordSet(protocol.NewRecordReader(
protocol.Record{Offset: 0, Time: t0, Key: nil, Value: prototest.String("msg-0")},
protocol.Record{Offset: 1, Time: t1, Key: nil, Value: prototest.String("msg-1")},
protocol.Record{Offset: 2, Time: t2, Key: prototest.Bytes([]byte{1}), Value: prototest.String("msg-2")},
), 1, protocol.Gzip),
},
},
},
},
})

prototest.TestRequestWithOverride(t, v3, &rawproduce.Request{
TransactionalID: "1234",
Acks: 1,
Timeout: 500,
Topics: []rawproduce.RequestTopic{
{
Topic: "topic-1",
Partitions: []rawproduce.RequestPartition{
{
Partition: 0,
RecordSet: NewRawRecordSet(protocol.NewRecordReader(
protocol.Record{Offset: 0, Time: t0, Key: nil, Value: nil},
), 1, 0),
},
{
Partition: 1,
RecordSet: NewRawRecordSet(protocol.NewRecordReader(
protocol.Record{Offset: 0, Time: t0, Key: nil, Value: prototest.String("msg-0")},
protocol.Record{Offset: 1, Time: t1, Key: nil, Value: prototest.String("msg-1")},
protocol.Record{Offset: 2, Time: t2, Key: prototest.Bytes([]byte{1}), Value: prototest.String("msg-2")},
), 1, 0),
},
},
},
},
})

headers := []protocol.Header{
{Key: "key-1", Value: []byte("value-1")},
{Key: "key-2", Value: []byte("value-2")},
{Key: "key-3", Value: []byte("value-3")},
}

prototest.TestRequestWithOverride(t, v5, &rawproduce.Request{
TransactionalID: "1234",
Acks: 1,
Timeout: 500,
Topics: []rawproduce.RequestTopic{
{
Topic: "topic-1",
Partitions: []rawproduce.RequestPartition{
{
Partition: 1,
RecordSet: NewRawRecordSet(protocol.NewRecordReader(
protocol.Record{Offset: 0, Time: t0, Key: nil, Value: prototest.String("msg-0"), Headers: headers},
protocol.Record{Offset: 1, Time: t1, Key: nil, Value: prototest.String("msg-1")},
protocol.Record{Offset: 2, Time: t2, Key: prototest.Bytes([]byte{1}), Value: prototest.String("msg-2")},
), 2, 0),
},
},
},

{
Topic: "topic-2",
Partitions: []rawproduce.RequestPartition{
{
Partition: 1,
RecordSet: NewRawRecordSet(protocol.NewRecordReader(
protocol.Record{Offset: 0, Time: t0, Key: nil, Value: prototest.String("msg-0"), Headers: headers},
protocol.Record{Offset: 1, Time: t1, Key: nil, Value: prototest.String("msg-1")},
protocol.Record{Offset: 2, Time: t2, Key: prototest.Bytes([]byte{1}), Value: prototest.String("msg-2")},
), 2, protocol.Snappy),
},
},
},
},
})
}

func NewRawRecordSet(reader protocol.RecordReader, version int8, attr protocol.Attributes) protocol.RawRecordSet {
rs := protocol.RecordSet{Version: version, Attributes: attr, Records: reader}
buf := &bytes.Buffer{}
rs.WriteTo(buf)

return protocol.RawRecordSet{
Reader: buf,
}
}

func BenchmarkProduceRequest(b *testing.B) {
t0 := time.Now().Truncate(time.Millisecond)
t1 := t0.Add(1 * time.Millisecond)
t2 := t0.Add(2 * time.Millisecond)

prototest.BenchmarkRequest(b, v3, &rawproduce.Request{
TransactionalID: "1234",
Acks: 1,
Timeout: 500,
Topics: []rawproduce.RequestTopic{
{
Topic: "topic-1",
Partitions: []rawproduce.RequestPartition{
{
Partition: 0,
RecordSet: NewRawRecordSet(protocol.NewRecordReader(
protocol.Record{Offset: 0, Time: t0, Key: nil, Value: nil},
), 1, 0),
},
{
Partition: 1,
RecordSet: NewRawRecordSet(protocol.NewRecordReader(
protocol.Record{Offset: 0, Time: t0, Key: nil, Value: prototest.String("msg-0")},
protocol.Record{Offset: 1, Time: t1, Key: nil, Value: prototest.String("msg-1")},
protocol.Record{Offset: 2, Time: t2, Key: prototest.Bytes([]byte{1}), Value: prototest.String("msg-2")},
), 1, 0),
},
},
},
},
})

headers := []protocol.Header{
{Key: "key-1", Value: []byte("value-1")},
{Key: "key-2", Value: []byte("value-2")},
{Key: "key-3", Value: []byte("value-3")},
}

prototest.BenchmarkRequest(b, v5, &rawproduce.Request{
TransactionalID: "1234",
Acks: 1,
Timeout: 500,
Topics: []rawproduce.RequestTopic{
{
Topic: "topic-1",
Partitions: []rawproduce.RequestPartition{
{
Partition: 1,
RecordSet: NewRawRecordSet(protocol.NewRecordReader(
protocol.Record{Offset: 0, Time: t0, Key: nil, Value: prototest.String("msg-0"), Headers: headers},
protocol.Record{Offset: 1, Time: t1, Key: nil, Value: prototest.String("msg-1")},
protocol.Record{Offset: 2, Time: t2, Key: prototest.Bytes([]byte{1}), Value: prototest.String("msg-2")},
), 2, 0),
},
},
},
},
})
}
Loading