forked from FoxComm/metamorphosis
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathavro.go
34 lines (27 loc) · 742 Bytes
/
avro.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package metamorphosis
import (
"errors"
"fmt"
"github.com/elodina/go-avro"
"github.com/elodina/go_kafka_client"
)
// AvroMessage represents a Kafka message that has been successfully decoded
// with Avro.
type AvroMessage interface {
// Bytes returns a byte slice containing the parsed contents of the message.
Bytes() []byte
}
// avroMessage is the AvroMessage implementation returned by newAvroMessage.
// It holds the decoded Avro record taken from a Kafka message.
type avroMessage struct {
// record is the decoded value of the Kafka message.
record *avro.GenericRecord
}
// newAvroMessage wraps the Avro record carried by a decoded Kafka message in
// an AvroMessage.
//
// It returns an error, and a nil AvroMessage, when message is nil or when
// message.DecodedValue does not hold a non-nil *avro.GenericRecord.
func newAvroMessage(message *go_kafka_client.Message) (AvroMessage, error) {
	// Guard against a nil message so that reading DecodedValue cannot panic.
	if message == nil {
		return nil, errors.New("Unable to decode message")
	}

	record, ok := message.DecodedValue.(*avro.GenericRecord)
	// A typed-nil *avro.GenericRecord satisfies the type assertion but carries
	// no record, so it is rejected the same way as a value of the wrong type.
	if !ok || record == nil {
		return nil, errors.New("Unable to decode message")
	}

	return &avroMessage{record: record}, nil
}
// Bytes formats the wrapped record with fmt's default formatting and returns
// the resulting text as a byte slice.
func (m avroMessage) Bytes() []byte {
	return []byte(fmt.Sprint(m.record))
}