Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement rate limiters #6

Merged
merged 3 commits into from
Jun 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ linters-settings: # please keep this alphabetized
arguments:
- 3
- name: empty-block
- name: confusing-naming
- name: superfluous-else
- name: unused-parameter
- name: unreachable-code
Expand Down
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,18 @@ tiny-url 短链接服务
* 用户分享效果追踪;
* 减少字符占用。

# Features

## 开发进度
- [x] 发号器:基于 TDDL 生成唯一的短链接 ID;
- [x] 分布式缓存:支持 Redis 缓存;
- [x] 本地缓存:支持 bigcache 本地缓存;
- [x] 数据库:支持 MySQL 数据库;
- [ ] URL 302 重定向;
- [ ] url 编码:支持 Base58 编码;
- [ ] 限流器:支持漏桶限流器;
- [ ] 过期时间:支持短链接过期时间;

# 短链接服务系统设计

## 功能需求
Expand Down
27 changes: 27 additions & 0 deletions docs/rate-limiter-design.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# RateLimiter 设计文档

## 概述

`RateLimiter` 是一个接口,它知道如何限制处理某事的速率。它提供了一些方法来决定一个执行对象应该等待多长时间,停止跟踪一个执行对象,以及获取一个执行对象失败的次数。

## 主要组件

### RateLimiter 接口

`RateLimiter` 接口定义了限制处理速率的主要行为。它有三个方法:

- `When(item T) time.Duration`:获取一个执行对象应该等待多长时间。
- `Forget(item T)`:表示一个执行对象完成了重试。无论是因为失败还是成功,我们都会停止跟踪它。
- `Retries(item T) int`:返回执行对象失败的次数。

这个接口是泛型的,可以接受任何可比较的类型 `T`。

## 实现

`RateLimiter` 接口有多种实现,包括 `BucketRateLimiter`、`ItemExponentialFailureRateLimiter`、`ItemFastSlowRateLimiter` 和 `MaxOfRateLimiter`。这些实现提供了不同的限制策略,包括令牌桶限制、指数退避限制、快慢速率限制和最大速率限制。

每种实现都有自己的特性和使用场景。例如,`BucketRateLimiter` 使用标准的令牌桶进行限制,`ItemExponentialFailureRateLimiter` 使用基于指数退避的限制策略,`ItemFastSlowRateLimiter` 在一定次数的尝试后从快速重试切换到慢速重试,`MaxOfRateLimiter` 则从多个限制器中选择最严格的限制。

## 使用

要使用 `RateLimiter`,首先需要创建一个新的限制器实例,然后可以调用其 `When` 方法获取执行对象应该等待的时间。当执行对象完成重试时,应调用其 `Forget` 方法停止跟踪。可以通过 `Retries` 方法获取执行对象失败的次数。
7 changes: 7 additions & 0 deletions docs/tddl-design.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ worker 是一个在后台运行的 goroutine,负责生成序列号并将它们

renew 方法负责更新 `curr` 和 `max`。它首先从数据库中获取当前的序列记录,然后使用乐观锁更新该记录的序列号。如果更新成功,它会更新 `curr` 和 `max` 的值。

renew 方法的主要步骤包括:

1. 从数据库中获取当前的序列记录;
2. 使用乐观锁更新序列记录;
3. 如果前两步执行失败,采用指数退避策略重试,第一次重试间隔为 10 毫秒,之后每次重试间隔翻倍,直到达到最大重试间隔 1 min;
4. 如果更新 DB 序列记录成功,更新 `curr` 和 `max` 的值。

## 测试

