From 91fc1dd2d2d0432d5dfcdbd589ccdad74c96b902 Mon Sep 17 00:00:00 2001 From: Yen-Ming Lee Date: Fri, 15 Mar 2024 11:59:39 -0700 Subject: [PATCH] add LockPostgreSQL and BenchmarkDistLockers --- .github/workflows/tests.yml | 7 +++++++ Makefile | 2 +- docker-compose.yml | 7 +++++++ go.mod | 1 + go.sum | 2 ++ limiters_test.go | 30 +++++++++++++++++++++++++----- locks.go | 33 +++++++++++++++++++++++++++++++++ locks_test.go | 17 +++++++++++++++++ 8 files changed, 93 insertions(+), 6 deletions(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 3a694d5..4f959eb 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -37,6 +37,12 @@ jobs: ALLOW_ANONYMOUS_LOGIN: yes ports: - 2181:2181 + postgresql: + image: bitnami/postgresql + env: + ALLOW_EMPTY_PASSWORD: yes + ports: + - 5432:5432 steps: - uses: actions/checkout@v4 @@ -56,6 +62,7 @@ jobs: ZOOKEEPER_ENDPOINTS: 'localhost:2181' AWS_ADDR: 'localhost:8000' MEMCACHED_ADDR: '127.0.0.1:11211' + POSTGRES_URL: postgres://postgres@localhost:5432/?sslmode=disable run: go test -race -v -coverprofile=coverage.txt -covermode=atomic ./... - uses: codecov/codecov-action@v4.1.0 with: diff --git a/Makefile b/Makefile index 7d96202..6a623f5 100644 --- a/Makefile +++ b/Makefile @@ -2,4 +2,4 @@ all: test test: docker-compose up -d - ETCD_ENDPOINTS="127.0.0.1:2379" REDIS_ADDR="127.0.0.1:6379" ZOOKEEPER_ENDPOINTS="127.0.0.1" CONSUL_ADDR="127.0.0.1:8500" AWS_ADDR="127.0.0.1:8000" MEMCACHED_ADDR="127.0.0.1:11211" go test -race -v -failfast + ETCD_ENDPOINTS="127.0.0.1:2379" REDIS_ADDR="127.0.0.1:6379" ZOOKEEPER_ENDPOINTS="127.0.0.1" CONSUL_ADDR="127.0.0.1:8500" AWS_ADDR="127.0.0.1:8000" MEMCACHED_ADDR="127.0.0.1:11211" POSTGRES_URL="postgres://postgres@localhost:5432/?sslmode=disable" go test -race -v -failfast -bench=. diff --git a/docker-compose.yml b/docker-compose.yml index 9e6083d..d548d07 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -37,3 +37,10 @@ services: command: "-jar DynamoDBLocal.jar -inMemory" ports: - "8000:8000" + + postgresql: + image: bitnami/postgresql + environment: + ALLOW_EMPTY_PASSWORD: yes + ports: + - "5432:5432" diff --git a/go.mod b/go.mod index 9d49742..dbabde7 100644 --- a/go.mod +++ b/go.mod @@ -23,6 +23,7 @@ require ( github.com/aws/aws-sdk-go-v2 v1.17.6 github.com/aws/aws-sdk-go-v2/service/dynamodb v1.19.1 github.com/cenkalti/backoff/v3 v3.2.2 + github.com/lib/pq v1.10.9 ) require ( diff --git a/go.sum b/go.sum index 1969ede..e84a64e 100644 --- a/go.sum +++ b/go.sum @@ -967,6 +967,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= +github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/lyft/protoc-gen-star v0.6.0/go.mod h1:TGAoBVkt8w7MPG72TrKIu85MIdXwDuzJYeZuUPFPNwA= github.com/lyft/protoc-gen-star v0.6.1/go.mod h1:TGAoBVkt8w7MPG72TrKIu85MIdXwDuzJYeZuUPFPNwA= github.com/lyft/protoc-gen-star/v2 v2.0.1/go.mod h1:RcCdONR2ScXaYnQC5tUzxzlpA3WVYF7/opLeUgcQs/o= diff --git a/limiters_test.go b/limiters_test.go index 8c55074..5df421b 100644 --- a/limiters_test.go +++ b/limiters_test.go @@ -2,7 +2,9 @@ package limiters_test import ( "context" + "database/sql" "fmt" + "hash/fnv" "os" "strings" "sync" @@ -71,6 +73,7 @@ type LimitersTestSuite struct { dynamodbClient *dynamodb.Client dynamoDBTableProps l.DynamoDBTableProperties memcacheClient *memcache.Client + pgDb *sql.DB } func (s *LimitersTestSuite) SetupSuite() { @@ -118,6 +121,10 @@ func (s *LimitersTestSuite) SetupSuite() { s.memcacheClient = memcache.New(strings.Split(os.Getenv("MEMCACHED_ADDR"), ",")...) s.Require().NoError(s.memcacheClient.Ping()) + + s.pgDb, err = sql.Open("postgres", os.Getenv("POSTGRES_URL")) + s.Require().NoError(err) + s.Require().NoError(s.pgDb.Ping()) } func (s *LimitersTestSuite) TearDownSuite() { @@ -125,6 +132,7 @@ func (s *LimitersTestSuite) TearDownSuite() { s.Assert().NoError(s.redisClient.Close()) s.Assert().NoError(DeleteTestDynamoDBTable(context.Background(), s.dynamodbClient)) s.Assert().NoError(s.memcacheClient.Close()) + s.Assert().NoError(s.pgDb.Close()) } func TestLimitersTestSuite(t *testing.T) { @@ -138,6 +146,15 @@ func (s *LimitersTestSuite) lockers(generateKeys bool) map[string]l.DistLocker { return lockers } +func hash(s string) uint32 { + h := fnv.New32a() + _, err := h.Write([]byte(s)) + if err != nil { + panic(err) + } + return h.Sum32() +} + // distLockers returns distributed lockers only. func (s *LimitersTestSuite) distLockers(generateKeys bool) map[string]l.DistLocker { randomKey := uuid.New().String() @@ -146,21 +163,24 @@ func (s *LimitersTestSuite) distLockers(generateKeys bool) map[string]l.DistLock zkKey := "/" + randomKey redisKey := randomKey memcacheKey := randomKey + pgKey := randomKey if !generateKeys { consulKey = "dist_locker" etcdKey = "dist_locker" zkKey = "/dist_locker" redisKey = "dist_locker" memcacheKey = "dist_locker" + pgKey = "dist_locker" } consulLock, err := s.consulClient.LockKey(consulKey) s.Require().NoError(err) return map[string]l.DistLocker{ - "LockEtcd": l.NewLockEtcd(s.etcdClient, etcdKey, s.logger), - "LockConsul": l.NewLockConsul(consulLock), - "LockZookeeper": l.NewLockZookeeper(zk.NewLock(s.zkConn, zkKey, zk.WorldACL(zk.PermAll))), - "LockRedis": l.NewLockRedis(goredis.NewPool(s.redisClient), redisKey), - "LockMemcached": l.NewLockMemcached(s.memcacheClient, memcacheKey), + "LockEtcd": l.NewLockEtcd(s.etcdClient, etcdKey, s.logger), + "LockConsul": l.NewLockConsul(consulLock), + "LockZookeeper": l.NewLockZookeeper(zk.NewLock(s.zkConn, zkKey, zk.WorldACL(zk.PermAll))), + "LockRedis": l.NewLockRedis(goredis.NewPool(s.redisClient), redisKey), + "LockMemcached": l.NewLockMemcached(s.memcacheClient, memcacheKey), + "LockPostgreSQL": l.NewLockPostgreSQL(s.pgDb, hash(pgKey)), } } diff --git a/locks.go b/locks.go index 4cb4861..a5efde7 100644 --- a/locks.go +++ b/locks.go @@ -2,6 +2,7 @@ package limiters import ( "context" + "database/sql" "github.com/alessandro-c/gomemcached-lock" "github.com/alessandro-c/gomemcached-lock/adapters/gomemcache" "github.com/bradfitz/gomemcache/memcache" @@ -9,6 +10,7 @@ import ( "github.com/go-redsync/redsync/v4" redsyncredis "github.com/go-redsync/redsync/v4/redis" "github.com/hashicorp/consul/api" + _ "github.com/lib/pq" "github.com/pkg/errors" "github.com/samuel/go-zookeeper/zk" clientv3 "go.etcd.io/etcd/client/v3" @@ -187,3 +189,34 @@ func (l *LockMemcached) Lock(ctx context.Context) error { func (l *LockMemcached) Unlock(ctx context.Context) error { return l.locker.Release() } + +// LockPostgreSQL is an implementation of the DistLocker interface using PostgreSQL's advisory lock. +type LockPostgreSQL struct { + db *sql.DB + id uint32 + tx *sql.Tx +} + +// NewLockPostgreSQL creates a new LockPostgreSQL. +func NewLockPostgreSQL(db *sql.DB, id uint32) *LockPostgreSQL { + return &LockPostgreSQL{db, id, nil} +} + +// Make sure LockPostgreSQL implements DistLocker interface +var _ DistLocker = (*LockPostgreSQL)(nil) + +// Lock acquire an advisory lock in PostgreSQL +func (l *LockPostgreSQL) Lock(ctx context.Context) error { + var err error + l.tx, err = l.db.BeginTx(ctx, &sql.TxOptions{}) + if err != nil { + return err + } + _, err = l.tx.ExecContext(ctx, "SELECT pg_advisory_xact_lock($1)", l.id) + return err +} + +// Unlock releases an advisory lock in PostgreSQL +func (l *LockPostgreSQL) Unlock(ctx context.Context) error { + return l.tx.Rollback() +} diff --git a/locks_test.go b/locks_test.go index 150ecdd..ae6eac9 100644 --- a/locks_test.go +++ b/locks_test.go @@ -3,6 +3,7 @@ package limiters_test import ( "context" "sync" + "testing" "time" "github.com/mennanov/limiters" @@ -44,3 +45,19 @@ func (s *LimitersTestSuite) TestDistLockers() { }) } } + +func BenchmarkDistLockers(b *testing.B) { + s := new(LimitersTestSuite) + s.SetT(&testing.T{}) + s.SetupSuite() + lockers := s.distLockers(false) + for name, locker := range lockers { + b.Run(name, func(b *testing.B) { + for i := 0; i < b.N; i++ { + s.Require().NoError(locker.Lock(context.Background())) + s.Require().NoError(locker.Unlock(context.Background())) + } + }) + } + s.TearDownSuite() +}