Generate connector.yaml #232

Merged
30 commits merged on Jan 31, 2025

Commits
2cfa09c
Generate connector.yaml
hariso Dec 24, 2024
bb04c67
Add connector.yaml
hariso Dec 24, 2024
54b3f16
generate connector.yaml
hariso Dec 24, 2024
0f306c0
Set version
hariso Dec 24, 2024
a485549
more updates
hariso Dec 24, 2024
511bc21
make build
hariso Jan 16, 2025
b96f73e
update sdk
hariso Jan 16, 2025
32e84b2
Migrate to specgen
hariso Jan 17, 2025
3407604
Merge branch 'main' into haris/specgen
hariso Jan 17, 2025
b5bfdfe
go.sum fix
hariso Jan 17, 2025
6e1088e
lint
hariso Jan 17, 2025
91914bf
Merge branch 'migrate-to-specgen' into haris/specgen
hariso Jan 17, 2025
24f5c86
Upgrade SDK
hariso Jan 29, 2025
f44a634
Update connector.yaml
hariso Jan 29, 2025
91a198c
validate generated files
hariso Jan 29, 2025
b9872ab
Merge branch 'haris/specgen' of github.com:ConduitIO/conduit-connecto…
hariso Jan 29, 2025
1e6ae79
rename
hariso Jan 29, 2025
179ac45
update sdk, use conn-sdk-cli
lovromazgon Jan 31, 2025
0aee455
use readmegen
lovromazgon Jan 31, 2025
79a6ecb
remove local replacement
lovromazgon Jan 31, 2025
5c380d4
add phony generate
lovromazgon Jan 31, 2025
3ee4187
respect logrepl.withAvroSchema
lovromazgon Jan 31, 2025
646634d
use latest sdk from main
lovromazgon Jan 31, 2025
62c69a7
inline errors
hariso Jan 31, 2025
42fdf69
Merge branch 'haris/specgen' of github.com:ConduitIO/conduit-connecto…
hariso Jan 31, 2025
3c67968
receivers
hariso Jan 31, 2025
88bf253
upgrade conduit-connector-sdk to v0.13.0
lovromazgon Jan 31, 2025
650007e
inline errors
lovromazgon Jan 31, 2025
337cf49
Merge branch 'main' into haris/specgen
lovromazgon Jan 31, 2025
b09e561
make generate
lovromazgon Jan 31, 2025
46 changes: 45 additions & 1 deletion .github/workflows/release.yml
@@ -3,7 +3,7 @@ name: release
on:
push:
tags:
- v*
- '*'

permissions:
contents: write
@@ -18,6 +18,50 @@ jobs:
with:
fetch-depth: 0

