This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

Rewrite data file parser; Insert _tidb_rowid when needed; Update checkpoint structure #82

Merged: 15 commits, Nov 20, 2018
Changes from 7 commits
1 change: 1 addition & 0 deletions .gitattributes
@@ -0,0 +1 @@
*_generated.go linguist-generated=true
5 changes: 5 additions & 0 deletions Makefile
@@ -54,6 +54,11 @@ checksuccess:
echo "Lightning build successfully :-) !" ; \
fi

data_parsers:
ragel -Z -G2 -o tmp_parser.go lightning/mydump/parser.rl
@echo '// Code generated by ragel DO NOT EDIT.' | cat - tmp_parser.go > lightning/mydump/parser_generated.go
@rm tmp_parser.go

lightning:
$(GOBUILD) $(RACE_FLAG) -ldflags '$(LDFLAGS)' -o $(LIGHTNING_BIN) cmd/main.go

2 changes: 1 addition & 1 deletion cmd/main.go
@@ -7,11 +7,11 @@ import (
"os/signal"
"syscall"

"github.com/pkg/errors"
"github.com/pingcap/tidb-lightning/lightning"
"github.com/pingcap/tidb-lightning/lightning/common"
"github.com/pingcap/tidb-lightning/lightning/config"
plan "github.com/pingcap/tidb/planner/core"
"github.com/pkg/errors"
)

func setGlobalVars() {
5 changes: 3 additions & 2 deletions lightning/common/util.go
@@ -16,8 +16,8 @@ import (
"path/filepath"

"github.com/go-sql-driver/mysql"
"github.com/pkg/errors"
tmysql "github.com/pingcap/tidb/mysql"
"github.com/pkg/errors"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
@@ -229,7 +229,8 @@ func IsRetryableError(err error) bool {
}

// IsContextCanceledError returns whether the error is caused by context
// cancellation.
// cancellation. This function returns `false` (not a context-canceled error) if
// `err == nil`.
func IsContextCanceledError(err error) bool {
err = errors.Cause(err)
return err == context.Canceled || status.Code(err) == codes.Canceled
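For illustration, a caller can lean on the documented nil behavior to ignore cancellation noise during shutdown without special-casing nil (a hedged sketch; doImport and ctx are assumptions, not part of this PR):

    err := doImport(ctx) // assumed helper returning the import's error
    // IsContextCanceledError(nil) returns false, so a successful run
    // simply falls through to normal cleanup.
    if err != nil && !common.IsContextCanceledError(err) {
        common.AppLogger.Errorf("import failed: %v", err)
    }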
46 changes: 46 additions & 0 deletions lightning/kv/allocator.go
@@ -0,0 +1,46 @@
package kv

import "sync/atomic"

// PanickingAllocator is an ID allocator which panics on all operations except Rebase and Base.
type PanickingAllocator struct {
base int64
}

func NewPanickingAllocator(base int64) *PanickingAllocator {
return &PanickingAllocator{base: base}
}

func (alloc *PanickingAllocator) Alloc(int64) (int64, error) {
panic("unexpected Alloc() call")
}

func (alloc *PanickingAllocator) Reset(newBase int64) {
panic("unexpected Reset() call")
}

func (alloc *PanickingAllocator) Rebase(tableID, newBase int64, allocIDs bool) error {
// Ratchet the base upwards with a CAS loop; Rebase never lowers it.
for {
oldBase := atomic.LoadInt64(&alloc.base)
if newBase <= oldBase {
break
}
if atomic.CompareAndSwapInt64(&alloc.base, oldBase, newBase) {
break
}
}
return nil
}

func (alloc *PanickingAllocator) Base() int64 {
return atomic.LoadInt64(&alloc.base)
}

func (alloc *PanickingAllocator) End() int64 {
panic("unexpected End() call")
}

func (alloc *PanickingAllocator) NextGlobalAutoID(tableID int64) (int64, error) {
panic("unexpected NextGlobalAutoID() call")
}
15 changes: 6 additions & 9 deletions lightning/kv/sql2kv.go
@@ -6,6 +6,7 @@ import (
"github.com/pingcap/tidb-lightning/lightning/metric"
sqltool "github.com/pingcap/tidb-lightning/lightning/sql"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta/autoid"
kvec "github.com/pingcap/tidb/util/kvencoder"
)

@@ -31,13 +32,13 @@ type TableKVEncoder struct {
bufValues []interface{}

encoder kvec.KvEncoder
idAllocator *kvec.Allocator
idAllocator autoid.Allocator
}

func NewTableKVEncoder(
dbName string,
table string, tableID int64,
columns int, sqlMode string, alloc *kvec.Allocator) (*TableKVEncoder, error) {
columns int, sqlMode string, alloc autoid.Allocator) (*TableKVEncoder, error) {

encoder, err := kvec.New(dbName, alloc)
if err != nil {
@@ -96,10 +97,6 @@ func (kvcodec *TableKVEncoder) makeStatments(maxRows int) ([]uint32, error) {
return stmtIds, nil
}

func (kvcodec *TableKVEncoder) ResetRowID(rowID int64) {
kvcodec.idAllocator.Reset(rowID)
}

func (kvcodec *TableKVEncoder) Close() error {
metric.KvEncoderCounter.WithLabelValues("closed").Inc()
return errors.Trace(kvcodec.encoder.Close())
@@ -109,18 +106,18 @@ func (kvcodec *TableKVEncoder) NextRowID() int64 {
return kvcodec.idAllocator.Base() + 1
}

func (kvcodec *TableKVEncoder) SQL2KV(sql []byte) ([]kvec.KvPair, uint64, error) {
func (kvcodec *TableKVEncoder) SQL2KV(sql string) ([]kvec.KvPair, uint64, error) {
if PrepareStmtMode {
// via prepared statement
kvPairs, rowsAffected, err := kvcodec.encodeViaPstmt(sql)
kvPairs, rowsAffected, err := kvcodec.encodeViaPstmt([]byte(sql))
if err == nil {
return kvPairs, rowsAffected, nil
}
common.AppLogger.Warnf("[sql2kv] stmt encode err : %s", err.Error())
}

// via sql execution
kvPairs, rowsAffected, err := kvcodec.encoder.Encode(string(sql), kvcodec.tableID)
kvPairs, rowsAffected, err := kvcodec.encoder.Encode(sql, kvcodec.tableID)
if err != nil {
common.AppLogger.Errorf("[sql2kv] sql encode error = %v", err)
return nil, 0, errors.Trace(err)
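Connecting this with the new allocator, a hedged sketch of constructing an encoder after this change (the database and table names, tableID, column count, SQL mode and maxRowID below are placeholders, and error handling is condensed):

    // Base the allocator on the highest row ID already consumed, e.g.
    // one restored from a checkpoint (assumed here as maxRowID).
    alloc := kv.NewPanickingAllocator(maxRowID)
    encoder, err := kv.NewTableKVEncoder("test", "tbl", tableID, 3, "", alloc)
    if err != nil {
        return errors.Trace(err)
    }
    defer encoder.Close()

    // SQL2KV now accepts a string instead of a []byte.
    kvPairs, affectedRows, err := encoder.SQL2KV("INSERT INTO `tbl` VALUES (1, 'a', 2.5);")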
2 changes: 1 addition & 1 deletion lightning/lightning.go
@@ -7,8 +7,8 @@ import (
"runtime"
"sync"

"github.com/pkg/errors"
sstpb "github.com/pingcap/kvproto/pkg/import_sstpb"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"

"github.com/pingcap/tidb-lightning/lightning/common"
198 changes: 198 additions & 0 deletions lightning/mydump/parser.go
@@ -0,0 +1,198 @@
package mydump

import (
"io"

"github.com/pkg/errors"
)

// ChunkParser is a parser for the data files (files containing only INSERT
// statements).
type ChunkParser struct {
// states for the lexer
reader io.Reader
buf []byte
bufSize int
isLastChunk bool

lastRow Row
// Current file offset.
pos int64
// The (quoted) table name used in the last INSERT statement. Assumed to be
// constant throughout the entire file.
TableName []byte
// The list of columns in the form `(a, b, c)` in the last INSERT statement.
// Assumed to be constant throughout the entire file.
Columns []byte
}

// Chunk represents a portion of the data file.
type Chunk struct {
Offset int64 // file offset where the chunk starts
EndOffset int64 // file offset just past the chunk's last row
PrevRowIDMax int64 // the largest row ID assigned before this chunk
RowIDMax int64 // the largest row ID assigned within this chunk
}

// Row is the content of a row.
type Row struct {
RowID int64
Row []byte
}

// NewChunkParser creates a new parser which can read chunks out of a file.
func NewChunkParser(reader io.Reader) *ChunkParser {
return &ChunkParser{
reader: reader,
bufSize: 8192,
}
}

// Reader returns the underlying reader of this parser.
func (parser *ChunkParser) Reader() io.Reader {
return parser.reader
}

// SetPos changes the reported position and row ID.
func (parser *ChunkParser) SetPos(pos int64, rowID int64) {
parser.pos = pos
parser.lastRow.RowID = rowID
}

// Pos returns the current file offset.
func (parser *ChunkParser) Pos() int64 {
return parser.pos
}

type token byte

const (
tokNil token = iota
tokValues
tokRow
tokName
)

// tryAppendTo appends tail to *out; when *out is empty it adopts tail
// directly instead of copying.
func tryAppendTo(out *[]byte, tail []byte) {
if out == nil || len(tail) == 0 {
return
}
if len(*out) == 0 {
*out = tail
} else {
*out = append(*out, tail...)
}
}

func (parser *ChunkParser) readBlock() error {
block := make([]byte, parser.bufSize)

n, err := io.ReadFull(parser.reader, block)
switch err {
case io.ErrUnexpectedEOF:
// The reader ended before the block was filled, so this is the last
// (possibly partial) block of the file.
parser.isLastChunk = true
fallthrough
case nil:
tryAppendTo(&parser.buf, block[:n])
return nil
default:
return errors.Trace(err)
}
}

// ReadRow reads a row from the data file.
func (parser *ChunkParser) ReadRow() error {
// This parser will recognize contents like:
//
// `tableName` (...) VALUES (...) (...) (...)
//
// Keywords like INSERT, INTO and separators like ',' and ';' are treated
// like comments and ignored. Therefore, this parser will accept some
// nonsense input. The advantage is that the parser becomes extremely
// simple, which suits our goal of quickly and accurately splitting the
// file apart rather than validating its content.
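//
// For example, under this grammar the input
//
//     INSERT INTO `tbl` (`a`, `b`) VALUES (1, 2), (3, 4);
//
// is processed exactly like
//
//     `tbl` (`a`, `b`) VALUES (1, 2) (3, 4)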

type state byte

const (
// the state after reading "VALUES"
stateRow state = iota
// the state after reading the table name, before "VALUES"
stateColumns
)

row := &parser.lastRow
st := stateRow

for {
tok, content, err := parser.lex()
if err != nil {
return errors.Trace(err)
}
switch tok {
case tokRow:
switch st {
case stateRow:
row.RowID++
row.Row = content
return nil
case stateColumns:
parser.Columns = content
continue
}

case tokName:
st = stateColumns
parser.TableName = content
parser.Columns = nil
continue

case tokValues:
st = stateRow
continue

default:
return errors.Errorf("Syntax error at position %d", parser.pos)
}
}
}

// LastRow returns the row parsed by the last call to ReadRow().
func (parser *ChunkParser) LastRow() Row {
return parser.lastRow
}

// ReadChunks parses the entire file and splits it into contiguous chunks
// at least minSize bytes long; the final chunk may be smaller.
func (parser *ChunkParser) ReadChunks(minSize int64) ([]Chunk, error) {
var chunks []Chunk

cur := Chunk{
Offset: parser.pos,
EndOffset: parser.pos,
PrevRowIDMax: parser.lastRow.RowID,
RowIDMax: parser.lastRow.RowID,
}

for {
switch err := parser.ReadRow(); errors.Cause(err) {
case nil:
cur.EndOffset = parser.pos
cur.RowIDMax = parser.lastRow.RowID
if cur.EndOffset-cur.Offset >= minSize {
chunks = append(chunks, cur)
cur.Offset = cur.EndOffset
cur.PrevRowIDMax = cur.RowIDMax
}

case io.EOF:
if cur.Offset < cur.EndOffset {
chunks = append(chunks, cur)
}
return chunks, nil

default:
return nil, errors.Trace(err)
}
}
}
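To show how these pieces fit together, a hedged end-to-end sketch (the file name is hypothetical and error handling is condensed): split the file into chunks once, then re-read each chunk independently by restoring its offset and row ID.

    file, err := os.Open("data.sql") // hypothetical input file
    if err != nil {
        return errors.Trace(err)
    }
    defer file.Close()

    // Pass 1: split the whole file into chunks of at least 4 MiB.
    chunks, err := mydump.NewChunkParser(file).ReadChunks(4 << 20)
    if err != nil {
        return errors.Trace(err)
    }

    // Pass 2: each chunk can be re-read independently (e.g. concurrently,
    // with its own reader) by seeking to Offset and restoring PrevRowIDMax.
    for _, chunk := range chunks {
        if _, err := file.Seek(chunk.Offset, io.SeekStart); err != nil {
            return errors.Trace(err)
        }
        parser := mydump.NewChunkParser(file)
        parser.SetPos(chunk.Offset, chunk.PrevRowIDMax)
        for parser.Pos() < chunk.EndOffset {
            if err := parser.ReadRow(); err != nil {
                return errors.Trace(err)
            }
            row := parser.LastRow()
            _ = row // row.RowID and row.Row would feed the KV encoder
        }
    }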