parser: fix csv parse header with empty line (pingcap#364)
* fix csv header

* fix infinite loop

* add a test
glorv authored Aug 6, 2020
1 parent 1dc2d6a commit d1024a2
Showing 4 changed files with 92 additions and 29 deletions.
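In short: the old parser decided whether to read a header by checking parser.pos == 0 (visible in the ReadRow hunk below), and readRecord did not skip whitespace-only lines, so a CSV file beginning with a blank or whitespace-only line could mis-read the header or, per the commit's second bullet, loop forever. The fix threads an explicit shouldParseHeader flag through NewCSVParser and teaches readRecord to drop whitespace-only lines. A minimal sketch of the fixed behaviour, adapted from the new TestCsvWithWhiteSpaceLine below (it assumes ioWorkers is a *worker.Pool built elsewhere, as in the test suite's setup):

	cfg := config.CSVConfig{Separator: ",", Delimiter: `"`, Header: true}
	// A whitespace-only line precedes the real header.
	data := " \r\na,b,c\r\n0,,abc\r\n"

	// The trailing true is the new shouldParseHeader argument: the first
	// non-empty line ("a,b,c") is consumed as the header, and the leading
	// whitespace line no longer derails parsing.
	parser := mydump.NewCSVParser(&cfg, mydump.NewStringReader(data), config.ReadBlockSize, ioWorkers, true)
	if err := parser.ReadRow(); err != nil {
		// handle the error
	}
	// parser.Columns() is now []string{"a", "b", "c"}, and parser.LastRow()
	// holds the first data row: "0", NULL, "abc".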
33 changes: 24 additions & 9 deletions lightning/mydump/csv_parser.go
@@ -14,8 +14,10 @@
package mydump

import (
"bytes"
"io"
"strings"
"unicode"

"github.com/pingcap/errors"
"github.com/pingcap/tidb-lightning/lightning/config"
@@ -51,13 +53,17 @@ type CSVParser struct {
fieldIndexes []int

lastRecord []string

// if shouldParseHeader is true, the parser treats the first non-empty line as the header line
shouldParseHeader bool
}

func NewCSVParser(
cfg *config.CSVConfig,
reader ReadSeekCloser,
blockBufSize int64,
ioWorkers *worker.Pool,
shouldParseHeader bool,
) *CSVParser {
quote := byte(0)
if len(cfg.Delimiter) > 0 {
@@ -78,13 +84,14 @@
}

return &CSVParser{
blockParser: makeBlockParser(reader, blockBufSize, ioWorkers),
cfg: cfg,
comma: cfg.Separator[0],
quote: quote,
escFlavor: escFlavor,
quoteIndexFunc: makeBytesIndexFunc(quoteStopSet),
unquoteIndexFunc: makeBytesIndexFunc(unquoteStopSet),
blockParser: makeBlockParser(reader, blockBufSize, ioWorkers),
cfg: cfg,
comma: cfg.Separator[0],
quote: quote,
escFlavor: escFlavor,
quoteIndexFunc: makeBytesIndexFunc(quoteStopSet),
unquoteIndexFunc: makeBytesIndexFunc(unquoteStopSet),
shouldParseHeader: shouldParseHeader,
}
}

@@ -177,6 +184,7 @@ func (parser *CSVParser) readRecord(dst []string) ([]string, error) {
parser.fieldIndexes = parser.fieldIndexes[:0]

isEmptyLine := true
whitespaceLine := true
outside:
for {
firstByte, err := parser.readByte()
@@ -191,17 +199,23 @@
switch firstByte {
case parser.comma:
parser.fieldIndexes = append(parser.fieldIndexes, len(parser.recordBuffer))

whitespaceLine = false
case parser.quote:
if err := parser.readQuotedField(); err != nil {
return nil, err
}
whitespaceLine = false

case '\r', '\n':
// new line = end of record (ignore empty lines)
if isEmptyLine {
continue
}
// skip lines that contain only whitespace
if err == nil && whitespaceLine && len(bytes.TrimFunc(parser.recordBuffer, unicode.IsSpace)) == 0 {
parser.recordBuffer = parser.recordBuffer[:0]
continue
}
parser.fieldIndexes = append(parser.fieldIndexes, len(parser.recordBuffer))
break outside

@@ -327,7 +341,7 @@ func (parser *CSVParser) ReadRow() error {
row.RowID++

// skip the header first
if parser.pos == 0 && parser.cfg.Header {
if parser.shouldParseHeader {
columns, err := parser.readRecord(nil)
if err != nil {
return errors.Trace(err)
@@ -337,6 +351,7 @@
colName, _ = parser.unescapeString(colName)
parser.columns = append(parser.columns, strings.ToLower(colName))
}
parser.shouldParseHeader = false
}

records, err := parser.readRecord(parser.lastRecord)
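The core of the parser change is the whitespace-line check in readRecord. Pulled out as a self-contained sketch for illustration (the names mirror the fields in the hunk above; the real check runs inside the parsing loop):

	package main

	import (
		"bytes"
		"fmt"
		"unicode"
	)

	// skipAsWhitespaceLine mirrors the condition added to readRecord: a line
	// is dropped only when no separator or quote was seen on it
	// (whitespaceLine), the buffered bytes trim down to nothing, and the
	// newline was read without error. The err == nil guard keeps the skip
	// path from firing again once reads start failing, which appears to be
	// how the commit avoids the infinite loop named in its message.
	func skipAsWhitespaceLine(recordBuffer []byte, whitespaceLine bool, err error) bool {
		return err == nil && whitespaceLine &&
			len(bytes.TrimFunc(recordBuffer, unicode.IsSpace)) == 0
	}

	func main() {
		fmt.Println(skipAsWhitespaceLine([]byte(" \t "), true, nil)) // true: line is skipped
		fmt.Println(skipAsWhitespaceLine([]byte("  a "), true, nil)) // false: real content survives the trim
	}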
83 changes: 65 additions & 18 deletions lightning/mydump/csv_parser_test.go
@@ -55,7 +55,7 @@ type testCase struct {

func (s *testMydumpCSVParserSuite) runTestCases(c *C, cfg *config.CSVConfig, blockBufSize int64, cases []testCase) {
for _, tc := range cases {
parser := mydump.NewCSVParser(cfg, mydump.NewStringReader(tc.input), blockBufSize, s.ioWorkers)
parser := mydump.NewCSVParser(cfg, mydump.NewStringReader(tc.input), blockBufSize, s.ioWorkers, false)
for i, row := range tc.expected {
comment := Commentf("input = %q, row = %d", tc.input, i+1)
e := parser.ReadRow()
@@ -68,7 +68,7 @@ func (s *testMydumpCSVParserSuite) runTestCases(c *C, cfg *config.CSVConfig, blo

func (s *testMydumpCSVParserSuite) runFailingTestCases(c *C, cfg *config.CSVConfig, blockBufSize int64, cases []string) {
for _, tc := range cases {
parser := mydump.NewCSVParser(cfg, mydump.NewStringReader(tc), blockBufSize, s.ioWorkers)
parser := mydump.NewCSVParser(cfg, mydump.NewStringReader(tc), blockBufSize, s.ioWorkers, false)
e := parser.ReadRow()
c.Assert(e, ErrorMatches, "syntax error.*", Commentf("input = %q / %s", tc, errors.ErrorStack(e)))
}
@@ -87,7 +87,7 @@ func (s *testMydumpCSVParserSuite) TestTPCH(c *C) {
TrimLastSep: true,
}

parser := mydump.NewCSVParser(&cfg, reader, config.ReadBlockSize, s.ioWorkers)
parser := mydump.NewCSVParser(&cfg, reader, config.ReadBlockSize, s.ioWorkers, false)

c.Assert(parser.ReadRow(), IsNil)
c.Assert(parser.LastRow(), DeepEquals, mydump.Row{
@@ -151,7 +151,7 @@ func (s *testMydumpCSVParserSuite) TestRFC4180(c *C) {

// example 1, trailing new lines

parser := mydump.NewCSVParser(&cfg, mydump.NewStringReader("aaa,bbb,ccc\nzzz,yyy,xxx\n"), config.ReadBlockSize, s.ioWorkers)
parser := mydump.NewCSVParser(&cfg, mydump.NewStringReader("aaa,bbb,ccc\nzzz,yyy,xxx\n"), config.ReadBlockSize, s.ioWorkers, false)

c.Assert(parser.ReadRow(), IsNil)
c.Assert(parser.LastRow(), DeepEquals, mydump.Row{
@@ -179,7 +179,7 @@ func (s *testMydumpCSVParserSuite) TestRFC4180(c *C) {

// example 2, no trailing new lines

parser = mydump.NewCSVParser(&cfg, mydump.NewStringReader("aaa,bbb,ccc\nzzz,yyy,xxx"), config.ReadBlockSize, s.ioWorkers)
parser = mydump.NewCSVParser(&cfg, mydump.NewStringReader("aaa,bbb,ccc\nzzz,yyy,xxx"), config.ReadBlockSize, s.ioWorkers, false)

c.Assert(parser.ReadRow(), IsNil)
c.Assert(parser.LastRow(), DeepEquals, mydump.Row{
@@ -207,7 +207,7 @@ func (s *testMydumpCSVParserSuite) TestRFC4180(c *C) {

// example 5, quoted fields

parser = mydump.NewCSVParser(&cfg, mydump.NewStringReader(`"aaa","bbb","ccc"`+"\nzzz,yyy,xxx"), config.ReadBlockSize, s.ioWorkers)
parser = mydump.NewCSVParser(&cfg, mydump.NewStringReader(`"aaa","bbb","ccc"`+"\nzzz,yyy,xxx"), config.ReadBlockSize, s.ioWorkers, false)

c.Assert(parser.ReadRow(), IsNil)
c.Assert(parser.LastRow(), DeepEquals, mydump.Row{
@@ -237,7 +237,7 @@ func (s *testMydumpCSVParserSuite) TestRFC4180(c *C) {

parser = mydump.NewCSVParser(&cfg, mydump.NewStringReader(`"aaa","b
bb","ccc"
zzz,yyy,xxx`), config.ReadBlockSize, s.ioWorkers)
zzz,yyy,xxx`), config.ReadBlockSize, s.ioWorkers, false)

c.Assert(parser.ReadRow(), IsNil)
c.Assert(parser.LastRow(), DeepEquals, mydump.Row{
@@ -265,7 +265,7 @@ zzz,yyy,xxx`), config.ReadBlockSize, s.ioWorkers)

// example 7, quote escaping

parser = mydump.NewCSVParser(&cfg, mydump.NewStringReader(`"aaa","b""bb","ccc"`), config.ReadBlockSize, s.ioWorkers)
parser = mydump.NewCSVParser(&cfg, mydump.NewStringReader(`"aaa","b""bb","ccc"`), config.ReadBlockSize, s.ioWorkers, false)

c.Assert(parser.ReadRow(), IsNil)
c.Assert(parser.LastRow(), DeepEquals, mydump.Row{
@@ -292,7 +292,7 @@ func (s *testMydumpCSVParserSuite) TestMySQL(c *C) {

parser := mydump.NewCSVParser(&cfg, mydump.NewStringReader(`"\"","\\","\?"
"\
",\N,\\N`), config.ReadBlockSize, s.ioWorkers)
",\N,\\N`), config.ReadBlockSize, s.ioWorkers, false)

c.Assert(parser.ReadRow(), IsNil)
c.Assert(parser.LastRow(), DeepEquals, mydump.Row{
@@ -357,7 +357,7 @@ func (s *testMydumpCSVParserSuite) TestTSV(c *C) {
parser := mydump.NewCSVParser(&cfg, mydump.NewStringReader(`a b c d e f
0 foo 0000-00-00
0 foo 0000-00-00
0 abc def ghi bar 1999-12-31`), config.ReadBlockSize, s.ioWorkers)
0 abc def ghi bar 1999-12-31`), config.ReadBlockSize, s.ioWorkers, true)

c.Assert(parser.ReadRow(), IsNil)
c.Assert(parser.LastRow(), DeepEquals, mydump.Row{
@@ -405,23 +405,70 @@ func (s *testMydumpCSVParserSuite) TestTSV(c *C) {
c.Assert(errors.Cause(parser.ReadRow()), Equals, io.EOF)
}

func (s *testMydumpCSVParserSuite) TestCsvWithWhiteSpaceLine(c *C) {
cfg := config.CSVConfig{
Separator: ",",
Delimiter: `"`,
}
data := " \r\n\r\n0,,abc\r\n \r\n123,1999-12-31,test\r\n"
parser := mydump.NewCSVParser(&cfg, mydump.NewStringReader(data), config.ReadBlockSize, s.ioWorkers, false)
c.Assert(parser.ReadRow(), IsNil)
c.Assert(parser.LastRow(), DeepEquals, mydump.Row{
RowID: 1,
Row: []types.Datum{
types.NewStringDatum("0"),
nullDatum,
types.NewStringDatum("abc"),
},
})

c.Assert(parser, posEq, 12, 1)
c.Assert(parser.ReadRow(), IsNil)
c.Assert(parser.LastRow(), DeepEquals, mydump.Row{
RowID: 2,
Row: []types.Datum{
types.NewStringDatum("123"),
types.NewStringDatum("1999-12-31"),
types.NewStringDatum("test"),
},
})
c.Assert(parser.Close(), IsNil)

cfg.Header = true
data = " \r\na,b,c\r\n0,,abc\r\n"
parser = mydump.NewCSVParser(&cfg, mydump.NewStringReader(data), config.ReadBlockSize, s.ioWorkers, true)
c.Assert(parser.ReadRow(), IsNil)
c.Assert(parser.Columns(), DeepEquals, []string{"a", "b", "c"})
c.Assert(parser.LastRow(), DeepEquals, mydump.Row{
RowID: 1,
Row: []types.Datum{
types.NewStringDatum("0"),
nullDatum,
types.NewStringDatum("abc"),
},
})

c.Assert(parser, posEq, 17, 1)
c.Assert(parser.Close(), IsNil)
}

func (s *testMydumpCSVParserSuite) TestEmpty(c *C) {
cfg := config.CSVConfig{
Separator: ",",
Delimiter: `"`,
}

parser := mydump.NewCSVParser(&cfg, mydump.NewStringReader(""), config.ReadBlockSize, s.ioWorkers)
parser := mydump.NewCSVParser(&cfg, mydump.NewStringReader(""), config.ReadBlockSize, s.ioWorkers, false)
c.Assert(errors.Cause(parser.ReadRow()), Equals, io.EOF)

// Try again with headers.

cfg.Header = true

parser = mydump.NewCSVParser(&cfg, mydump.NewStringReader(""), config.ReadBlockSize, s.ioWorkers)
parser = mydump.NewCSVParser(&cfg, mydump.NewStringReader(""), config.ReadBlockSize, s.ioWorkers, true)
c.Assert(errors.Cause(parser.ReadRow()), Equals, io.EOF)

parser = mydump.NewCSVParser(&cfg, mydump.NewStringReader("h\n"), config.ReadBlockSize, s.ioWorkers)
parser = mydump.NewCSVParser(&cfg, mydump.NewStringReader("h\n"), config.ReadBlockSize, s.ioWorkers, true)
c.Assert(errors.Cause(parser.ReadRow()), Equals, io.EOF)
}

@@ -430,7 +477,7 @@ func (s *testMydumpCSVParserSuite) TestCRLF(c *C) {
Separator: ",",
Delimiter: `"`,
}
parser := mydump.NewCSVParser(&cfg, mydump.NewStringReader("a\rb\r\nc\n\n\n\nd"), config.ReadBlockSize, s.ioWorkers)
parser := mydump.NewCSVParser(&cfg, mydump.NewStringReader("a\rb\r\nc\n\n\n\nd"), config.ReadBlockSize, s.ioWorkers, false)

c.Assert(parser.ReadRow(), IsNil)
c.Assert(parser.LastRow(), DeepEquals, mydump.Row{
@@ -465,7 +512,7 @@ func (s *testMydumpCSVParserSuite) TestQuotedSeparator(c *C) {
Delimiter: `"`,
}

parser := mydump.NewCSVParser(&cfg, mydump.NewStringReader(`",",','`), config.ReadBlockSize, s.ioWorkers)
parser := mydump.NewCSVParser(&cfg, mydump.NewStringReader(`",",','`), config.ReadBlockSize, s.ioWorkers, false)
c.Assert(parser.ReadRow(), IsNil)
c.Assert(parser.LastRow(), DeepEquals, mydump.Row{
RowID: 1,
@@ -636,7 +683,7 @@ func (s *testMydumpCSVParserSuite) TestReadError(c *C) {
Delimiter: `"`,
}

parser := mydump.NewCSVParser(&cfg, &errorReader{}, config.ReadBlockSize, s.ioWorkers)
parser := mydump.NewCSVParser(&cfg, &errorReader{}, config.ReadBlockSize, s.ioWorkers, false)
c.Assert(parser.ReadRow(), ErrorMatches, "fake read error")
}

@@ -648,7 +695,7 @@ func (s *testMydumpCSVParserSuite) TestSyntaxErrorLog(c *C) {
}

tc := mydump.NewStringReader("x'" + strings.Repeat("y", 50000))
parser := mydump.NewCSVParser(&cfg, tc, 50000, s.ioWorkers)
parser := mydump.NewCSVParser(&cfg, tc, 50000, s.ioWorkers, false)
logger, buffer := log.MakeTestLogger()
parser.SetLogger(logger)
c.Assert(parser.ReadRow(), ErrorMatches, "syntax error.*")
@@ -697,7 +744,7 @@ func (s *benchCSVParserSuite) BenchmarkReadRowUsingMydumpCSVParser(c *C) {
}()

cfg := config.CSVConfig{Separator: ","}
parser := mydump.NewCSVParser(&cfg, file, 65536, s.ioWorkers)
parser := mydump.NewCSVParser(&cfg, file, 65536, s.ioWorkers, false)
parser.SetLogger(log.Logger{Logger: zap.NewNop()})

rowsCount := 0
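To run just the new regression test in isolation (assuming the gopkg.in/check.v1 harness these tests use, whose -check.f flag filters by test name):

	go test ./lightning/mydump/ -check.f TestCsvWithWhiteSpaceLine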
2 changes: 1 addition & 1 deletion lightning/mydump/region.go
@@ -234,7 +234,7 @@ func SplitLargeFile(
if err != nil {
return 0, nil, nil, err
}
parser := NewCSVParser(&cfg.Mydumper.CSV, reader, cfg.Mydumper.ReadBlockSize, ioWorker)
parser := NewCSVParser(&cfg.Mydumper.CSV, reader, cfg.Mydumper.ReadBlockSize, ioWorker, false)
parser.SetPos(endOffset, prevRowIdMax)
pos, err := parser.ReadUntilTokNewLine()
if err != nil {
3 changes: 2 additions & 1 deletion lightning/restore/restore.go
@@ -1258,7 +1258,8 @@ func newChunkRestore(
var parser mydump.Parser
switch path.Ext(strings.ToLower(chunk.Key.Path)) {
case ".csv":
parser = mydump.NewCSVParser(&cfg.Mydumper.CSV, reader, blockBufSize, ioWorkers)
hasHeader := cfg.Mydumper.CSV.Header && chunk.Chunk.Offset == 0
parser = mydump.NewCSVParser(&cfg.Mydumper.CSV, reader, blockBufSize, ioWorkers, hasHeader)
default:
parser = mydump.NewChunkParser(cfg.TiDB.SQLMode, reader, blockBufSize, ioWorkers)
}
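The two call-site hunks above make the header policy explicit instead of inferring it from parser.pos. SplitLargeFile only seeks to a byte offset and scans for the next newline, so it always passes false; newChunkRestore enables header parsing only for the chunk that starts at byte offset 0 of a file. A condensed sketch of that rule, with names as in the restore.go hunk:

	// Only the first chunk of a CSV file can contain the header row; chunks
	// produced by splitting a large file start at non-zero offsets and must
	// not reinterpret their first line as column names.
	hasHeader := cfg.Mydumper.CSV.Header && chunk.Chunk.Offset == 0
	parser = mydump.NewCSVParser(&cfg.Mydumper.CSV, reader, blockBufSize, ioWorkers, hasHeader)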
