Added support for Idempotent Producer
mimaison committed Aug 20, 2018
1 parent e7238b1 commit 0e0f112
Showing 7 changed files with 278 additions and 5 deletions.
94 changes: 91 additions & 3 deletions async_producer.go
@@ -2,6 +2,7 @@ package sarama

import (
"encoding/binary"
"errors"
"fmt"
"sync"
"time"
@@ -47,6 +48,81 @@ type AsyncProducer interface {
Errors() <-chan *ProducerError
}

// TransactionManager keeps the state necessary to ensure idempotent production
type TransactionManager interface {
// Returns true if idempotency is enabled
Idempotent() bool

// Returns the current ProducerID
ProducerID() int64

// Returns the current ProducerEpoch
ProducerEpoch() int16

// Returns the next available sequence number
GetAndIncrementSequenceNumber(topic string, partition int32) int32
}

type transactionManager struct {
idempotent bool
producerID int64
producerEpoch int16
sequenceNumbers map[string]int32
mutex sync.Mutex
}

func (t *transactionManager) Idempotent() bool {
return t.idempotent
}

func (t *transactionManager) ProducerID() int64 {
return t.producerID
}

func (t *transactionManager) ProducerEpoch() int16 {
return t.producerEpoch
}

func (t *transactionManager) GetAndIncrementSequenceNumber(topic string, partition int32) int32 {
key := fmt.Sprintf("%s-%d", topic, partition)
t.mutex.Lock()
defer t.mutex.Unlock()
sequence := t.sequenceNumbers[key]
t.sequenceNumbers[key] = sequence + 1
return sequence
}

func newTransactionManager(conf *Config, client Client) (TransactionManager, error) {
var pid int64 = -1
var epoch int16 = -1
if conf.Producer.Idempotent {
if !conf.Version.IsAtLeast(V0_11_0_0) {
return nil, ConfigurationError("You must set Version to at least 0.11 when using the Idempotent producer")
}
if conf.Producer.Retry.Max == 0 {
return nil, ConfigurationError("You must set Retry.Max to at least 1 when using the Idempotent producer")
}
if conf.Net.MaxOpenRequests > 5 {
return nil, ConfigurationError("You must set Net.MaxOpenRequests to at most 5 when using the Idempotent producer")
}
initProducerIDResponse, err := client.InitProducerID()
if err != nil {
return nil, errors.New("Unable to retrieve a ProducerID")
}
pid = initProducerIDResponse.ProducerID
epoch = initProducerIDResponse.ProducerEpoch
}

txnmgr := &transactionManager{
idempotent: conf.Producer.Idempotent,
producerID: pid,
producerEpoch: epoch,
sequenceNumbers: make(map[string]int32),
mutex: sync.Mutex{},
}
return txnmgr, nil
}

type asyncProducer struct {
client Client
conf *Config
@@ -59,6 +135,8 @@ type asyncProducer struct {
brokers map[*Broker]chan<- *ProducerMessage
brokerRefs map[chan<- *ProducerMessage]int
brokerLock sync.Mutex

txnmgr TransactionManager
}

// NewAsyncProducer creates a new AsyncProducer using the given broker addresses and configuration.
@@ -84,6 +162,11 @@ func NewAsyncProducerFromClient(client Client) (AsyncProducer, error) {
return nil, ErrClosedClient
}

txnmgr, err := newTransactionManager(client.Config(), client)
if err != nil {
return nil, err
}

p := &asyncProducer{
client: client,
conf: client.Config(),
@@ -93,6 +176,7 @@ func NewAsyncProducerFromClient(client Client) (AsyncProducer, error) {
retries: make(chan *ProducerMessage),
brokers: make(map[*Broker]chan<- *ProducerMessage),
brokerRefs: make(map[chan<- *ProducerMessage]int),
txnmgr: txnmgr,
}

// launch our singleton dispatchers
@@ -145,9 +229,10 @@ type ProducerMessage struct {
// least version 0.10.0.
Timestamp time.Time

retries int
flags flagSet
expectation chan *ProducerError
retries int
flags flagSet
expectation chan *ProducerError
sequenceNumber int32
}

const producerMessageOverhead = 26 // the metadata overhead of CRC, flags, etc.
@@ -328,6 +413,9 @@ func (tp *topicProducer) dispatch() {
continue
}
}
if msg.retries == 0 {
msg.sequenceNumber = tp.parent.txnmgr.GetAndIncrementSequenceNumber(msg.Topic, msg.Partition)
}

handler := tp.handlers[msg.Partition]
if handler == nil {
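For context on the bookkeeping above: sequence numbers are handed out per topic-partition, starting at zero, and a message only gets one on its first dispatch (the msg.retries == 0 guard), so a retried message keeps the sequence it was originally assigned. A small standalone sketch of that counter behaviour, mirroring GetAndIncrementSequenceNumber (illustration only, not part of the commit):

package main

import (
	"fmt"
	"sync"
)

// seqCounter mirrors transactionManager's per-partition sequence bookkeeping.
type seqCounter struct {
	mu   sync.Mutex
	next map[string]int32
}

func (c *seqCounter) getAndIncrement(topic string, partition int32) int32 {
	key := fmt.Sprintf("%s-%d", topic, partition)
	c.mu.Lock()
	defer c.mu.Unlock()
	seq := c.next[key]
	c.next[key] = seq + 1
	return seq
}

func main() {
	c := &seqCounter{next: make(map[string]int32)}
	fmt.Println(c.getAndIncrement("my_topic", 0)) // 0
	fmt.Println(c.getAndIncrement("my_topic", 0)) // 1
	fmt.Println(c.getAndIncrement("my_topic", 1)) // 0 -- each partition counts independently
}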
52 changes: 52 additions & 0 deletions async_producer_test.go
@@ -753,6 +753,58 @@ func TestAsyncProducerNoReturns(t *testing.T) {
leader.Close()
}

func TestAsyncProducerIdempotent(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
leader := NewMockBroker(t, 2)
leaderAddr := leader.Addr()

clusterID := "cid"
metadataResponse := &MetadataResponse{
Version: 3,
ThrottleTimeMs: 0,
ClusterID: &clusterID,
ControllerID: 1,
}
metadataResponse.AddBroker(leaderAddr, leader.BrokerID())
metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
seedBroker.Returns(metadataResponse)

initProducerID := &InitProducerIDResponse{
ThrottleTime: 0,
ProducerID: 1000,
ProducerEpoch: 1,
}
seedBroker.Returns(initProducerID)

config := NewConfig()
config.Producer.Flush.Messages = 10
config.Producer.Return.Successes = true
config.Producer.Retry.Max = 4
config.Producer.Retry.Backoff = 0
config.Producer.Idempotent = true
config.Version = V0_11_0_0
producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
if err != nil {
t.Fatal(err)
}

for i := 0; i < 10; i++ {
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
}

prodSuccess := &ProduceResponse{
Version: 3,
ThrottleTime: 0,
}
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
leader.Returns(prodSuccess)
expectResults(t, producer, 10, 0)

seedBroker.Close()
leader.Close()
closeProducer(t, producer)
}

// This example shows how to use the producer while simultaneously
// reading the Errors channel to know about any failures.
func ExampleAsyncProducer_select() {
23 changes: 23 additions & 0 deletions client.go
@@ -67,6 +67,9 @@ type Client interface {
// in local cache. This function only works on Kafka 0.8.2 and higher.
RefreshCoordinator(consumerGroup string) error

// InitProducerID retrieves the ProducerID and ProducerEpoch required by the Idempotent Producer
InitProducerID() (*InitProducerIDResponse, error)

// Close shuts down all broker connections managed by this client. It is required
// to call this function before a client object passes out of scope, as it will
// otherwise leak memory. You must close any Producers or Consumers using a client
@@ -183,6 +186,26 @@ func (client *client) Brokers() []*Broker {
return brokers
}

func (client *client) InitProducerID() (*InitProducerIDResponse, error) {
// keep the last broker error so callers get a real failure (not a nil error) when every broker fails
err := ErrOutOfBrokers
for broker := client.any(); broker != nil; broker = client.any() {

req := &InitProducerIDRequest{}

var response *InitProducerIDResponse
response, err = broker.InitProducerID(req)
switch err.(type) {
case nil:
return response, nil
default:
// this broker failed, close it, remove it from the cache and try the next one
Logger.Printf("Error while issuing InitProducerID to broker %s: %v\n", broker.Addr(), err)
_ = broker.Close()
client.deregisterBroker(broker)
}
}
return nil, err
}

func (client *client) Close() error {
if client.Closed() {
// Chances are this is being called from a defer() and the error will go unobserved
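The same call that newTransactionManager performs can be issued directly against a client. A minimal sketch, assuming a reachable broker at localhost:9092 (the address and logging are placeholders, not part of the commit):

package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	config.Version = sarama.V0_11_0_0 // InitProducerID requires Kafka 0.11+

	client, err := sarama.NewClient([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	resp, err := client.InitProducerID()
	if err != nil {
		log.Fatal(err)
	}
	// this (ProducerID, ProducerEpoch) pair gets stamped on every idempotent record batch
	log.Printf("ProducerID=%d ProducerEpoch=%d", resp.ProducerID, resp.ProducerEpoch)
}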
3 changes: 3 additions & 0 deletions config.go
@@ -124,6 +124,9 @@ type Config struct {
// (defaults to hashing the message key). Similar to the `partitioner.class`
// setting for the JVM producer.
Partitioner PartitionerConstructor
// If enabled, the producer will ensure that exactly one copy of each message is
// written.
Idempotent bool

// Return specifies what channels will be populated. If they are set to true,
// you must read from the respective channels to prevent deadlock. If,
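Putting the pieces together, enabling the feature from application code only requires the new flag plus the constraints newTransactionManager enforces (Version at least 0.11, Retry.Max at least 1, Net.MaxOpenRequests at most 5). A minimal sketch, with the broker address and topic as placeholders:

package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	config.Version = sarama.V0_11_0_0 // required: idempotence needs the v3 produce request
	config.Producer.Idempotent = true // the flag added in this commit
	config.Producer.Retry.Max = 5     // must be at least 1
	config.Net.MaxOpenRequests = 1    // must be at most 5
	config.Producer.Return.Successes = true

	producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatal(err)
	}

	producer.Input() <- &sarama.ProducerMessage{
		Topic: "my_topic",
		Value: sarama.StringEncoder("hello"),
	}
	<-producer.Successes()

	if err := producer.Close(); err != nil {
		log.Fatal(err)
	}
}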
8 changes: 7 additions & 1 deletion produce_response.go
@@ -179,5 +179,11 @@ func (r *ProduceResponse) AddTopicPartition(topic string, partition int32, err KError) {
byTopic = make(map[int32]*ProduceResponseBlock)
r.Blocks[topic] = byTopic
}
byTopic[partition] = &ProduceResponseBlock{Err: err}
block := &ProduceResponseBlock{
Err: err,
}
if r.Version >= 2 {
block.Timestamp = time.Now()
}
byTopic[partition] = block
}
10 changes: 9 additions & 1 deletion produce_set.go
Expand Up @@ -61,10 +61,10 @@ func (ps *produceSet) add(msg *ProducerMessage) error {
batch := &RecordBatch{
FirstTimestamp: timestamp,
Version: 2,
ProducerID: -1, /* No producer id */
Codec: ps.parent.conf.Producer.Compression,
CompressionLevel: ps.parent.conf.Producer.CompressionLevel,
}
ps.setProducerState(batch, msg.Topic, msg.Partition, msg.sequenceNumber)
set = &partitionSet{recordsToSend: newDefaultRecords(batch)}
size = recordBatchOverhead
} else {
@@ -108,6 +108,14 @@ func (ps *produceSet) add(msg *ProducerMessage) error {
return nil
}

func (ps *produceSet) setProducerState(batch *RecordBatch, topic string, partition int32, sequence int32) {
if ps.parent.txnmgr.Idempotent() {
batch.ProducerID = ps.parent.txnmgr.ProducerID()
batch.ProducerEpoch = ps.parent.txnmgr.ProducerEpoch()
batch.FirstSequence = sequence
}
}

func (ps *produceSet) buildRequest() *ProduceRequest {
req := &ProduceRequest{
RequiredAcks: ps.parent.conf.Producer.RequiredAcks,
93 changes: 93 additions & 0 deletions produce_set_test.go
@@ -2,13 +2,17 @@ package sarama

import (
"fmt"
"sync"
"testing"
"time"
)

func makeProduceSet() (*asyncProducer, *produceSet) {
parent := &asyncProducer{
conf: NewConfig(),
txnmgr: &transactionManager{
idempotent: false,
},
}
return parent, newProduceSet(parent)
}
@@ -253,3 +257,92 @@ func TestProduceSetV3RequestBuilding(t *testing.T) {
}
}
}

func TestProduceSetIdempotentRequestBuilding(t *testing.T) {
var pID int64 = 1000
var pEpoch int16 = 1234
parent := &asyncProducer{
conf: NewConfig(),
txnmgr: &transactionManager{
idempotent: true,
producerID: pID,
producerEpoch: pEpoch,
sequenceNumbers: make(map[string]int32),
mutex: sync.Mutex{},
},
}
ps := newProduceSet(parent)
parent.conf.Producer.RequiredAcks = WaitForAll
parent.conf.Producer.Timeout = 10 * time.Second
parent.conf.Version = V0_11_0_0
parent.conf.Producer.Idempotent = true

now := time.Now()
msg := &ProducerMessage{
Topic: "t1",
Partition: 0,
Key: StringEncoder(TestMessage),
Value: StringEncoder(TestMessage),
Headers: []RecordHeader{
RecordHeader{
Key: []byte("header-1"),
Value: []byte("value-1"),
},
RecordHeader{
Key: []byte("header-2"),
Value: []byte("value-2"),
},
RecordHeader{
Key: []byte("header-3"),
Value: []byte("value-3"),
},
},
Timestamp: now,
sequenceNumber: 123,
}
for i := 0; i < 10; i++ {
safeAddMessage(t, ps, msg)
msg.Timestamp = msg.Timestamp.Add(time.Second)
}

req := ps.buildRequest()

if req.Version != 3 {
t.Error("Wrong request version")
}

batch := req.records["t1"][0].RecordBatch
if batch.FirstTimestamp != now {
t.Errorf("Wrong first timestamp: %v", batch.FirstTimestamp)
}
if batch.ProducerID != pID {
t.Errorf("Wrong producerID: %v", batch.ProducerID)
}
if batch.ProducerEpoch != pEpoch {
t.Errorf("Wrong producerEpoch: %v", batch.ProducerEpoch)
}
if batch.FirstSequence != 123 {
t.Errorf("Wrong first sequence: %v", batch.FirstSequence)
}
for i := 0; i < 10; i++ {
rec := batch.Records[i]
if rec.TimestampDelta != time.Duration(i)*time.Second {
t.Errorf("Wrong timestamp delta: %v", rec.TimestampDelta)
}

if rec.OffsetDelta != int64(i) {
t.Errorf("Wrong relative inner offset, expected %d, got %d", i, rec.OffsetDelta)
}

for j, h := range batch.Records[i].Headers {
exp := fmt.Sprintf("header-%d", j+1)
if string(h.Key) != exp {
t.Errorf("Wrong header key, expected %v, got %v", exp, h.Key)
}
exp = fmt.Sprintf("value-%d", j+1)
if string(h.Value) != exp {
t.Errorf("Wrong header value, expected %v, got %v", exp, h.Value)
}
}
}
}
