Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

avro schema generator #174

Merged
merged 9 commits into from
Jun 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/conduitio/conduit-connector-postgres

go 1.22
go 1.22.0

require (
github.com/Masterminds/sprig/v3 v3.2.3
Expand All @@ -11,6 +11,7 @@ require (
github.com/golangci/golangci-lint v1.59.1
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.6.0
github.com/hamba/avro/v2 v2.22.1
github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438
github.com/jackc/pglogrepl v0.0.0-20240307033717-828fbfe908e9
github.com/jackc/pgx/v5 v5.6.0
Expand Down Expand Up @@ -110,6 +111,7 @@ require (
github.com/jirfag/go-printf-func-name v0.0.0-20200119135958-7558a9eaa5af // indirect
github.com/jjti/go-spancheck v0.6.1 // indirect
github.com/jpillora/backoff v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/julz/importas v0.1.0 // indirect
github.com/karamaru-alpha/copyloopvar v1.1.0 // indirect
github.com/kisielk/errcheck v1.7.0 // indirect
Expand Down Expand Up @@ -139,6 +141,8 @@ require (
github.com/mitchellh/go-testing-interface v1.14.1 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/mitchellh/reflectwalk v1.0.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/moricho/tparallel v0.3.1 // indirect
github.com/nakabonne/nestif v0.3.1 // indirect
github.com/nishanths/exhaustive v0.12.0 // indirect
Expand Down
5 changes: 5 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,8 @@ github.com/gostaticanalysis/nilerr v0.1.1/go.mod h1:wZYb6YI5YAxxq0i1+VJbY0s2YONW
github.com/gostaticanalysis/testutil v0.3.1-0.20210208050101-bfb5c8eec0e4/go.mod h1:D+FIZ+7OahH3ePw/izIEeH5I06eKs1IKI4Xr64/Am3M=
github.com/gostaticanalysis/testutil v0.4.0 h1:nhdCmubdmDF6VEatUNjgUZBJKWRqugoISdUv3PPQgHY=
github.com/gostaticanalysis/testutil v0.4.0/go.mod h1:bLIoPefWXrRi/ssLFWX1dx7Repi5x3CuviD3dgAZaBU=
github.com/hamba/avro/v2 v2.22.1 h1:q1rAbfJsrbMaZPDLQvwUQMfQzp6H+hGXvckmU/lXemk=
github.com/hamba/avro/v2 v2.22.1/go.mod h1:HOeTrE3kvWnBAgsufqhAzDDV5gvS0QXs65Z6BHfGgbg=
github.com/hashicorp/go-hclog v1.5.0 h1:bI2ocEMgcVlz55Oj1xZNBsVi900c7II+fWDyV9o+13c=
github.com/hashicorp/go-hclog v1.5.0/go.mod h1:W4Qnvbt70Wk/zYJryRzDRU/4r0kIg0PVHBcfoyhpF5M=
github.com/hashicorp/go-plugin v1.6.0 h1:wgd4KxHJTVGGqWBq4QPB1i5BZNEx9BR8+OFmHDmTk8A=
Expand Down Expand Up @@ -356,6 +358,7 @@ github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
Expand Down Expand Up @@ -440,9 +443,11 @@ github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RR
github.com/mitchellh/reflectwalk v1.0.0 h1:9D+8oIskB4VJBN5SFlmc27fSlIBZaov1Wpk/IfikLNY=
github.com/mitchellh/reflectwalk v1.0.0/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/moricho/tparallel v0.3.1 h1:fQKD4U1wRMAYNngDonW5XupoB/ZGJHdpzrWqgyg9krA=
github.com/moricho/tparallel v0.3.1/go.mod h1:leENX2cUv7Sv2qDgdi0D0fCftN8fRC67Bcn8pqzeYNI=
Expand Down
1 change: 1 addition & 0 deletions source.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ func (s *Source) Open(ctx context.Context, pos sdk.Position) error {
TableKeys: s.tableKeys,
WithSnapshot: s.config.SnapshotMode == source.SnapshotModeInitial,
SnapshotFetchSize: s.config.SnapshotFetchSize,
WithAvroSchema: s.config.WithAvroSchema,
})
if err != nil {
return fmt.Errorf("failed to create logical replication iterator: %w", err)
Expand Down
5 changes: 5 additions & 0 deletions source/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ const (
type Config struct {
// URL is the connection string for the Postgres database.
URL string `json:"url" validate:"required"`

// Tables is a List of table names to read from, separated by a comma, e.g.:"table1,table2".
// Use "*" if you'd like to listen to all tables.
Tables []string `json:"tables"`
Expand All @@ -74,6 +75,10 @@ type Config struct {
// LogreplAutoCleanup determines if the replication slot and publication should be
// removed when the connector is deleted.
LogreplAutoCleanup bool `json:"logrepl.autoCleanup" default:"true"`

// WithAvroSchema determines whether the connector should attach an avro schema on each
// record.
WithAvroSchema bool `json:"logrepl.withAvroSchema" default:"false"`
}

// Validate validates the provided config values.
Expand Down
4 changes: 3 additions & 1 deletion source/logrepl/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type CDCConfig struct {
PublicationName string
Tables []string
TableKeys map[string]string
WithAvroSchema bool
}

// CDCIterator asynchronously listens for events from the logical replication
Expand Down Expand Up @@ -63,6 +64,7 @@ func NewCDCIterator(ctx context.Context, pool *pgxpool.Pool, c CDCConfig) (*CDCI
}

records := make(chan sdk.Record)
handler := NewCDCHandler(internal.NewRelationSet(), c.TableKeys, c.WithAvroSchema, records)

sub, err := internal.CreateSubscription(
ctx,
Expand All @@ -71,7 +73,7 @@ func NewCDCIterator(ctx context.Context, pool *pgxpool.Pool, c CDCConfig) (*CDCI
c.PublicationName,
c.Tables,
c.LSN,
NewCDCHandler(internal.NewRelationSet(), c.TableKeys, records).Handle,
handler.Handle,
)
if err != nil {
return nil, fmt.Errorf("failed to initialize subscription: %w", err)
Expand Down
13 changes: 8 additions & 5 deletions source/logrepl/combined.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type Config struct {
TableKeys map[string]string
WithSnapshot bool
SnapshotFetchSize int
WithAvroSchema bool
}

// Validate performs validation tasks on the config.
Expand Down Expand Up @@ -177,6 +178,7 @@ func (c *CombinedIterator) initCDCIterator(ctx context.Context, pos position.Pos
PublicationName: c.conf.PublicationName,
Tables: c.conf.Tables,
TableKeys: c.conf.TableKeys,
WithAvroSchema: c.conf.WithAvroSchema,
})
if err != nil {
return fmt.Errorf("failed to create CDC iterator: %w", err)
Expand All @@ -198,11 +200,12 @@ func (c *CombinedIterator) initSnapshotIterator(ctx context.Context, pos positio
}

snapshotIterator, err := snapshot.NewIterator(ctx, c.pool, snapshot.Config{
Position: c.conf.Position,
Tables: c.conf.Tables,
TableKeys: c.conf.TableKeys,
TXSnapshotID: c.cdcIterator.TXSnapshotID(),
FetchSize: c.conf.SnapshotFetchSize,
Position: c.conf.Position,
Tables: c.conf.Tables,
TableKeys: c.conf.TableKeys,
TXSnapshotID: c.cdcIterator.TXSnapshotID(),
FetchSize: c.conf.SnapshotFetchSize,
WithAvroSchema: c.conf.WithAvroSchema,
})
if err != nil {
return fmt.Errorf("failed to create snapshot iterator: %w", err)
Expand Down
52 changes: 46 additions & 6 deletions source/logrepl/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ import (

"github.com/conduitio/conduit-connector-postgres/source/logrepl/internal"
"github.com/conduitio/conduit-connector-postgres/source/position"
"github.com/conduitio/conduit-connector-postgres/source/schema"
sdk "github.com/conduitio/conduit-connector-sdk"
"github.com/hamba/avro/v2"
"github.com/jackc/pglogrepl"
)

Expand All @@ -31,17 +33,23 @@ type CDCHandler struct {
relationSet *internal.RelationSet
out chan<- sdk.Record
lastTXLSN pglogrepl.LSN

relAvroSchema map[string]avro.Schema
withAvroSchema bool
}

func NewCDCHandler(
rs *internal.RelationSet,
tableKeys map[string]string,
withAvroSchema bool,
out chan<- sdk.Record,
) *CDCHandler {
return &CDCHandler{
tableKeys: tableKeys,
relationSet: rs,
out: out,
tableKeys: tableKeys,
relationSet: rs,
out: out,
withAvroSchema: withAvroSchema,
relAvroSchema: make(map[string]avro.Schema),
}
}

Expand Down Expand Up @@ -100,6 +108,10 @@ func (h *CDCHandler) handleInsert(
return fmt.Errorf("failed to decode new values: %w", err)
}

if err := h.updateAvroSchema(rel, msg.Tuple); err != nil {
return fmt.Errorf("failed to update avro schema: %w", err)
}

rec := sdk.Util.Source.NewRecordCreate(
h.buildPosition(lsn),
h.buildRecordMetadata(rel),
Expand Down Expand Up @@ -127,6 +139,10 @@ func (h *CDCHandler) handleUpdate(
return fmt.Errorf("failed to decode new values: %w", err)
}

if err := h.updateAvroSchema(rel, msg.NewTuple); err != nil {
return fmt.Errorf("failed to update avro schema: %w", err)
}

oldValues, err := h.relationSet.Values(msg.RelationID, msg.OldTuple)
if err != nil {
// this is not a critical error, old values are optional, just log it
Expand Down Expand Up @@ -180,10 +196,16 @@ func (h *CDCHandler) send(ctx context.Context, rec sdk.Record) error {
}
}

func (h *CDCHandler) buildRecordMetadata(relation *pglogrepl.RelationMessage) map[string]string {
return map[string]string{
sdk.MetadataCollection: relation.RelationName,
func (h *CDCHandler) buildRecordMetadata(rel *pglogrepl.RelationMessage) map[string]string {
m := map[string]string{
sdk.MetadataCollection: rel.RelationName,
}

if h.withAvroSchema {
m[schema.AvroMetadataKey] = h.relAvroSchema[rel.RelationName].String()
}

return m
}

// buildRecordKey takes the values from the message and extracts the key that
Expand All @@ -209,9 +231,27 @@ func (h *CDCHandler) buildRecordPayload(values map[string]any) sdk.Data {
return sdk.StructuredData(values)
}

// buildPosition stores the LSN in position and converts it to bytes.
func (*CDCHandler) buildPosition(lsn pglogrepl.LSN) sdk.Position {
return position.Position{
Type: position.TypeCDC,
LastLSN: lsn.String(),
}.ToSDKPosition()
}

// updateAvroSchema generates and stores avro schema based on the relation's row,
// when usage of avro schema is requested.
func (h *CDCHandler) updateAvroSchema(rel *pglogrepl.RelationMessage, row *pglogrepl.TupleData) error {
if !h.withAvroSchema {
return nil
}

sch, err := schema.Avro.ExtractLogrepl(rel, row)
if err != nil {
return err
}

h.relAvroSchema[rel.RelationName] = sch

return nil
}
2 changes: 1 addition & 1 deletion source/logrepl/internal/relationset.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (rs *RelationSet) Values(id uint32, row *pglogrepl.TupleData) (map[string]a
return nil, fmt.Errorf("failed to decode tuple %d: %w", i, err)
}

v, err := types.Format(val)
v, err := types.Format(col.DataType, val)
if err != nil {
return nil, fmt.Errorf("failed to format column %q type %T: %w", col.Name, val, err)
}
Expand Down
4 changes: 2 additions & 2 deletions source/logrepl/internal/relationset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ func isValuesAllTypes(is *is.I, got map[string]any) {
"col_timestamptz": time.Date(2022, 3, 14, 15+8, 16, 17, 0, time.UTC).UTC(),
"col_tsquery": "'fat' & ( 'rat' | 'cat' )",
"col_tsvector": "'a' 'and' 'ate' 'cat' 'fat' 'mat' 'on' 'rat' 'sat'",
"col_uuid": [16]uint8{0xbd, 0x94, 0xee, 0x0b, 0x56, 0x4f, 0x40, 0x88, 0xbf, 0x4e, 0x8d, 0x5e, 0x62, 0x6c, 0xaf, 0x66},
"col_uuid": "bd94ee0b-564f-4088-bf4e-8d5e626caf66", // [16]uint8{0xbd, 0x94, 0xee, 0x0b, 0x56, 0x4f, 0x40, 0x88, 0xbf, 0x4e, 0x8d, 0x5e, 0x62, 0x6c, 0xaf, 0x66}
"col_xml": "<foo>bar</foo>",
}
is.Equal("", cmp.Diff(want, got,
Expand Down Expand Up @@ -440,7 +440,7 @@ func isValuesAllTypesStandalone(is *is.I, got map[string]any) {
"col_timestamptz": time.Date(2022, 3, 14, 15+8, 16, 17, 0, time.UTC).UTC().String(),
"col_tsquery": "'fat' & ( 'rat' | 'cat' )",
"col_tsvector": "'a' 'and' 'ate' 'cat' 'fat' 'mat' 'on' 'rat' 'sat'",
"col_uuid": [16]uint8{0xbd, 0x94, 0xee, 0x0b, 0x56, 0x4f, 0x40, 0x88, 0xbf, 0x4e, 0x8d, 0x5e, 0x62, 0x6c, 0xaf, 0x66},
"col_uuid": "bd94ee0b-564f-4088-bf4e-8d5e626caf66", // [16]uint8{0xbd, 0x94, 0xee, 0x0b, 0x56, 0x4f, 0x40, 0x88, 0xbf, 0x4e, 0x8d, 0x5e, 0x62, 0x6c, 0xaf, 0x66}
"col_xml": "<foo>bar</foo>",
}
is.Equal("", cmp.Diff(want, got,
Expand Down
6 changes: 6 additions & 0 deletions source/paramgen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading