Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sorter: fix unified sorter magic numbers & reduce memory consumption by channels (#1915) #1958

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
4ebff6a
This is an automated cherry-pick of #1915
liuzix Jun 6, 2021
8c25598
resolve conflict
liuzix Jun 6, 2021
9b14b0b
Merge branch 'release-5.0' into cherry-pick-1915-to-release-5.0
liuzix Jun 21, 2021
0f024cb
Merge branch 'release-5.0' into cherry-pick-1915-to-release-5.0
ti-chi-bot Jun 22, 2021
b5d619f
Merge branch 'release-5.0' into cherry-pick-1915-to-release-5.0
ti-chi-bot Jun 22, 2021
4388ad7
Merge branch 'release-5.0' into cherry-pick-1915-to-release-5.0
ti-chi-bot Jun 22, 2021
600fba1
Merge branch 'release-5.0' into cherry-pick-1915-to-release-5.0
ti-chi-bot Jun 22, 2021
6c59350
Merge branch 'release-5.0' into cherry-pick-1915-to-release-5.0
ti-chi-bot Jun 22, 2021
ece81fe
Merge branch 'release-5.0' into cherry-pick-1915-to-release-5.0
ti-chi-bot Jun 22, 2021
2cb53fd
Merge branch 'release-5.0' into cherry-pick-1915-to-release-5.0
ti-chi-bot Jun 22, 2021
2d15e0a
Merge branch 'release-5.0' into cherry-pick-1915-to-release-5.0
ti-chi-bot Jun 22, 2021
5e2641c
Merge branch 'release-5.0' into cherry-pick-1915-to-release-5.0
ti-chi-bot Jun 22, 2021
61b8f83
Merge branch 'release-5.0' into cherry-pick-1915-to-release-5.0
ti-chi-bot Jun 22, 2021
66c9ceb
Merge branch 'release-5.0' into cherry-pick-1915-to-release-5.0
ti-chi-bot Jun 22, 2021
45f15d5
Merge branch 'release-5.0' into cherry-pick-1915-to-release-5.0
ti-chi-bot Jun 22, 2021
0132fc1
fix unit test
amyangfei Jun 22, 2021
85c4df4
Merge branch 'release-5.0' into cherry-pick-1915-to-release-5.0
amyangfei Jun 22, 2021
6bcf457
Merge branch 'release-5.0' into cherry-pick-1915-to-release-5.0
ti-chi-bot Jun 22, 2021
068c8be
Merge branch 'release-5.0' into cherry-pick-1915-to-release-5.0
ti-chi-bot Jun 22, 2021
e7df19f
Merge branch 'release-5.0' into cherry-pick-1915-to-release-5.0
ti-chi-bot Jun 22, 2021
58cf546
Merge branch 'release-5.0' into cherry-pick-1915-to-release-5.0
ti-chi-bot Jun 22, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cdc/puller/sorter/file_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
)

const (
fileBufferSize = 1 * 1024 * 1024 // 1MB
fileBufferSize = 32 * 1024 // 32KB
fileMagic = 0x12345678
numFileEntriesOffset = 4
blockMagic = 0xbeefbeef
Expand Down
3 changes: 2 additions & 1 deletion cdc/puller/sorter/heap_sorter.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
const (
flushRateLimitPerSecond = 10
sortHeapCapacity = 32
sortHeapInputChSize = 1024
)

type flushTask struct {
Expand Down Expand Up @@ -85,7 +86,7 @@ type heapSorter struct {
func newHeapSorter(id int, out chan *flushTask) *heapSorter {
return &heapSorter{
id: id,
inputCh: make(chan *model.PolymorphicEvent, 1024*1024),
inputCh: make(chan *model.PolymorphicEvent, sortHeapInputChSize),
outputCh: out,
heap: make(sortHeap, 0, sortHeapCapacity),
canceller: new(asyncCanceller),
Expand Down
20 changes: 15 additions & 5 deletions cdc/puller/sorter/unified_sorter.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,17 @@ import (
"golang.org/x/sync/errgroup"
)

const (
inputChSize = 128
outputChSize = 128
heapCollectChSize = 128 // this should be not be too small, to guarantee IO concurrency
// maxOpenHeapNum is the maximum number of allowed pending chunks in memory OR on-disk.
// This constant is a worst case upper limit, and setting a large number DOES NOT imply actually
// allocating these resources. This constant is PER TABLE.
// TODO refactor this out
maxOpenHeapNum = 1280000
)

// UnifiedSorter provides both sorting in memory and in file. Memory pressure is used to determine which one to use.
type UnifiedSorter struct {
inputCh chan *model.PolymorphicEvent
Expand Down Expand Up @@ -102,8 +113,8 @@ func NewUnifiedSorter(

lazyInitWorkerPool()
return &UnifiedSorter{
inputCh: make(chan *model.PolymorphicEvent, 128),
outputCh: make(chan *model.PolymorphicEvent, 128),
inputCh: make(chan *model.PolymorphicEvent, inputChSize),
outputCh: make(chan *model.PolymorphicEvent, outputChSize),
dir: dir,
pool: pool,
metricsInfo: &metricsInfo{
Expand Down Expand Up @@ -145,7 +156,7 @@ func (s *UnifiedSorter) Run(ctx context.Context) error {
numConcurrentHeaps := sorterConfig.NumConcurrentWorker

errg, subctx := errgroup.WithContext(ctx)
heapSorterCollectCh := make(chan *flushTask, 4096)
heapSorterCollectCh := make(chan *flushTask, heapCollectChSize)
// mergerCleanUp will consumer the remaining elements in heapSorterCollectCh to prevent any FD leak.
defer mergerCleanUp(heapSorterCollectCh)

Expand Down Expand Up @@ -206,8 +217,7 @@ func (s *UnifiedSorter) Run(ctx context.Context) error {

nextSorterID := 0
for {
// tentative value 1280000
for atomic.LoadInt64(&mergerBufLen) > 1280000 {
for atomic.LoadInt64(&mergerBufLen) > maxOpenHeapNum {
after := time.After(1 * time.Second)
select {
case <-subctx.Done():
Expand Down
6 changes: 3 additions & 3 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,9 +154,9 @@ var defaultServerConfig = &ServerConfig{
ProcessorFlushInterval: TomlDuration(100 * time.Millisecond),
Sorter: &SorterConfig{
NumConcurrentWorker: 4,
ChunkSizeLimit: 1024 * 1024 * 1024, // 1GB
MaxMemoryPressure: 80,
MaxMemoryConsumption: 8 * 1024 * 1024 * 1024, // 8GB
ChunkSizeLimit: 128 * 1024 * 1024, // 128MB
MaxMemoryPressure: 30, // 30% is safe on machines with memory capacity <= 16GB
MaxMemoryConsumption: 16 * 1024 * 1024 * 1024, // 16GB
NumWorkerPoolGoroutine: 16,
SortDir: "/tmp/cdc_sort",
},
Expand Down
4 changes: 2 additions & 2 deletions pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,9 @@ func (s *serverConfigSuite) TestMarshal(c *check.C) {
b, err := conf.Marshal()
c.Assert(err, check.IsNil)

c.Assert(b, check.Equals, `{"addr":"192.155.22.33:8887","advertise-addr":"","log-file":"","log-level":"info","gc-ttl":86400,"tz":"System","capture-session-ttl":10,"owner-flush-interval":200000000,"processor-flush-interval":100000000,"sorter":{"num-concurrent-worker":4,"chunk-size-limit":999,"max-memory-percentage":80,"max-memory-consumption":8589934592,"num-workerpool-goroutine":16,"sort-dir":"/tmp/cdc_sort"},"security":{"ca-path":"","cert-path":"","key-path":"","cert-allowed-cn":null},"per-table-memory-quota":20971520}`)
c.Assert(b, check.Equals, `{"addr":"192.155.22.33:8887","advertise-addr":"","log-file":"","log-level":"info","gc-ttl":86400,"tz":"System","capture-session-ttl":10,"owner-flush-interval":200000000,"processor-flush-interval":100000000,"sorter":{"num-concurrent-worker":4,"chunk-size-limit":999,"max-memory-percentage":30,"max-memory-consumption":17179869184,"num-workerpool-goroutine":16,"sort-dir":"/tmp/cdc_sort"},"security":{"ca-path":"","cert-path":"","key-path":"","cert-allowed-cn":null},"per-table-memory-quota":20971520}`)
conf2 := new(ServerConfig)
err = conf2.Unmarshal([]byte(`{"addr":"192.155.22.33:8887","advertise-addr":"","log-file":"","log-level":"info","gc-ttl":86400,"tz":"System","capture-session-ttl":10,"owner-flush-interval":200000000,"processor-flush-interval":100000000,"sorter":{"num-concurrent-worker":4,"chunk-size-limit":999,"max-memory-percentage":80,"max-memory-consumption":8589934592,"num-workerpool-goroutine":16,"sort-dir":"/tmp/cdc_sort"},"security":{"ca-path":"","cert-path":"","key-path":"","cert-allowed-cn":null},"per-table-memory-quota":20971520}`))
err = conf2.Unmarshal([]byte(`{"addr":"192.155.22.33:8887","advertise-addr":"","log-file":"","log-level":"info","gc-ttl":86400,"tz":"System","capture-session-ttl":10,"owner-flush-interval":200000000,"processor-flush-interval":100000000,"sorter":{"num-concurrent-worker":4,"chunk-size-limit":999,"max-memory-percentage":30,"max-memory-consumption":17179869184,"num-workerpool-goroutine":16,"sort-dir":"/tmp/cdc_sort"},"security":{"ca-path":"","cert-path":"","key-path":"","cert-allowed-cn":null},"per-table-memory-quota":20971520}`))
c.Assert(err, check.IsNil)
c.Assert(conf2, check.DeepEquals, conf)
}
Expand Down