diff --git a/go.mod b/go.mod index fda6cc0..ae967af 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,6 @@ require ( github.com/jackc/pgx/v4 v4.18.2 github.com/lib/pq v1.10.2 github.com/oklog/ulid v1.3.1 - github.com/pkg/errors v0.9.1 github.com/stretchr/testify v1.8.1 ) @@ -25,6 +24,7 @@ require ( github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect github.com/jackc/pgtype v1.14.0 // indirect github.com/lithammer/shortuuid/v3 v3.0.7 // indirect + github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/rogpeppe/go-internal v1.10.0 // indirect golang.org/x/crypto v0.20.0 // indirect diff --git a/pkg/sql/publisher.go b/pkg/sql/publisher.go index 92626e2..3da9a8f 100644 --- a/pkg/sql/publisher.go +++ b/pkg/sql/publisher.go @@ -2,10 +2,10 @@ package sql import ( "context" + "errors" + "fmt" "sync" - "github.com/pkg/errors" - "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill/message" ) @@ -53,7 +53,7 @@ type Publisher struct { func NewPublisher(db ContextExecutor, config PublisherConfig, logger watermill.LoggerAdapter) (*Publisher, error) { config.setDefaults() if err := config.validate(); err != nil { - return nil, errors.Wrap(err, "invalid config") + return nil, fmt.Errorf("invalid config: %w", err) } if db == nil { @@ -105,7 +105,7 @@ func (p *Publisher) Publish(topic string, messages ...*message.Message) (err err insertQuery, err := p.config.SchemaAdapter.InsertQuery(topic, messages) if err != nil { - return errors.Wrap(err, "cannot create insert query") + return fmt.Errorf("cannot create insert query: %w", err) } p.logger.Trace("Inserting message to SQL", watermill.LogFields{ @@ -115,7 +115,7 @@ func (p *Publisher) Publish(topic string, messages ...*message.Message) (err err _, err = p.db.ExecContext(context.Background(), insertQuery.Query, insertQuery.Args...) if err != nil { - return errors.Wrap(err, "could not insert message as row") + return fmt.Errorf("could not insert message as row: %w", err) } return nil @@ -138,7 +138,7 @@ func (p *Publisher) initializeSchema(topic string) error { p.config.SchemaAdapter, nil, ); err != nil { - return errors.Wrap(err, "cannot initialize schema") + return fmt.Errorf("cannot initialize schema: %w", err) } p.initializedTopics.Store(topic, struct{}{}) diff --git a/pkg/sql/schema.go b/pkg/sql/schema.go index 661fa4c..2ca6d94 100644 --- a/pkg/sql/schema.go +++ b/pkg/sql/schema.go @@ -2,9 +2,9 @@ package sql import ( "context" + "fmt" "github.com/ThreeDotsLabs/watermill" - "github.com/pkg/errors" ) func initializeSchema( @@ -30,9 +30,9 @@ func initializeSchema( }) for _, q := range initializingQueries { - _, err := db.ExecContext(ctx, q.Query, q.Args...) + _, err = db.ExecContext(ctx, q.Query, q.Args...) if err != nil { - return errors.Wrap(err, "could not initialize schema") + return fmt.Errorf("could not initialize schema: %w", err) } } diff --git a/pkg/sql/schema_adapter.go b/pkg/sql/schema_adapter.go index f096bb3..ca155f2 100644 --- a/pkg/sql/schema_adapter.go +++ b/pkg/sql/schema_adapter.go @@ -3,9 +3,9 @@ package sql import ( "database/sql" "encoding/json" + "fmt" "github.com/ThreeDotsLabs/watermill/message" - "github.com/pkg/errors" ) // SchemaAdapter produces the SQL queries and arguments appropriately for a specific schema and dialect @@ -49,7 +49,7 @@ func defaultInsertArgs(msgs message.Messages) ([]interface{}, error) { for _, msg := range msgs { metadata, err := json.Marshal(msg.Metadata) if err != nil { - return nil, errors.Wrapf(err, "could not marshal metadata into JSON for message %s", msg.UUID) + return nil, fmt.Errorf("could not marshal metadata into JSON for message %s: %w", msg.UUID, err) } args = append(args, msg.UUID, []byte(msg.Payload), metadata) diff --git a/pkg/sql/schema_adapter_mysql.go b/pkg/sql/schema_adapter_mysql.go index db6b9e7..d4a1570 100644 --- a/pkg/sql/schema_adapter_mysql.go +++ b/pkg/sql/schema_adapter_mysql.go @@ -7,7 +7,6 @@ import ( "strings" "github.com/ThreeDotsLabs/watermill/message" - "github.com/pkg/errors" ) // DefaultMySQLSchema is a default implementation of SchemaAdapter based on MySQL. @@ -102,7 +101,7 @@ func (s DefaultMySQLSchema) UnmarshalMessage(row Scanner) (Row, error) { r := Row{} err := row.Scan(&r.Offset, &r.UUID, &r.Payload, &r.Metadata) if err != nil { - return Row{}, errors.Wrap(err, "could not scan message row") + return Row{}, fmt.Errorf("could not scan message row: %w", err) } msg := message.NewMessage(string(r.UUID), r.Payload) @@ -110,7 +109,7 @@ func (s DefaultMySQLSchema) UnmarshalMessage(row Scanner) (Row, error) { if r.Metadata != nil { err = json.Unmarshal(r.Metadata, &msg.Metadata) if err != nil { - return Row{}, errors.Wrap(err, "could not unmarshal metadata as JSON") + return Row{}, fmt.Errorf("could not unmarshal metadata as JSON: %w", err) } } diff --git a/pkg/sql/schema_adapter_postgresql.go b/pkg/sql/schema_adapter_postgresql.go index 9b40bf0..d277b16 100644 --- a/pkg/sql/schema_adapter_postgresql.go +++ b/pkg/sql/schema_adapter_postgresql.go @@ -7,7 +7,6 @@ import ( "strings" "github.com/ThreeDotsLabs/watermill/message" - "github.com/pkg/errors" ) // DefaultPostgreSQLSchema is a default implementation of SchemaAdapter based on PostgreSQL. @@ -112,7 +111,7 @@ func (s DefaultPostgreSQLSchema) UnmarshalMessage(row Scanner) (Row, error) { err := row.Scan(&r.Offset, &transactionID, &r.UUID, &r.Payload, &r.Metadata) if err != nil { - return Row{}, errors.Wrap(err, "could not scan message row") + return Row{}, fmt.Errorf("could not scan message row: %w", err) } msg := message.NewMessage(string(r.UUID), r.Payload) @@ -120,7 +119,7 @@ func (s DefaultPostgreSQLSchema) UnmarshalMessage(row Scanner) (Row, error) { if r.Metadata != nil { err = json.Unmarshal(r.Metadata, &msg.Metadata) if err != nil { - return Row{}, errors.Wrap(err, "could not unmarshal metadata as JSON") + return Row{}, fmt.Errorf("could not unmarshal metadata as JSON: %w", err) } } diff --git a/pkg/sql/subscriber.go b/pkg/sql/subscriber.go index 8c3e67c..06db966 100644 --- a/pkg/sql/subscriber.go +++ b/pkg/sql/subscriber.go @@ -3,12 +3,12 @@ package sql import ( "context" "database/sql" - stdErrors "errors" + "errors" + "fmt" "sync" "time" "github.com/oklog/ulid" - "github.com/pkg/errors" "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill/message" @@ -126,7 +126,7 @@ func NewSubscriber(db Beginner, config SubscriberConfig, logger watermill.Logger config.setDefaults() err := config.validate() if err != nil { - return nil, errors.Wrap(err, "invalid config") + return nil, fmt.Errorf("invalid config: %w", err) } if logger == nil { @@ -135,7 +135,7 @@ func NewSubscriber(db Beginner, config SubscriberConfig, logger watermill.Logger idBytes, idStr, err := newSubscriberID() if err != nil { - return &Subscriber{}, errors.Wrap(err, "cannot generate subscriber id") + return &Subscriber{}, fmt.Errorf("cannot generate subscriber id: %w", err) } logger = logger.With(watermill.LogFields{"subscriber_id": idStr}) @@ -159,7 +159,7 @@ func newSubscriberID() ([]byte, string, error) { id := watermill.NewULID() idBytes, err := ulid.MustParseStrict(id).MarshalBinary() if err != nil { - return nil, "", errors.Wrap(err, "cannot marshal subscriber id") + return nil, "", fmt.Errorf("cannot marshal subscriber id: %w", err) } return idBytes, id, nil @@ -191,7 +191,7 @@ func (s *Subscriber) Subscribe(ctx context.Context, topic string) (o <-chan *mes _, err := tx.ExecContext(ctx, q.Query, q.Args...) if err != nil { - return errors.Wrap(err, "cannot execute before subscribing query") + return fmt.Errorf("cannot execute before subscribing query: %w", err) } } return nil @@ -263,20 +263,20 @@ func (s *Subscriber) query( } tx, err := s.db.BeginTx(ctx, txOptions) if err != nil { - return false, errors.Wrap(err, "could not begin tx for querying") + return false, fmt.Errorf("could not begin tx for querying: %w", err) } defer func() { if err != nil { rollbackErr := tx.Rollback() - if rollbackErr != nil && rollbackErr != sql.ErrTxDone { + if rollbackErr != nil && !errors.Is(rollbackErr, sql.ErrTxDone) { logger.Error("could not rollback tx for querying message", rollbackErr, watermill.LogFields{ "query_err": err, }) } } else { commitErr := tx.Commit() - if commitErr != nil && commitErr != sql.ErrTxDone { + if commitErr != nil && !errors.Is(commitErr, sql.ErrTxDone) { logger.Error("could not commit tx for querying message", commitErr, nil) } } @@ -293,12 +293,12 @@ func (s *Subscriber) query( }) rows, err := tx.QueryContext(ctx, selectQuery.Query, selectQuery.Args...) if err != nil { - return false, errors.Wrap(err, "could not query message") + return false, fmt.Errorf("could not query message: %w", err) } defer func() { if rowsCloseErr := rows.Close(); rowsCloseErr != nil { - err = stdErrors.Join(err, errors.Wrap(err, "could not close rows")) + err = errors.Join(err, fmt.Errorf("could not close rows: %w", err)) } }() @@ -309,10 +309,10 @@ func (s *Subscriber) query( for rows.Next() { row, err := s.config.SchemaAdapter.UnmarshalMessage(rows) - if errors.Cause(err) == sql.ErrNoRows { + if errors.Is(err, sql.ErrNoRows) { return true, nil } else if err != nil { - return false, errors.Wrap(err, "could not unmarshal message from query") + return false, fmt.Errorf("could not unmarshal message from query: %w", err) } messageRows = append(messageRows, row) @@ -321,7 +321,7 @@ func (s *Subscriber) query( for _, row := range messageRows { acked, err := s.processMessage(ctx, topic, row, tx, out, logger) if err != nil { - return false, errors.Wrap(err, "could not process message") + return false, fmt.Errorf("could not process message: %w", err) } if !acked { break @@ -348,7 +348,7 @@ func (s *Subscriber) query( result, err := tx.ExecContext(ctx, ackQuery.Query, ackQuery.Args...) if err != nil { - return false, errors.Wrap(err, "could not get args for acking the message") + return false, fmt.Errorf("could not get args for acking the message: %w", err) } rowsAffected, _ := result.RowsAffected() @@ -388,7 +388,7 @@ func (s *Subscriber) processMessage( _, err := tx.ExecContext(ctx, consumedQuery.Query, consumedQuery.Args...) if err != nil { - return false, errors.Wrap(err, "cannot send consumed query") + return false, fmt.Errorf("cannot send consumed query: %w", err) } logger.Trace("Executed query to confirm message consumed", nil) diff --git a/pkg/sql/topic.go b/pkg/sql/topic.go index e35a72e..0deb437 100644 --- a/pkg/sql/topic.go +++ b/pkg/sql/topic.go @@ -1,9 +1,9 @@ package sql import ( + "errors" + "fmt" "regexp" - - "github.com/pkg/errors" ) var disallowedTopicCharacters = regexp.MustCompile(`[^A-Za-z0-9\-\$\:\.\_]`) @@ -14,7 +14,7 @@ var ErrInvalidTopicName = errors.New("topic name should not contain characters m // Topics are translated into SQL tables and patched into some queries, so this is done to prevent injection as well. func validateTopicName(topic string) error { if disallowedTopicCharacters.MatchString(topic) { - return errors.Wrap(ErrInvalidTopicName, topic) + return fmt.Errorf("%s: %w", topic, ErrInvalidTopicName) } return nil diff --git a/pkg/sql/topic_test.go b/pkg/sql/topic_test.go index a080275..22e50b2 100644 --- a/pkg/sql/topic_test.go +++ b/pkg/sql/topic_test.go @@ -5,12 +5,11 @@ import ( "testing" "time" - "github.com/ThreeDotsLabs/watermill-sql/v3/pkg/sql" - "github.com/ThreeDotsLabs/watermill/message" - - "github.com/pkg/errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/ThreeDotsLabs/watermill-sql/v3/pkg/sql" + "github.com/ThreeDotsLabs/watermill/message" ) func TestValidateTopicName(t *testing.T) { @@ -22,11 +21,11 @@ func TestValidateTopicName(t *testing.T) { err := publisher.Publish(cleverlyNamedTopic, message.NewMessage("uuid", nil)) require.Error(t, err) - assert.Equal(t, sql.ErrInvalidTopicName, errors.Cause(err)) + assert.ErrorIs(t, err, sql.ErrInvalidTopicName) ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() _, err = subscriber.Subscribe(ctx, cleverlyNamedTopic) require.Error(t, err) - assert.Equal(t, sql.ErrInvalidTopicName, errors.Cause(err)) + assert.ErrorIs(t, err, sql.ErrInvalidTopicName) }