From c7af42f04d1fcec6c038a8c27541f38f7bf8e1db Mon Sep 17 00:00:00 2001 From: tangenta Date: Tue, 13 Dec 2022 16:38:38 +0800 Subject: [PATCH 1/4] ddl: close lightning writers when import completed --- br/pkg/lightning/backend/local/local.go | 14 ++++++- br/pkg/lightning/manual/allocator.go | 38 +++++++++++++++++-- ddl/ingest/engine.go | 24 +++++++++++- ddl/ingest/message.go | 1 + .../addindextest/integration_test.go | 4 ++ 5 files changed, 76 insertions(+), 5 deletions(-) diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go index 317124d0b8d19..e32606207082e 100644 --- a/br/pkg/lightning/backend/local/local.go +++ b/br/pkg/lightning/backend/local/local.go @@ -394,6 +394,13 @@ func openDuplicateDB(storeDir string) (*pebble.DB, error) { return pebble.Open(dbPath, opts) } +var ( + // RunInTest indicates whether the current process is running in test. + RunInTest bool + // LastAlloc is the last ID allocator. + LastAlloc manual.Allocator +) + // NewLocalBackend creates new connections to tikv. func NewLocalBackend( ctx context.Context, @@ -461,6 +468,11 @@ func NewLocalBackend( } else { writeLimiter = noopStoreWriteLimiter{} } + alloc := manual.Allocator{} + if RunInTest { + alloc.RefCnt = new(atomic.Int64) + LastAlloc = alloc + } local := &local{ engines: sync.Map{}, pdCtl: pdCtl, @@ -486,7 +498,7 @@ func NewLocalBackend( keyAdapter: keyAdapter, errorMgr: errorMgr, importClientFactory: importClientFactory, - bufferPool: membuf.NewPool(membuf.WithAllocator(manual.Allocator{})), + bufferPool: membuf.NewPool(membuf.WithAllocator(alloc)), writeLimiter: writeLimiter, logger: log.FromContext(ctx), encBuilder: NewEncodingBuilder(ctx), diff --git a/br/pkg/lightning/manual/allocator.go b/br/pkg/lightning/manual/allocator.go index 821eb750c5030..81bbe27278567 100644 --- a/br/pkg/lightning/manual/allocator.go +++ b/br/pkg/lightning/manual/allocator.go @@ -14,8 +14,40 @@ package manual -type Allocator struct{} +import ( + "fmt" -func (Allocator) Alloc(n int) []byte { return New(n) } + "go.uber.org/atomic" +) -func (Allocator) Free(b []byte) { Free(b) } +type Allocator struct { + RefCnt *atomic.Int64 +} + +func NewAllocator(runInTest bool) Allocator { + if runInTest { + return Allocator{RefCnt: new(atomic.Int64)} + } + return Allocator{} +} + +func (a Allocator) Alloc(n int) []byte { + if a.RefCnt != nil { + a.RefCnt.Add(1) + } + return New(n) +} + +func (a Allocator) Free(b []byte) { + if a.RefCnt != nil { + a.RefCnt.Add(-1) + } + Free(b) +} + +func (a Allocator) CheckRefCnt() error { + if a.RefCnt != nil && a.RefCnt.Load() != 0 { + return fmt.Errorf("memory leak detected, refCnt: %d", a.RefCnt.Load()) + } + return nil +} diff --git a/ddl/ingest/engine.go b/ddl/ingest/engine.go index 8392674c1eae6..5fe85e7263268 100644 --- a/ddl/ingest/engine.go +++ b/ddl/ingest/engine.go @@ -82,6 +82,11 @@ func (ei *engineInfo) Clean() { zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID)) } ei.openedEngine = nil + err = ei.closeWriters() + if err != nil { + logutil.BgLogger().Error(LitErrCloseWriterErr, zap.Error(err), + zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID)) + } // Here the local intermediate files will be removed. err = closedEngine.Cleanup(ei.ctx) if err != nil { @@ -101,8 +106,14 @@ func (ei *engineInfo) ImportAndClean() error { return err1 } ei.openedEngine = nil + err := ei.closeWriters() + if err != nil { + logutil.BgLogger().Error(LitErrCloseWriterErr, zap.Error(err), + zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID)) + return err + } - err := ei.diskRoot.UpdateUsageAndQuota() + err = ei.diskRoot.UpdateUsageAndQuota() if err != nil { logutil.BgLogger().Error(LitErrUpdateDiskStats, zap.Error(err), zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID)) @@ -181,6 +192,17 @@ func (ei *engineInfo) newWriterContext(workerID int) (*WriterContext, error) { }, nil } +func (ei *engineInfo) closeWriters() error { + var err error + for wid := range ei.writerCache.Keys() { + if w, ok := ei.writerCache.Load(wid); ok { + _, err = w.Close(ei.ctx) + } + ei.writerCache.Delete(wid) + } + return err +} + // WriteRow Write one row into local writer buffer. func (wCtx *WriterContext) WriteRow(key, idxVal []byte) error { kvs := make([]common.KvPair, 1) diff --git a/ddl/ingest/message.go b/ddl/ingest/message.go index 0828d68796ba4..4996aab49a415 100644 --- a/ddl/ingest/message.go +++ b/ddl/ingest/message.go @@ -54,6 +54,7 @@ const ( LitInfoChgMemSetting string = "[ddl-ingest] change memory setting for ingest" LitInfoInitMemSetting string = "[ddl-ingest] initial memory setting for ingest" LitInfoUnsafeImport string = "[ddl-ingest] do a partial import data into the storage" + LitErrCloseWriterErr string = "[ddl-ingest] close writer error" ) func genBackendAllocMemFailedErr(memRoot MemRoot, jobID int64) error { diff --git a/tests/realtikvtest/addindextest/integration_test.go b/tests/realtikvtest/addindextest/integration_test.go index 5567113696810..352dc83a1d1a2 100644 --- a/tests/realtikvtest/addindextest/integration_test.go +++ b/tests/realtikvtest/addindextest/integration_test.go @@ -22,6 +22,7 @@ import ( "testing" "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/br/pkg/lightning/backend/local" "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/ddl/ingest" "github.com/pingcap/tidb/ddl/testutil" @@ -44,6 +45,8 @@ func TestAddIndexIngestMemoryUsage(t *testing.T) { tk.MustExec("use addindexlit;") tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`) + local.RunInTest = true + tk.MustExec("create table t (a int, b int, c int);") var sb strings.Builder sb.WriteString("insert into t values ") @@ -61,6 +64,7 @@ func TestAddIndexIngestMemoryUsage(t *testing.T) { tk.MustExec("alter table t add unique index idx1(b);") tk.MustExec("admin check table t;") require.Equal(t, int64(0), ingest.LitMemRoot.CurrentUsage()) + require.NoError(t, local.LastAlloc.CheckRefCnt()) } func TestAddIndexIngestLimitOneBackend(t *testing.T) { From 43e9d1416720bae7e758e434a58c105783787eb2 Mon Sep 17 00:00:00 2001 From: tangenta Date: Tue, 13 Dec 2022 16:56:57 +0800 Subject: [PATCH 2/4] remove unused function --- br/pkg/lightning/manual/allocator.go | 7 ------- 1 file changed, 7 deletions(-) diff --git a/br/pkg/lightning/manual/allocator.go b/br/pkg/lightning/manual/allocator.go index 81bbe27278567..18aa8cc9353c4 100644 --- a/br/pkg/lightning/manual/allocator.go +++ b/br/pkg/lightning/manual/allocator.go @@ -24,13 +24,6 @@ type Allocator struct { RefCnt *atomic.Int64 } -func NewAllocator(runInTest bool) Allocator { - if runInTest { - return Allocator{RefCnt: new(atomic.Int64)} - } - return Allocator{} -} - func (a Allocator) Alloc(n int) []byte { if a.RefCnt != nil { a.RefCnt.Add(1) From 5050ebfb408bb9dcd068360f659ec19e215661cc Mon Sep 17 00:00:00 2001 From: tangenta Date: Tue, 13 Dec 2022 16:57:59 +0800 Subject: [PATCH 3/4] update bazel --- br/pkg/lightning/manual/BUILD.bazel | 1 + tests/realtikvtest/addindextest/BUILD.bazel | 1 + 2 files changed, 2 insertions(+) diff --git a/br/pkg/lightning/manual/BUILD.bazel b/br/pkg/lightning/manual/BUILD.bazel index 6d1fc18dd2495..d54902a23c066 100644 --- a/br/pkg/lightning/manual/BUILD.bazel +++ b/br/pkg/lightning/manual/BUILD.bazel @@ -10,4 +10,5 @@ go_library( cgo = True, importpath = "github.com/pingcap/tidb/br/pkg/lightning/manual", visibility = ["//visibility:public"], + deps = ["@org_uber_go_atomic//:atomic"], ) diff --git a/tests/realtikvtest/addindextest/BUILD.bazel b/tests/realtikvtest/addindextest/BUILD.bazel index 1ca10f9db34f2..a2e9c9906380b 100644 --- a/tests/realtikvtest/addindextest/BUILD.bazel +++ b/tests/realtikvtest/addindextest/BUILD.bazel @@ -33,6 +33,7 @@ go_test( ], embed = [":addindextest"], deps = [ + "//br/pkg/lightning/backend/local", "//config", "//ddl", "//ddl/ingest", From 8aa0c7f212391b3b566c451b9790678883f8139f Mon Sep 17 00:00:00 2001 From: tangenta Date: Tue, 13 Dec 2022 17:10:08 +0800 Subject: [PATCH 4/4] return the first error --- ddl/ingest/engine.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/ddl/ingest/engine.go b/ddl/ingest/engine.go index 5fe85e7263268..c7ed29a71d017 100644 --- a/ddl/ingest/engine.go +++ b/ddl/ingest/engine.go @@ -193,14 +193,19 @@ func (ei *engineInfo) newWriterContext(workerID int) (*WriterContext, error) { } func (ei *engineInfo) closeWriters() error { - var err error + var firstErr error for wid := range ei.writerCache.Keys() { if w, ok := ei.writerCache.Load(wid); ok { - _, err = w.Close(ei.ctx) + _, err := w.Close(ei.ctx) + if err != nil { + if firstErr == nil { + firstErr = err + } + } } ei.writerCache.Delete(wid) } - return err + return firstErr } // WriteRow Write one row into local writer buffer.