Skip to content

Commit 485f63c

Browse files
authored
streamline the database interfaces and add pgx adapter (#29)
1 parent 350c00c commit 485f63c

17 files changed

+450
-221
lines changed

.github/workflows/pr.yml

+1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
name: pr
22
on:
3+
pull_request:
34
push:
45
branches-ignore:
56
- master

docker-compose.yml

-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
# for Watermill development purposes.
22
# For Watermill based application docker please check https://watermill.io/docs/getting-started/
33

4-
version: '3'
54
services:
65
mysql:
76
image: mysql:8.0

go.mod

+9-11
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,16 @@
11
module github.com/ThreeDotsLabs/watermill-sql/v4
22

3-
go 1.21
3+
go 1.22
44

5-
toolchain go1.23.0
5+
toolchain go1.23.3
66

77
require (
88
github.com/ThreeDotsLabs/watermill v1.4.0-rc.2
99
github.com/go-sql-driver/mysql v1.4.1
10-
github.com/jackc/pgx/v4 v4.18.2
11-
github.com/lib/pq v1.10.2
10+
github.com/jackc/pgx/v5 v5.7.1
11+
github.com/lib/pq v1.10.9
1212
github.com/oklog/ulid v1.3.1
13+
github.com/pkg/errors v0.9.1
1314
github.com/stretchr/testify v1.9.0
1415
)
1516

@@ -19,18 +20,15 @@ require (
1920
github.com/google/uuid v1.6.0 // indirect
2021
github.com/hashicorp/errwrap v1.1.0 // indirect
2122
github.com/hashicorp/go-multierror v1.1.1 // indirect
22-
github.com/jackc/chunkreader/v2 v2.0.1 // indirect
23-
github.com/jackc/pgconn v1.14.3 // indirect
24-
github.com/jackc/pgio v1.0.0 // indirect
2523
github.com/jackc/pgpassfile v1.0.0 // indirect
26-
github.com/jackc/pgproto3/v2 v2.3.3 // indirect
27-
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
28-
github.com/jackc/pgtype v1.14.0 // indirect
24+
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
25+
github.com/jackc/puddle/v2 v2.2.2 // indirect
2926
github.com/lithammer/shortuuid/v3 v3.0.7 // indirect
30-
github.com/pkg/errors v0.9.1 // indirect
3127
github.com/pmezard/go-difflib v1.0.0 // indirect
28+
github.com/rogpeppe/go-internal v1.13.1 // indirect
3229
github.com/sony/gobreaker v1.0.0 // indirect
3330
golang.org/x/crypto v0.31.0 // indirect
31+
golang.org/x/sync v0.10.0 // indirect
3432
golang.org/x/text v0.21.0 // indirect
3533
google.golang.org/appengine v1.6.7 // indirect
3634
gopkg.in/yaml.v3 v3.0.1 // indirect

go.sum

+16-156
Large diffs are not rendered by default.

pkg/sql/adapters_pgx.go

+127
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
package sql
2+
3+
import (
4+
"context"
5+
stdSQL "database/sql"
6+
"fmt"
7+
8+
"github.com/jackc/pgx/v5"
9+
"github.com/jackc/pgx/v5/pgconn"
10+
)
11+
12+
type Conn interface {
13+
BeginTx(ctx context.Context, options pgx.TxOptions) (pgx.Tx, error)
14+
Exec(ctx context.Context, sql string, arguments ...any) (pgconn.CommandTag, error)
15+
Query(ctx context.Context, sql string, arguments ...any) (pgx.Rows, error)
16+
}
17+
18+
type PgxBeginner struct {
19+
Conn
20+
}
21+
22+
func BeginnerFromPgx(conn Conn) Beginner {
23+
return PgxBeginner{conn}
24+
}
25+
26+
type PgxTx struct {
27+
pgx.Tx
28+
ctx context.Context
29+
}
30+
31+
type PgxResult struct {
32+
pgconn.CommandTag
33+
}
34+
35+
type PgxRows struct {
36+
pgx.Rows
37+
}
38+
39+
func (c PgxBeginner) BeginTx(ctx context.Context, options *stdSQL.TxOptions) (Tx, error) {
40+
opts := pgx.TxOptions{}
41+
if options != nil {
42+
iso, err := toPgxIsolationLevel(options.Isolation)
43+
if err != nil {
44+
return nil, err
45+
}
46+
47+
opts = pgx.TxOptions{
48+
IsoLevel: iso,
49+
AccessMode: toPgxAccessMode(options.ReadOnly),
50+
DeferrableMode: "",
51+
}
52+
}
53+
54+
tx, err := c.Conn.BeginTx(ctx, opts)
55+
56+
return &PgxTx{
57+
Tx: tx,
58+
ctx: ctx,
59+
}, err
60+
}
61+
62+
func (c PgxBeginner) ExecContext(ctx context.Context, query string, args ...any) (Result, error) {
63+
res, err := c.Conn.Exec(ctx, query, args...)
64+
65+
return PgxResult{res}, err
66+
}
67+
68+
func (c PgxBeginner) QueryContext(ctx context.Context, query string, args ...any) (Rows, error) {
69+
rows, err := c.Conn.Query(ctx, query, args...)
70+
71+
return PgxRows{rows}, err
72+
}
73+
74+
func (t PgxTx) ExecContext(ctx context.Context, query string, args ...any) (Result, error) {
75+
res, err := t.Tx.Exec(ctx, query, args...)
76+
77+
return PgxResult{res}, err
78+
}
79+
80+
func (t PgxTx) QueryContext(ctx context.Context, query string, args ...any) (Rows, error) {
81+
rows, err := t.Tx.Query(ctx, query, args...)
82+
83+
return PgxRows{rows}, err
84+
}
85+
86+
func (t PgxTx) Rollback() error {
87+
return t.Tx.Rollback(context.WithoutCancel(t.ctx))
88+
}
89+
90+
func (t PgxTx) Commit() error {
91+
return t.Tx.Commit(t.ctx)
92+
}
93+
94+
func (p PgxResult) RowsAffected() (int64, error) {
95+
return p.CommandTag.RowsAffected(), nil
96+
}
97+
98+
func (p PgxRows) Close() error {
99+
p.Rows.Close()
100+
101+
return nil
102+
}
103+
104+
func toPgxIsolationLevel(level stdSQL.IsolationLevel) (pgx.TxIsoLevel, error) {
105+
switch level {
106+
case stdSQL.LevelReadUncommitted:
107+
return pgx.ReadUncommitted, nil
108+
case stdSQL.LevelReadCommitted:
109+
return pgx.ReadCommitted, nil
110+
case stdSQL.LevelRepeatableRead:
111+
return pgx.RepeatableRead, nil
112+
case stdSQL.LevelSerializable:
113+
return pgx.Serializable, nil
114+
case stdSQL.LevelSnapshot:
115+
return pgx.Serializable, fmt.Errorf("pgx does not support snapshot isolation")
116+
default:
117+
return pgx.Serializable, nil
118+
}
119+
}
120+
121+
func toPgxAccessMode(readOnly bool) pgx.TxAccessMode {
122+
if readOnly {
123+
return pgx.ReadOnly
124+
}
125+
126+
return pgx.ReadWrite
127+
}

pkg/sql/adapters_std_sql.go

+49
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package sql
2+
3+
import (
4+
"context"
5+
"database/sql"
6+
)
7+
8+
type StdSQLBeginner struct {
9+
SQLBeginner
10+
}
11+
12+
func BeginnerFromStdSQL(sqlBeginner SQLBeginner) Beginner {
13+
return StdSQLBeginner{sqlBeginner}
14+
}
15+
16+
type StdSQLTx struct {
17+
*sql.Tx
18+
}
19+
20+
func TxFromStdSQL(tx *sql.Tx) Tx {
21+
return &StdSQLTx{tx}
22+
}
23+
24+
// BeginTx converts the stdSQL.Tx struct to our Tx interface
25+
func (c StdSQLBeginner) BeginTx(ctx context.Context, options *sql.TxOptions) (Tx, error) {
26+
tx, err := c.SQLBeginner.BeginTx(ctx, options)
27+
28+
return &StdSQLTx{tx}, err
29+
}
30+
31+
// ExecContext converts the stdSQL.Result struct to our Result interface
32+
func (c StdSQLBeginner) ExecContext(ctx context.Context, query string, args ...interface{}) (Result, error) {
33+
return c.SQLBeginner.ExecContext(ctx, query, args...)
34+
}
35+
36+
// QueryContext converts the stdSQL.Rows struct to our Rows interface
37+
func (c StdSQLBeginner) QueryContext(ctx context.Context, query string, args ...interface{}) (Rows, error) {
38+
return c.SQLBeginner.QueryContext(ctx, query, args...)
39+
}
40+
41+
// ExecContext converts the stdSQL.Result struct to our Result interface
42+
func (t StdSQLTx) ExecContext(ctx context.Context, query string, args ...any) (Result, error) {
43+
return t.Tx.ExecContext(ctx, query, args...)
44+
}
45+
46+
// QueryContext converts the stdSQL.Rows struct to our Rows interface
47+
func (t StdSQLTx) QueryContext(ctx context.Context, query string, args ...any) (Rows, error) {
48+
return t.Tx.QueryContext(ctx, query, args...)
49+
}

pkg/sql/context.go

+3-4
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package sql
22

33
import (
44
"context"
5-
"database/sql"
65
)
76

87
type contextKey string
@@ -11,7 +10,7 @@ const (
1110
txContextKey contextKey = "tx"
1211
)
1312

14-
func setTxToContext(ctx context.Context, tx *sql.Tx) context.Context {
13+
func setTxToContext(ctx context.Context, tx Tx) context.Context {
1514
return context.WithValue(ctx, txContextKey, tx)
1615
}
1716

@@ -21,7 +20,7 @@ func setTxToContext(ctx context.Context, tx *sql.Tx) context.Context {
2120
//
2221
// It is useful when you want to ensure that data is updated only when the message is processed.
2322
// Example usage: https://github.com/ThreeDotsLabs/watermill/tree/master/_examples/real-world-examples/exactly-once-delivery-counter
24-
func TxFromContext(ctx context.Context) (*sql.Tx, bool) {
25-
tx, ok := ctx.Value(txContextKey).(*sql.Tx)
23+
func TxFromContext(ctx context.Context) (Tx, bool) {
24+
tx, ok := ctx.Value(txContextKey).(Tx)
2625
return tx, ok
2726
}

pkg/sql/delayed_postgresql.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package sql
22

33
import (
4-
"database/sql"
54
"fmt"
65
"strings"
76

@@ -28,7 +27,7 @@ func (c *DelayedPostgreSQLPublisherConfig) setDefaults() {
2827

2928
// NewDelayedPostgreSQLPublisher creates a new Publisher that stores messages in PostgreSQL with a delay.
3029
// The delay can be set per message with the Watermill's components/delay metadata.
31-
func NewDelayedPostgreSQLPublisher(db *sql.DB, config DelayedPostgreSQLPublisherConfig) (message.Publisher, error) {
30+
func NewDelayedPostgreSQLPublisher(db ContextExecutor, config DelayedPostgreSQLPublisherConfig) (message.Publisher, error) {
3231
config.setDefaults()
3332

3433
publisherConfig := PublisherConfig{
@@ -82,7 +81,7 @@ func (c *DelayedPostgreSQLSubscriberConfig) setDefaults() {
8281

8382
// NewDelayedPostgreSQLSubscriber creates a new Subscriber that reads messages from PostgreSQL with a delay.
8483
// The delay can be set per message with the Watermill's components/delay metadata.
85-
func NewDelayedPostgreSQLSubscriber(db *sql.DB, config DelayedPostgreSQLSubscriberConfig) (message.Subscriber, error) {
84+
func NewDelayedPostgreSQLSubscriber(db Beginner, config DelayedPostgreSQLSubscriberConfig) (message.Subscriber, error) {
8685
config.setDefaults()
8786

8887
where := fmt.Sprintf("(metadata->>'%v')::timestamptz < NOW() AT TIME ZONE 'UTC'", delay.DelayedUntilKey)

pkg/sql/delayed_requeuer.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package sql
22

33
import (
44
"context"
5-
"database/sql"
65
"fmt"
76
"time"
87

@@ -35,7 +34,7 @@ func (q DelayedRequeuer) Run(ctx context.Context) error {
3534
// DelayedRequeuerConfig is a configuration for DelayedRequeuer.
3635
type DelayedRequeuerConfig struct {
3736
// DB is a database connection. Required.
38-
DB *sql.DB
37+
DB Beginner
3938

4039
// Publisher is a publisher that will be used to publish requeued messages. Required.
4140
Publisher message.Publisher

pkg/sql/publisher.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"errors"
66
"fmt"
77
"sync"
8+
"time"
89

910
"github.com/ThreeDotsLabs/watermill"
1011
"github.com/ThreeDotsLabs/watermill/message"
@@ -133,8 +134,11 @@ func (p *Publisher) initializeSchema(topic string) error {
133134
return nil
134135
}
135136

137+
ctx, cancel := context.WithTimeout(context.Background(), time.Second*60)
138+
defer cancel()
139+
136140
if err := initializeSchema(
137-
context.Background(),
141+
ctx,
138142
topic,
139143
p.logger,
140144
p.db,

0 commit comments

Comments
 (0)