From d1454b118c27385ace0133f33737540e616ab2d9 Mon Sep 17 00:00:00 2001
From: ti-srebot <66930949+ti-srebot@users.noreply.github.com>
Date: Thu, 22 Oct 2020 16:20:24 +0800
Subject: [PATCH] executor: tiny optimize slow_query performance and add related runtime stats (#20200) (#20420)

Signed-off-by: ti-srebot
Signed-off-by: crazycs520
---
 executor/executor_pkg_test.go   |  17 +++
 executor/executor_test.go       |  31 +++++
 executor/memtable_reader.go     |  11 ++
 executor/slow_query.go          | 232 +++++++++++++++++++++++++-------
 util/execdetails/execdetails.go |   2 +
 util/memory/tracker.go          |   5 +
 6 files changed, 246 insertions(+), 52 deletions(-)
 mode change 100644 => 100755 executor/slow_query.go

diff --git a/executor/executor_pkg_test.go b/executor/executor_pkg_test.go
index 8b8ad172e3ede..3616d95961dbc 100644
--- a/executor/executor_pkg_test.go
+++ b/executor/executor_pkg_test.go
@@ -16,6 +16,7 @@ package executor
 import (
 	"context"
 	"crypto/tls"
+	"time"
 
 	. "github.com/pingcap/check"
 	"github.com/pingcap/failpoint"
@@ -391,3 +392,19 @@ func (s *testExecSerialSuite) TestSortSpillDisk(c *C) {
 	err = exec.Close()
 	c.Assert(err, IsNil)
 }
+
+func (s *pkgTestSuite) TestSlowQueryRuntimeStats(c *C) {
+	stats := &slowQueryRuntimeStats{
+		totalFileNum: 2,
+		readFileNum:  2,
+		readFile:     time.Second,
+		initialize:   time.Millisecond,
+		readFileSize: 1024 * 1024 * 1024,
+		parseLog:     int64(time.Millisecond * 100),
+		concurrent:   15,
+	}
+	c.Assert(stats.String(), Equals, "initialize: 1ms, read_file: 1s, parse_log: {time:100ms, concurrency:15}, total_file: 2, read_file: 2, read_size: 1024 MB")
+	c.Assert(stats.String(), Equals, stats.Clone().String())
+	stats.Merge(stats.Clone())
+	c.Assert(stats.String(), Equals, "initialize: 2ms, read_file: 2s, parse_log: {time:200ms, concurrency:15}, total_file: 4, read_file: 4, read_size: 2 GB")
+}
diff --git a/executor/executor_test.go b/executor/executor_test.go
index d55ff0428014a..fbf96891cfd03 100644
--- a/executor/executor_test.go
+++ b/executor/executor_test.go
@@ -17,6 +17,7 @@ import (
 	"context"
 	"flag"
 	"fmt"
+	"io/ioutil"
 	"math"
 	"net"
 	"os"
@@ -5967,6 +5968,36 @@ func (s *testClusterTableSuite) TestSlowQueryWithoutSlowLog(c *C) {
 	tk.MustQuery("select query from information_schema.slow_query where time > '2020-09-15 12:16:39' and time < now()").Check(testkit.Rows())
 }
 
+func (s *testClusterTableSuite) TestSlowQuery2(c *C) {
+	tk := testkit.NewTestKit(c, s.store)
+
+	f, err := ioutil.TempFile("", "tidb-slow-*.log")
+	c.Assert(err, IsNil)
+	f.WriteString(`
+# Time: 2020-10-13T20:08:13.970563+08:00
+select * from t;
+# Time: 2020-10-16T20:08:13.970563+08:00
+select * from t;
+`)
+	f.Close()
+
+	executor.ParseSlowLogBatchSize = 1
+	originCfg := config.GetGlobalConfig()
+	newCfg := *originCfg
+	newCfg.Log.SlowQueryFile = f.Name()
+	config.StoreGlobalConfig(&newCfg)
+	defer func() {
+		executor.ParseSlowLogBatchSize = 64
+		config.StoreGlobalConfig(originCfg)
+		os.Remove(newCfg.Log.SlowQueryFile)
+	}()
+	err = logutil.InitLogger(newCfg.Log.ToLogConfig())
+	c.Assert(err, IsNil)
+
+	tk.MustQuery("select count(*) from `information_schema`.`slow_query` where time > '2020-10-16 20:08:13' and time < '2020-10-16 21:08:13'").Check(testkit.Rows("1"))
+	tk.MustQuery("select count(*) from `information_schema`.`slow_query` where time > '2019-10-13 20:08:13' and time < '2020-10-16 21:08:13'").Check(testkit.Rows("2"))
+}
+
 func (s *testSplitTable) TestKillTableReader(c *C) {
 	var retry = "github.com/pingcap/tidb/store/tikv/mockRetrySendReqToRegion"
 	defer func() {
diff --git a/executor/memtable_reader.go b/executor/memtable_reader.go
index 1f5a88e516982..bf17e3520dbb6 100644
--- a/executor/memtable_reader.go
+++ b/executor/memtable_reader.go
@@ -40,6 +40,7 @@ import (
 	"github.com/pingcap/tidb/types"
 	"github.com/pingcap/tidb/util"
 	"github.com/pingcap/tidb/util/chunk"
+	"github.com/pingcap/tidb/util/execdetails"
 	"github.com/pingcap/tidb/util/pdapi"
 	"github.com/pingcap/tidb/util/set"
 	"go.uber.org/zap"
@@ -53,9 +54,12 @@ type dummyCloser struct{}
 
 func (dummyCloser) close() error { return nil }
 
+func (dummyCloser) getRuntimeStats() execdetails.RuntimeStats { return nil }
+
 type memTableRetriever interface {
 	retrieve(ctx context.Context, sctx sessionctx.Context) ([][]types.Datum, error)
 	close() error
+	getRuntimeStats() execdetails.RuntimeStats
 }
 
 // MemTableReaderExec executes memTable information retrieving from the MemTable components
@@ -127,6 +131,9 @@ func (e *MemTableReaderExec) Next(ctx context.Context, req *chunk.Chunk) error {
 
 // Close implements the Executor Close interface.
 func (e *MemTableReaderExec) Close() error {
+	if stats := e.retriever.getRuntimeStats(); stats != nil && e.runtimeStats != nil {
+		e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, stats)
+	}
 	return e.retriever.close()
 }
 
@@ -661,3 +668,7 @@ func (e *clusterLogRetriever) close() error {
 	}
 	return nil
 }
+
+func (e *clusterLogRetriever) getRuntimeStats() execdetails.RuntimeStats {
+	return nil
+}
diff --git a/executor/slow_query.go b/executor/slow_query.go
old mode 100644
new mode 100755
index d70bbd77c225e..bd2aae5eeff22
--- a/executor/slow_query.go
+++ b/executor/slow_query.go
@@ -18,12 +18,14 @@ import (
 	"context"
 	"fmt"
 	"io"
+	"io/ioutil"
 	"os"
 	"path/filepath"
 	"sort"
 	"strconv"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/pingcap/errors"
@@ -41,10 +43,14 @@ import (
 	"github.com/pingcap/tidb/util/execdetails"
 	"github.com/pingcap/tidb/util/hack"
 	"github.com/pingcap/tidb/util/logutil"
+	"github.com/pingcap/tidb/util/memory"
 	"github.com/pingcap/tidb/util/plancodec"
 	"go.uber.org/zap"
 )
 
+// ParseSlowLogBatchSize is the batch size of slow-log lines for a worker to parse, exported for testing.
+var ParseSlowLogBatchSize = 64
+
 //slowQueryRetriever is used to read slow log data.
 type slowQueryRetriever struct {
 	table       *model.TableInfo
@@ -57,6 +63,7 @@ type slowQueryRetriever struct {
 	checker *slowLogChecker
 
 	parsedSlowLogCh chan parsedSlowLog
+	stats           *slowQueryRuntimeStats
 }
 
 func (e *slowQueryRetriever) retrieve(ctx context.Context, sctx sessionctx.Context) ([][]types.Datum, error) {
@@ -98,6 +105,7 @@ func (e *slowQueryRetriever) initialize(sctx sessionctx.Context) error {
 		hasProcessPriv: hasProcessPriv,
 		user:           sctx.GetSessionVars().User,
 	}
+	e.stats = &slowQueryRuntimeStats{}
 	if e.extractor != nil {
 		e.checker.enableTimeCheck = e.extractor.Enable
 		e.checker.startTime = types.NewTime(types.FromGoTime(e.extractor.StartTime), mysql.TypeDatetime, types.MaxFsp)
@@ -123,14 +131,31 @@ type parsedSlowLog struct {
 	err  error
 }
 
+func (e *slowQueryRetriever) getNextFile() *os.File {
+	if e.fileIdx >= len(e.files) {
+		return nil
+	}
+	file := e.files[e.fileIdx].file
+	e.fileIdx++
+	if e.stats != nil {
+		stat, err := file.Stat()
+		if err == nil {
+			// It is OK to ignore the error here.
+			e.stats.readFileSize += stat.Size()
+			e.stats.readFileNum++
+		}
+	}
+	return file
+}
+
 func (e *slowQueryRetriever) parseDataForSlowLog(ctx context.Context, sctx sessionctx.Context) {
-	if len(e.files) == 0 {
+	file := e.getNextFile()
+	if file == nil {
 		close(e.parsedSlowLogCh)
 		return
 	}
-	reader := bufio.NewReader(e.files[0].file)
-	e.parseSlowLog(ctx, sctx, reader, 64)
-	close(e.parsedSlowLogCh)
+	reader := bufio.NewReader(file)
+	e.parseSlowLog(ctx, sctx, reader, ParseSlowLogBatchSize)
 }
 
 func (e *slowQueryRetriever) dataForSlowLog(ctx context.Context) ([][]types.Datum, bool, error) {
@@ -228,13 +253,13 @@ func (e *slowQueryRetriever) getBatchLog(reader *bufio.Reader, offset *offset, n
 		lineByte, err := getOneLine(reader)
 		if err != nil {
 			if err == io.EOF {
-				e.fileIdx++
 				e.fileLine = 0
-				if e.fileIdx >= len(e.files) {
+				file := e.getNextFile()
+				if file == nil {
 					return log, nil
 				}
 				offset.length = len(log)
-				reader.Reset(e.files[e.fileIdx].file)
+				reader.Reset(file)
 				continue
 			}
 			return log, err
@@ -253,34 +278,38 @@ func (e *slowQueryRetriever) parseSlowLog(ctx context.Context, sctx sessionctx.Context, reader *bufio.Reader, logNum int) {
+	defer close(e.parsedSlowLogCh)
 	var wg sync.WaitGroup
 	offset := offset{offset: 0, length: 0}
 	// To limit the num of go routine
-	ch := make(chan int, sctx.GetSessionVars().Concurrency.DistSQLScanConcurrency)
+	concurrent := sctx.GetSessionVars().Concurrency.DistSQLScanConcurrency
+	ch := make(chan int, concurrent)
+	if e.stats != nil {
+		e.stats.concurrent = concurrent
+	}
 	defer close(ch)
 	for {
+		startTime := time.Now()
 		log, err := e.getBatchLog(reader, &offset, logNum)
 		if err != nil {
			e.parsedSlowLogCh <- parsedSlowLog{nil, err}
 			break
 		}
+		if len(log) == 0 {
+			break
+		}
+		if e.stats != nil {
+			e.stats.readFile += time.Since(startTime)
+		}
 		start := offset
 		wg.Add(1)
 		ch <- 1
 		go func() {
 			defer wg.Done()
 			result, err := e.parseLog(sctx, log, start)
-			if err != nil {
-				e.parsedSlowLogCh <- parsedSlowLog{nil, err}
-			} else {
-				e.parsedSlowLogCh <- parsedSlowLog{result, err}
-			}
+			e.parsedSlowLogCh <- parsedSlowLog{result, err}
 			<-ch
 		}()
-		// Read the next file, offset = 0
-		if e.fileIdx >= len(e.files) {
-			break
-		}
 		offset.offset = e.fileLine
 		offset.length = 0
 		select {
@@ -303,10 +332,14 @@ func getLineIndex(offset offset, index int) int {
 }
 
 func (e *slowQueryRetriever) parseLog(ctx sessionctx.Context, log []string, offset offset) (data [][]types.Datum, err error) {
+	start := time.Now()
 	defer func() {
 		if r := recover(); r != nil {
 			err = fmt.Errorf("%s", r)
 		}
+		if e.stats != nil {
+			atomic.AddInt64(&e.stats.parseLog, int64(time.Since(start)))
+		}
 	}()
 	failpoint.Inject("errorMockParseSlowLogPanic", func(val failpoint.Value) {
 		if val.(bool) {
@@ -696,7 +729,16 @@ type logFile struct {
 
 // getAllFiles is used to get all slow-log needed to parse, it is exported for test.
 func (e *slowQueryRetriever) getAllFiles(sctx sessionctx.Context, logFilePath string) ([]logFile, error) {
+	totalFileNum := 0
+	if e.stats != nil {
+		startTime := time.Now()
+		defer func() {
+			e.stats.initialize = time.Since(startTime)
+			e.stats.totalFileNum = totalFileNum
+		}()
+	}
 	if e.extractor == nil || !e.extractor.Enable {
+		totalFileNum = 1
 		file, err := os.Open(logFilePath)
 		if err != nil {
 			if os.IsNotExist(err) {
@@ -717,10 +759,11 @@ func (e *slowQueryRetriever) getAllFiles(sctx sessionctx.Context, logFilePath st
 		}
 		return nil
 	}
-	err := filepath.Walk(logDir, func(path string, info os.FileInfo, err error) error {
-		if err != nil {
-			return handleErr(err)
-		}
+	files, err := ioutil.ReadDir(logDir)
+	if err != nil {
+		return nil, err
+	}
+	walkFn := func(path string, info os.FileInfo) error {
 		if info.IsDir() {
 			return nil
 		}
@@ -728,6 +771,7 @@ func (e *slowQueryRetriever) getAllFiles(sctx sessionctx.Context, logFilePath st
 		if !strings.HasPrefix(path, prefix) {
 			return nil
 		}
+		totalFileNum++
 		file, err := os.OpenFile(path, os.O_RDONLY, os.ModePerm)
 		if err != nil {
 			return handleErr(err)
 		}
@@ -768,7 +812,13 @@ func (e *slowQueryRetriever) getAllFiles(sctx sessionctx.Context, logFilePath st
 		})
 		skip = true
 		return nil
-	})
+	}
+	for _, file := range files {
+		err := walkFn(filepath.Join(logDir, file.Name()), file)
+		if err != nil {
+			return nil, err
+		}
+	}
 	// Sort by start time
 	sort.Slice(logFiles, func(i, j int) bool {
 		return logFiles[i].start.Before(logFiles[j].start)
 	})
@@ -800,55 +850,133 @@ func (e *slowQueryRetriever) getFileStartTime(file *os.File) (time.Time, error)
 	}
 	return t, errors.Errorf("malform slow query file %v", file.Name())
 }
+
+func (e *slowQueryRetriever) getRuntimeStats() execdetails.RuntimeStats {
+	return e.stats
+}
+
+type slowQueryRuntimeStats struct {
+	totalFileNum int
+	readFileNum  int
+	readFile     time.Duration
+	initialize   time.Duration
+	readFileSize int64
+	parseLog     int64
+	concurrent   int
+}
+
+// String implements the RuntimeStats interface.
+func (s *slowQueryRuntimeStats) String() string {
+	return fmt.Sprintf("initialize: %s, read_file: %s, parse_log: {time:%s, concurrency:%v}, total_file: %v, read_file: %v, read_size: %s",
+		s.initialize, s.readFile, time.Duration(s.parseLog), s.concurrent,
+		s.totalFileNum, s.readFileNum, memory.BytesToString(s.readFileSize))
+}
+
+// Merge implements the RuntimeStats interface.
+func (s *slowQueryRuntimeStats) Merge(rs execdetails.RuntimeStats) {
+	tmp, ok := rs.(*slowQueryRuntimeStats)
+	if !ok {
+		return
+	}
+	s.totalFileNum += tmp.totalFileNum
+	s.readFileNum += tmp.readFileNum
+	s.readFile += tmp.readFile
+	s.initialize += tmp.initialize
+	s.readFileSize += tmp.readFileSize
+	s.parseLog += tmp.parseLog
+}
+
+// Clone implements the RuntimeStats interface.
+func (s *slowQueryRuntimeStats) Clone() execdetails.RuntimeStats {
+	newRs := *s
+	return &newRs
+}
+
+// Tp implements the RuntimeStats interface.
+func (s *slowQueryRuntimeStats) Tp() int {
+	return execdetails.TpSlowQueryRuntimeStat
+}
+
 func (e *slowQueryRetriever) getFileEndTime(file *os.File) (time.Time, error) {
 	var t time.Time
+	var tried int
 	stat, err := file.Stat()
 	if err != nil {
 		return t, err
 	}
-	fileSize := stat.Size()
-	cursor := int64(0)
-	line := make([]byte, 0, 64)
+	endCursor := stat.Size()
 	maxLineNum := 128
-	tryGetTime := func(line []byte) string {
-		for i, j := 0, len(line)-1; i < j; i, j = i+1, j-1 {
-			line[i], line[j] = line[j], line[i]
+	for {
+		lines, readBytes, err := readLastLines(file, endCursor)
+		if err != nil {
+			return t, err
+		}
+		// the whole file has been read out
+		if readBytes == 0 {
+			break
+		}
+		endCursor -= int64(readBytes)
+		for i := len(lines) - 1; i >= 0; i-- {
+			if strings.HasPrefix(lines[i], variable.SlowLogStartPrefixStr) {
+				return ParseTime(lines[i][len(variable.SlowLogStartPrefixStr):])
+			}
 		}
-		lineStr := string(line)
-		lineStr = strings.TrimSpace(lineStr)
-		if strings.HasPrefix(lineStr, variable.SlowLogStartPrefixStr) {
-			return lineStr[len(variable.SlowLogStartPrefixStr):]
+		tried += len(lines)
+		if tried >= maxLineNum {
+			break
 		}
-		return ""
 	}
+	return t, errors.Errorf("invalid slow query file %v", file.Name())
+}
+
+// readLastLines reads lines from the end of a file.
+// The initial value of endCursor should be the file size.
+func readLastLines(file *os.File, endCursor int64) ([]string, int, error) {
+	var lines []byte
+	var firstNonNewlinePos int
+	var cursor = endCursor
 	for {
-		cursor -= 1
-		_, err := file.Seek(cursor, io.SeekEnd)
-		if err != nil {
-			return t, err
+		// stop if we are at the beginning of the file;
+		// check this first to avoid reading beyond the start
+		if cursor <= 0 {
+			break
+		}
+
+		var size int64 = 4096
+		if cursor < size {
+			size = cursor
 		}
+		cursor -= size
 
-		char := make([]byte, 1)
-		_, err = file.Read(char)
+		_, err := file.Seek(cursor, io.SeekStart)
 		if err != nil {
-			return t, err
+			return nil, 0, err
 		}
-		// If find a line.
-		if cursor != -1 && (char[0] == '\n' || char[0] == '\r') {
-			if timeStr := tryGetTime(line); len(timeStr) > 0 {
-				return ParseTime(timeStr)
-			}
-			line = line[:0]
-			maxLineNum -= 1
+		chars := make([]byte, size)
+		_, err = file.Read(chars)
+		if err != nil {
+			return nil, 0, err
 		}
-		line = append(line, char[0])
-		if cursor == -fileSize || maxLineNum <= 0 {
-			if timeStr := tryGetTime(line); len(timeStr) > 0 {
-				return ParseTime(timeStr)
+		lines = append(chars, lines...)
+
+		// find the first '\n' or '\r'
+		for i := 0; i < len(chars); i++ {
+			// reached the end of the buffer;
+			// on the first round the buffer may end with a newline
+			if i >= len(lines)-1 {
+				break
 			}
-			return t, errors.Errorf("malform slow query file %v", file.Name())
+			if (chars[i] == 10 || chars[i] == 13) && chars[i+1] != 10 && chars[i+1] != 13 {
+				firstNonNewlinePos = i + 1
+				break
+			}
+		}
+		if firstNonNewlinePos > 0 {
+			break
 		}
 	}
+	finalStr := string(lines[firstNonNewlinePos:])
+	return strings.Split(strings.ReplaceAll(finalStr, "\r\n", "\n"), "\n"), len(finalStr), nil
 }
 
 func (e *slowQueryRetriever) initializeAsyncParsing(ctx context.Context, sctx sessionctx.Context) {
diff --git a/util/execdetails/execdetails.go b/util/execdetails/execdetails.go
index 5b7c28e2d8909..49c66aed1f1f1 100644
--- a/util/execdetails/execdetails.go
+++ b/util/execdetails/execdetails.go
@@ -430,6 +430,8 @@ const (
 	TpJoinRuntimeStats
 	// TpSelectResultRuntimeStats is the tp for SelectResultRuntimeStats.
 	TpSelectResultRuntimeStats
+	// TpSlowQueryRuntimeStat is the tp for slowQueryRuntimeStats.
+	TpSlowQueryRuntimeStat
 	// TpInsertRuntimeStat is the tp for InsertRuntimeStat
 	TpInsertRuntimeStat
 )
diff --git a/util/memory/tracker.go b/util/memory/tracker.go
index f44ea4af8b52b..cf639da4f5f8c 100644
--- a/util/memory/tracker.go
+++ b/util/memory/tracker.go
@@ -297,6 +297,11 @@ func (t *Tracker) toString(indent string, buffer *bytes.Buffer) {
 
 // BytesToString converts the memory consumption to a readable string.
 func (t *Tracker) BytesToString(numBytes int64) string {
+	return BytesToString(numBytes)
+}
+
+// BytesToString converts the memory consumption to a readable string.
+func BytesToString(numBytes int64) string {
 	GB := float64(numBytes) / float64(1<<30)
 	if GB > 1 {
 		return fmt.Sprintf("%v GB", GB)
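
Note on the approach: the main win in slow_query.go above is replacing the old byte-at-a-time reverse scan in getFileEndTime (one Seek plus a one-byte Read per character) with readLastLines, which reads the tail of the file backwards in 4 KB blocks and splits each block into lines; getFileEndTime then inspects at most maxLineNum lines from the end. The standalone sketch below illustrates the same tail-reading idea under simplified assumptions: it reads only a single trailing chunk, and the file name tidb-slow.log and the helper lastChunk are illustrative, not code from this patch.

package main

import (
	"fmt"
	"os"
	"strings"
)

// lastChunk reads at most chunkSize bytes that end at offset end of the file.
func lastChunk(f *os.File, end, chunkSize int64) ([]byte, error) {
	size := chunkSize
	if end < size {
		size = end
	}
	if size <= 0 {
		return nil, nil
	}
	buf := make([]byte, size)
	n, err := f.ReadAt(buf, end-size)
	if err != nil && int64(n) != size {
		return nil, err
	}
	return buf, nil
}

func main() {
	// "tidb-slow.log" is an assumed file name, not taken from the patch.
	f, err := os.Open("tidb-slow.log")
	if err != nil {
		fmt.Println(err)
		return
	}
	defer f.Close()

	stat, err := f.Stat()
	if err != nil {
		fmt.Println(err)
		return
	}

	// Read the trailing 4 KB block (the same block size readLastLines uses),
	// split it into lines, and scan backwards for the last "# Time:" header,
	// which is what getFileEndTime does after this patch.
	buf, err := lastChunk(f, stat.Size(), 4096)
	if err != nil {
		fmt.Println(err)
		return
	}
	lines := strings.Split(string(buf), "\n")
	for i := len(lines) - 1; i >= 0; i-- {
		if strings.HasPrefix(lines[i], "# Time: ") {
			fmt.Println("last entry starts at:", lines[i])
			return
		}
	}
	fmt.Println("no # Time: line found in the last 4 KB")
}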
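With the retriever's stats registered in MemTableReaderExec.Close, the execution-info column of EXPLAIN ANALYZE over information_schema.slow_query should carry the string built by slowQueryRuntimeStats.String(); with the values from TestSlowQueryRuntimeStats above, that line reads: initialize: 1ms, read_file: 1s, parse_log: {time:100ms, concurrency:15}, total_file: 2, read_file: 2, read_size: 1024 MB. Real values depend on the slow-log size and on DistSQLScanConcurrency, which bounds the number of parsing goroutines.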