TDDL 的测试主要包括单客户端和多客户端的序列号生成测试,以及超时处理测试。这些测试确保 TDDL 能在各种情况下正确地生成序列号。
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ require (
github.com/swaggo/swag v1.16.3
github.com/urfave/cli/v2 v2.27.2
go.uber.org/mock v0.4.0
golang.org/x/time v0.5.0
gopkg.in/natefinch/lumberjack.v2 v2.2.1
gorm.io/driver/mysql v1.5.7
gorm.io/gorm v1.25.10
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.0.0-20180824175216-6c1c5e93cdc1/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.13.0 h1:Iey4qkscZuv0VvIt8E0neZjtPVQFSc870HQ448QgEmQ=
Expand Down
56 changes: 36 additions & 20 deletions pkg/tddl/tddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,14 @@ import (
"github.com/google/uuid"
"gorm.io/gorm"
"gorm.io/plugin/optimisticlock"

"github.com/beiai0xff/turl/pkg/workqueue"
)

// updateTDDLFailedSleepTime is the sleep time when update tddl failed
const updateTDDLFailedSleepTime = 10 * time.Millisecond
const (
// retryInterval is the retry start interval when update tddl failed
retryInterval = 10 * time.Millisecond
)

var (
// ErrStepTooSmall is the error of step too small
Expand Down Expand Up @@ -56,6 +60,8 @@ type tddlSequence struct {
stop chan struct{}
// TODO: use a buffer channel to avoid blocking
queue chan uint64

rateLimiter workqueue.RateLimiter[any]
}

// New returns a new tddl implementation
Expand All @@ -70,12 +76,13 @@ func newSequence(conn *gorm.DB, c *Config) (*tddlSequence, error) {
}

s := tddlSequence{
clientID: uuid.NewString(),
conn: conn,
step: c.Step,
wg: sync.WaitGroup{},
stop: make(chan struct{}),
queue: make(chan uint64),
clientID: uuid.NewString(),
conn: conn,
step: c.Step,
wg: sync.WaitGroup{},
stop: make(chan struct{}),
queue: make(chan uint64),
rateLimiter: workqueue.NewItemExponentialFailureRateLimiter[any](retryInterval, time.Minute),
}

if err := s.getRowID(c.SeqName, c.StartNum); err != nil {
Expand Down Expand Up @@ -137,27 +144,35 @@ func (s *tddlSequence) createRecord(seqName string, startNum uint64) (uint, erro
// s.wg.Wait()
// }

// renew function renews the sequence number
// should be called in a single goroutine
func (s *tddlSequence) renew() {
defer s.rateLimiter.Forget(s.clientID) // forget the retry times

var seq = Sequence{}

// TODO: retry in Exponential backoff
for {
seq = Sequence{}
res := s.conn.Where("id = ?", s.rowID).Take(&seq)
select {
case <-s.stop: // receive stop signal
return
default:
}

if res.Error != nil { // update the sequence with cas
slog.Warn("get sequence failed", slog.String("error", res.Error.Error()))
time.Sleep(updateTDDLFailedSleepTime)
seq = Sequence{}
res := s.conn.Where("id = ?", s.rowID).Take(&seq) // update the sequence with cas

continue
}
if res.Error == nil {
res = s.conn.Model(&seq).Update("sequence", seq.Sequence+s.step)
if res.Error == nil && res.RowsAffected == 1 {
break
}

res = s.conn.Model(&seq).Update("sequence", seq.Sequence+s.step)
if res.Error == nil && res.RowsAffected == 1 {
break
slog.Debug("cas sequence failed")
} else {
slog.Warn("get sequence failed", slog.String("error", res.Error.Error()))
}

slog.Debug("update sequence failed", slog.Uint64("rowsAffected", uint64(res.RowsAffected)))
time.Sleep(s.rateLimiter.When(s.clientID))
}

s.curr.Store(seq.Sequence - s.step)
Expand All @@ -167,6 +182,7 @@ func (s *tddlSequence) renew() {
slog.String("name", seq.Name),
slog.Uint64("maxSequence", seq.Sequence),
slog.Uint64("currSequence", s.curr.Load()),
slog.Int("retryTimes", s.rateLimiter.Retries(s.clientID)),
))
}

Expand Down
26 changes: 26 additions & 0 deletions pkg/tddl/tddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,3 +260,29 @@ func Test_tddlSequence_Next_timeout(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 10001, int(next))
}

func Test_tddlSequence_renew_failed(t *testing.T) {
gormDB := newMockDB(t)
t.Cleanup(func() {
gormDB.Exec("DELETE FROM sequences")
})

s, err := newSequence(gormDB, &Config{
Step: 1,
SeqName: testSeqName,
StartNum: 10000,
})
require.NoError(t, err)

sqlDB, _ := gormDB.DB()
sqlDB.Close()

go func() {
time.Sleep(time.Second)
// should retry 7 times
require.Equal(t, 7, s.rateLimiter.Retries(s.clientID))
s.Close()
}()

s.renew()
}
Loading
Loading