Skip to content

Commit

Permalink
Make it possible to disable schemas (#234)
Browse files Browse the repository at this point in the history
  • Loading branch information
hariso authored Jan 9, 2025
1 parent 9bea3c0 commit ee46c21
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 32 deletions.
1 change: 1 addition & 0 deletions source.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ func (s *Source) Open(ctx context.Context, pos opencdc.Position) error {
Tables: s.config.Tables,
TableKeys: s.tableKeys,
WithSnapshot: s.config.SnapshotMode == source.SnapshotModeInitial,
WithAvroSchema: s.config.WithAvroSchema,
SnapshotFetchSize: s.config.SnapshotFetchSize,
})
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion source/logrepl/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func NewCDCIterator(ctx context.Context, pool *pgxpool.Pool, c CDCConfig) (*CDCI
}

records := make(chan opencdc.Record)
handler := NewCDCHandler(internal.NewRelationSet(), c.TableKeys, records)
handler := NewCDCHandler(internal.NewRelationSet(), c.TableKeys, records, c.WithAvroSchema)

sub, err := internal.CreateSubscription(
ctx,
Expand Down
1 change: 1 addition & 0 deletions source/logrepl/cdc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,7 @@ func testCDCIterator(ctx context.Context, t *testing.T, pool *pgxpool.Pool, tabl
TableKeys: map[string]string{table: "id"},
PublicationName: table, // table is random, reuse for publication name
SlotName: table, // table is random, reuse for slot name
WithAvroSchema: true,
}

i, err := NewCDCIterator(ctx, pool, config)
Expand Down
13 changes: 8 additions & 5 deletions source/logrepl/combined.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type Config struct {
Tables []string
TableKeys map[string]string
WithSnapshot bool
WithAvroSchema bool
SnapshotFetchSize int
}

Expand Down Expand Up @@ -178,6 +179,7 @@ func (c *CombinedIterator) initCDCIterator(ctx context.Context, pos position.Pos
PublicationName: c.conf.PublicationName,
Tables: c.conf.Tables,
TableKeys: c.conf.TableKeys,
WithAvroSchema: c.conf.WithAvroSchema,
})
if err != nil {
return fmt.Errorf("failed to create CDC iterator: %w", err)
Expand All @@ -199,11 +201,12 @@ func (c *CombinedIterator) initSnapshotIterator(ctx context.Context, pos positio
}

snapshotIterator, err := snapshot.NewIterator(ctx, c.pool, snapshot.Config{
Position: c.conf.Position,
Tables: c.conf.Tables,
TableKeys: c.conf.TableKeys,
TXSnapshotID: c.cdcIterator.TXSnapshotID(),
FetchSize: c.conf.SnapshotFetchSize,
Position: c.conf.Position,
Tables: c.conf.Tables,
TableKeys: c.conf.TableKeys,
TXSnapshotID: c.cdcIterator.TXSnapshotID(),
FetchSize: c.conf.SnapshotFetchSize,
WithAvroSchema: c.conf.WithAvroSchema,
})
if err != nil {
return fmt.Errorf("failed to create snapshot iterator: %w", err)
Expand Down
10 changes: 9 additions & 1 deletion source/logrepl/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,17 @@ type CDCHandler struct {
out chan<- opencdc.Record
lastTXLSN pglogrepl.LSN

withAvroSchema bool
keySchemas map[string]cschema.Schema
payloadSchemas map[string]cschema.Schema
}

func NewCDCHandler(rs *internal.RelationSet, tableKeys map[string]string, out chan<- opencdc.Record) *CDCHandler {
func NewCDCHandler(rs *internal.RelationSet, tableKeys map[string]string, out chan<- opencdc.Record, withAvroSchema bool) *CDCHandler {
return &CDCHandler{
tableKeys: tableKeys,
relationSet: rs,
out: out,
withAvroSchema: withAvroSchema,
keySchemas: make(map[string]cschema.Schema),
payloadSchemas: make(map[string]cschema.Schema),
}
Expand Down Expand Up @@ -245,6 +247,9 @@ func (*CDCHandler) buildPosition(lsn pglogrepl.LSN) opencdc.Position {
// updateAvroSchema generates and stores the avro schema for the given relation
// message. It is a no-op when usage of avro schemas is disabled.
func (h *CDCHandler) updateAvroSchema(ctx context.Context, rel *pglogrepl.RelationMessage) error {
if !h.withAvroSchema {
return nil
}
// Payload schema
avroPayloadSch, err := schema.Avro.ExtractLogrepl(rel.RelationName+"_payload", rel)
if err != nil {
Expand Down Expand Up @@ -281,6 +286,9 @@ func (h *CDCHandler) updateAvroSchema(ctx context.Context, rel *pglogrepl.Relati
}

// attachSchemas attaches the payload schema and the key schema stored for
// relationName to rec. Nothing is attached when avro schemas are disabled.
func (h *CDCHandler) attachSchemas(rec opencdc.Record, relationName string) {
	if h.withAvroSchema {
		// Payload first, then key.
		payloadSchema := h.payloadSchemas[relationName]
		cschema.AttachPayloadSchemaToRecord(rec, payloadSchema)

		keySchema := h.keySchemas[relationName]
		cschema.AttachKeySchemaToRecord(rec, keySchema)
	}
}
34 changes: 21 additions & 13 deletions source/snapshot/fetch_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,12 @@ var supportedKeyTypes = []string{
}

type FetchConfig struct {
Table string
Key string
TXSnapshotID string
FetchSize int
Position position.Position
Table string
Key string
TXSnapshotID string
FetchSize int
Position position.Position
WithAvroSchema bool
}

var (
Expand Down Expand Up @@ -337,14 +338,17 @@ func (f *FetchWorker) buildFetchData(fields []pgconn.FieldDescription, values []
return FetchData{}, fmt.Errorf("failed to encode record data: %w", err)
}

return FetchData{
Key: key,
Payload: payload,
Position: pos,
Table: f.conf.Table,
PayloadSchema: *f.payloadSchema,
KeySchema: *f.keySchema,
}, nil
fd := FetchData{
Key: key,
Payload: payload,
Position: pos,
Table: f.conf.Table,
}
if f.conf.WithAvroSchema {
fd.PayloadSchema = *f.payloadSchema
fd.KeySchema = *f.keySchema
}
return fd, nil
}

func (f *FetchWorker) buildSnapshotPosition(fields []pgconn.FieldDescription, values []any) (position.SnapshotPosition, error) {
Expand Down Expand Up @@ -467,6 +471,10 @@ func (*FetchWorker) validateTable(ctx context.Context, table string, tx pgx.Tx)
}

func (f *FetchWorker) extractSchemas(ctx context.Context, fields []pgconn.FieldDescription) error {
if !f.conf.WithAvroSchema {
return nil
}

if f.payloadSchema == nil {
sdk.Logger(ctx).Debug().
Msgf("extracting payload schema for %v fields in %v", len(fields), f.conf.Table)
Expand Down
28 changes: 16 additions & 12 deletions source/snapshot/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,12 @@ import (
var ErrIteratorDone = errors.New("snapshot complete")

type Config struct {
Position opencdc.Position
Tables []string
TableKeys map[string]string
TXSnapshotID string
FetchSize int
Position opencdc.Position
Tables []string
TableKeys map[string]string
TXSnapshotID string
FetchSize int
WithAvroSchema bool
}

type Iterator struct {
Expand Down Expand Up @@ -122,8 +123,10 @@ func (i *Iterator) buildRecord(d FetchData) opencdc.Record {
metadata["postgres.table"] = d.Table

rec := sdk.Util.Source.NewRecordSnapshot(pos, metadata, d.Key, d.Payload)
cschema.AttachKeySchemaToRecord(rec, d.KeySchema)
cschema.AttachPayloadSchemaToRecord(rec, d.PayloadSchema)
if i.conf.WithAvroSchema {
cschema.AttachKeySchemaToRecord(rec, d.KeySchema)
cschema.AttachPayloadSchemaToRecord(rec, d.PayloadSchema)
}

return rec
}
Expand All @@ -135,11 +138,12 @@ func (i *Iterator) initFetchers(ctx context.Context) error {

for j, t := range i.conf.Tables {
w := NewFetchWorker(i.db, i.data, FetchConfig{
Table: t,
Key: i.conf.TableKeys[t],
TXSnapshotID: i.conf.TXSnapshotID,
Position: i.lastPosition,
FetchSize: i.conf.FetchSize,
Table: t,
Key: i.conf.TableKeys[t],
TXSnapshotID: i.conf.TXSnapshotID,
Position: i.lastPosition,
FetchSize: i.conf.FetchSize,
WithAvroSchema: i.conf.WithAvroSchema,
})

if err := w.Validate(ctx); err != nil {
Expand Down

0 comments on commit ee46c21

Please sign in to comment.