- name: Validate Tag Format
run: |
TAG=${GITHUB_REF#refs/tags/}

SV_REGEX="^v(0|[1-9][0-9]*)\.(0|[1-9][0-9]*)\.(0|[1-9][0-9]*)(-((0|[1-9][0-9]*|[0-9]*[a-zA-Z-][0-9a-zA-Z-]*)(\.(0|[1-9][0-9]*|[0-9]*[a-zA-Z-][0-9a-zA-Z-]*))*))?(\+([0-9a-zA-Z-]+(\.[0-9a-zA-Z-]+)*))?$"

if ! [[ $TAG =~ $SV_REGEX ]]; then
echo "$TAG is NOT a valid tag (expected format: v<semver>)"
exit 1
fi

- name: Check Version Consistency
run: |
# Extract the tag name from the ref (e.g. refs/tags/v1.2.3 -> v1.2.3)
TAG=${GITHUB_REF#refs/tags/}

# Read version from connector.yaml
YAML_VERSION=$(yq e '.specification.version' connector.yaml)

# Compare versions
if [[ "$TAG" != "$YAML_VERSION" ]]; then
echo "Version mismatch detected:"
echo "Git Tag: $TAG"
echo "connector.yaml Version: $YAML_VERSION"
exit 1
fi

- name: Delete Invalid Tag
if: failure()
uses: actions/github-script@v7
with:
github-token: ${{secrets.GITHUB_TOKEN}}
script: |
const tag = context.ref.replace('refs/tags/', '')
try {
await github.rest.git.deleteRef({
owner: context.repo.owner,
repo: context.repo.repo,
ref: `tags/${tag}`
})
} catch (error) {
console.log('Error deleting tag:', error)
}

- name: Set up Go
uses: actions/setup-go@v5
with:
1 change: 1 addition & 0 deletions Makefile
@@ -19,6 +19,7 @@ lint:
.PHONY: generate
generate:
go generate ./...
conn-sdk-cli readmegen -w

.PHONY: fmt
fmt:
283 changes: 219 additions & 64 deletions README.md

Large diffs are not rendered by default.

11 changes: 10 additions & 1 deletion connector.go
@@ -12,14 +12,23 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//go:generate conn-sdk-cli specgen

package postgres

import (
_ "embed"

sdk "github.com/conduitio/conduit-connector-sdk"
)

//go:embed connector.yaml
var specs string

var version = "(devel)"

var Connector = sdk.Connector{
NewSpecification: Specification,
NewSpecification: sdk.YAMLSpecification(specs, version),
NewSource: NewSource,
NewDestination: NewDestination,
}
299 changes: 299 additions & 0 deletions connector.yaml
@@ -0,0 +1,299 @@
version: "1.0"
specification:
name: postgres
summary: Conduit connector for PostgreSQL
description: |
## Source

The Postgres Source Connector connects to a database with the provided `url` and
starts creating records for each change detected in the provided tables.

Upon starting, the source takes a snapshot of the provided tables in the database,
then switches into CDC mode. In CDC mode, the plugin reads from a buffer of CDC events.

### Snapshot

When the connector first starts, snapshot mode is enabled. The connector acquires
a read-only lock on the tables, and then reads all rows of the tables into Conduit.
Once all rows in that initial snapshot are read, the connector releases its lock and
switches into CDC mode.

This behavior is enabled by default, but can be turned off by adding
`"snapshotMode": "never"` to the Source configuration.

### Change Data Capture

This connector implements Change Data Capture (CDC) features for PostgreSQL by
creating a logical replication slot and a publication that listens to changes in the
configured tables. Every detected change is converted into a record. If there are no
records available, the connector blocks until a record is available or the connector
receives a stop signal.

#### Logical Replication Configuration

When the connector switches to CDC mode, it attempts to run the initial setup commands
to create its logical replication slot and publication. It will connect to an existing
slot if one with the configured name exists.

The Postgres user specified in the connection URL must have sufficient privileges to
run all of these setup commands, or the setup will fail.

Example pipeline configuration that uses logical replication:

```yaml
version: 2.2
pipelines:
  - id: pg-to-log
    status: running
    connectors:
      - id: pg
        type: source
        plugin: builtin:postgres
        settings:
          url: "postgres://exampleuser:examplepass@localhost:5433/exampledb?sslmode=disable"
          tables: "users"
          cdcMode: "logrepl"
          logrepl.publicationName: "examplepub"
          logrepl.slotName: "exampleslot"
      - id: log
        type: destination
        plugin: builtin:log
        settings:
          level: info
```

:warning: When the connector or pipeline is deleted, the connector will automatically
attempt to delete the replication slot and publication. This is the default behavior
and can be disabled by setting `logrepl.autoCleanup` to `false`.

### Key Handling

The connector will automatically look up the primary key column for each of the specified
tables and use it as the key value. If the primary key can't be determined, the connector
will return an error.

## Destination

The Postgres Destination takes a Conduit record and stores it using a SQL statement.
The Destination is designed to handle different payloads and keys. Because of this,
each record is individually parsed and upserted.

### Handling record operations

Based on the `Operation` field in the record, the destination will either insert,
update or delete the record in the target table. Snapshot records are always inserted.

If the target table already contains a record with the same key as a record being
inserted, the record will be updated (upserted). This can overwrite and thus potentially
lose data, so keys should be assigned correctly from the Source.

If the target table does not contain a record with the same key as a record being
deleted, the record will be ignored.

If there is no key, the record will simply be appended.
version: v0.11.0-dev
author: Meroxa, Inc.
source:
parameters:
- name: url
description: URL is the connection string for the Postgres database.
type: string
default: ""
validations:
- type: required
value: ""
- name: cdcMode
description: CDCMode determines how the connector should listen to changes.
type: string
default: auto
validations:
- type: inclusion
value: auto,logrepl
- name: logrepl.autoCleanup
description: |-
LogreplAutoCleanup determines if the replication slot and publication should be
removed when the connector is deleted.
type: bool
default: "true"
validations: []
- name: logrepl.publicationName
description: |-
LogreplPublicationName determines the publication name in case the
connector uses logical replication to listen to changes (see CDCMode).
type: string
default: conduitpub
validations: []
- name: logrepl.slotName
description: |-
LogreplSlotName determines the replication slot name in case the
connector uses logical replication to listen to changes (see CDCMode).
type: string
default: conduitslot
validations: []
- name: logrepl.withAvroSchema
description: |-
WithAvroSchema determines whether the connector should attach an avro schema on each
record.
type: bool
default: "true"
validations: []
- name: sdk.batch.delay
description: Maximum delay before an incomplete batch is read from the source.
type: duration
default: "0"
validations:
- type: greater-than
value: "-1"
- name: sdk.batch.size
description: Maximum size of batch before it gets read from the source.
type: int
default: "0"
validations:
- type: greater-than
value: "-1"
- name: sdk.schema.context.enabled
description: |-
Specifies whether to use a schema context name. If set to false, no schema context name will
be used, and schemas will be saved with the subject name specified in the connector
(not safe because of name conflicts).
type: bool
default: "true"
validations: []
- name: sdk.schema.context.name
description: |-
Schema context name to be used. Used as a prefix for all schema subject names.
If empty, defaults to the connector ID.
type: string
default: ""
validations: []
- name: sdk.schema.extract.key.enabled
description: Whether to extract and encode the record key with a schema.
type: bool
default: "false"
validations: []
- name: sdk.schema.extract.key.subject
description: |-
The subject of the key schema. If the record metadata contains the field
"opencdc.collection" it is prepended to the subject name and separated
with a dot.
type: string
default: key
validations: []
- name: sdk.schema.extract.payload.enabled
description: Whether to extract and encode the record payload with a schema.
type: bool
default: "false"
validations: []
- name: sdk.schema.extract.payload.subject
description: |-
The subject of the payload schema. If the record metadata contains the
field "opencdc.collection" it is prepended to the subject name and
separated with a dot.
type: string
default: payload
validations: []
- name: sdk.schema.extract.type
description: The type of the payload schema.
type: string
default: avro
validations:
- type: inclusion
value: avro
- name: snapshot.fetchSize
description: Snapshot fetcher size determines the number of rows to retrieve at a time.
type: int
default: "50000"
validations: []
- name: snapshotMode
description: SnapshotMode determines whether the plugin will take a snapshot of the entire table before starting CDC mode.
type: string
default: initial
validations:
- type: inclusion
value: initial,never
- name: table
description: 'Deprecated: use `tables` instead.'
type: string
default: ""
validations: []
- name: tables
description: |-
Tables is a list of table names to read from, separated by a comma, e.g.: "table1,table2".
Use "*" if you'd like to listen to all tables.
type: string
default: ""
validations: []
destination:
parameters:
- name: url
description: URL is the connection string for the Postgres database.
type: string
default: ""
validations:
- type: required
value: ""
- name: key
description: Key represents the column name for the key used to identify and update existing rows.
type: string
default: ""
validations: []
- name: sdk.batch.delay
description: Maximum delay before an incomplete batch is written to the destination.
type: duration
default: "0"
validations: []
- name: sdk.batch.size
description: Maximum size of batch before it gets written to the destination.
type: int
default: "0"
validations:
- type: greater-than
value: "-1"
- name: sdk.rate.burst
description: |-
Allow bursts of at most X records (0 or less means that bursts are not
limited). Only takes effect if a rate limit per second is set. Note that
if `sdk.batch.size` is bigger than `sdk.rate.burst`, the effective batch
size will be equal to `sdk.rate.burst`.
type: int
default: "0"
validations:
- type: greater-than
value: "-1"
- name: sdk.rate.perSecond
description: Maximum number of records written per second (0 means no rate limit).
type: float
default: "0"
validations:
- type: greater-than
value: "-1"
- name: sdk.record.format
description: |-
The format of the output record. See the Conduit documentation for a full
list of supported formats (https://conduit.io/docs/using/connectors/configuration-parameters/output-format).
type: string
default: opencdc/json
validations: []
- name: sdk.record.format.options
description: |-
Options to configure the chosen output record format. Options are normally
key=value pairs separated with comma (e.g. opt1=val1,opt2=val2), except
for the `template` record format, where options are a Go template.
type: string
default: ""
validations: []
- name: sdk.schema.extract.key.enabled
description: Whether to extract and decode the record key with a schema.
type: bool
default: "true"
validations: []
- name: sdk.schema.extract.payload.enabled
description: Whether to extract and decode the record payload with a schema.
type: bool
default: "true"
validations: []
- name: table
description: Table is used as the target table into which records are inserted.
type: string
default: '{{ index .Metadata "opencdc.collection" }}'
validations: []