Skip to content

Commit

Permalink
Merge pull request #18779 from influxdata/18744/csv2lp
Browse files Browse the repository at this point in the history
feat(cmd/influx/write): add new processing options and enhancements
  • Loading branch information
sranka authored Sep 12, 2020
2 parents cbc640b + 2c25044 commit be8b2a9
Show file tree
Hide file tree
Showing 12 changed files with 494 additions and 58 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ need to update any InfluxDB CLI config profiles with the new port number.

### Features

1. [18779](https://github.com/influxdata/influxdb/pull/18779): Add new processing options and enhancements to influx write.
1. [19246](https://github.com/influxdata/influxdb/pull/19246): Redesign load data page to increase discovery and ease of use
1. [19334](https://github.com/influxdata/influxdb/pull/19334): Add --active-config flag to influx to set config for single command
1. [19219](https://github.com/influxdata/influxdb/pull/19219): List buckets via the API now supports after (ID) parameter as an alternative to offset.
Expand Down
25 changes: 25 additions & 0 deletions cmd/influx/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"context"
"encoding/csv"
"fmt"
"io"
"log"
Expand Down Expand Up @@ -38,6 +39,7 @@ type writeFlagsType struct {
SkipHeader int
IgnoreDataTypeInColumnName bool
Encoding string
ErrorsFile string
}

var writeFlags writeFlagsType
Expand Down Expand Up @@ -86,6 +88,7 @@ func cmdWrite(f *globalFlags, opt genericCLIOpts) *cobra.Command {
cmd.PersistentFlags().BoolVar(&writeFlags.IgnoreDataTypeInColumnName, "xIgnoreDataTypeInColumnName", false, "Ignores dataType which could be specified after ':' in column name")
cmd.PersistentFlags().MarkHidden("xIgnoreDataTypeInColumnName") // should be used only upon explicit advice
cmd.PersistentFlags().StringVar(&writeFlags.Encoding, "encoding", "UTF-8", "Character encoding of input files or stdin")
cmd.PersistentFlags().StringVar(&writeFlags.ErrorsFile, "errors-file", "", "The path to the file to write rejected rows to")

cmdDryRun := opt.newCmd("dryrun", fluxWriteDryrunF, false)
cmdDryRun.Args = cobra.MaximumNArgs(1)
Expand Down Expand Up @@ -204,6 +207,27 @@ func (writeFlags *writeFlagsType) createLineReader(ctx context.Context, cmd *cob
}
}

// create writer for errors-file, if supplied
var errorsFile *csv.Writer
var rowSkippedListener func(*csv2lp.CsvToLineReader, error, []string)
if writeFlags.ErrorsFile != "" {
writer, err := os.Create(writeFlags.ErrorsFile)
if err != nil {
return nil, csv2lp.MultiCloser(closers...), fmt.Errorf("failed to create %q: %v", writeFlags.ErrorsFile, err)
}
closers = append(closers, writer)
errorsFile = csv.NewWriter(writer)
rowSkippedListener = func(source *csv2lp.CsvToLineReader, lineError error, row []string) {
log.Println(lineError)
errorsFile.Comma = source.Comma()
errorsFile.Write([]string{fmt.Sprintf("# error : %v", lineError)})
if err := errorsFile.Write(row); err != nil {
log.Printf("Unable to write to error-file: %v\n", err)
}
errorsFile.Flush() // flush is required
}
}

// concatenate readers
r := io.MultiReader(readers...)
if writeFlags.Format == inputFormatCsv {
Expand All @@ -213,6 +237,7 @@ func (writeFlags *writeFlagsType) createLineReader(ctx context.Context, cmd *cob
csvReader.Table.IgnoreDataTypeInColumnName(writeFlags.IgnoreDataTypeInColumnName)
// change LineNumber to report file/stdin line numbers properly
csvReader.LineNumber = writeFlags.SkipHeader - len(writeFlags.Headers)
csvReader.RowSkipped = rowSkippedListener
r = csvReader
}
return r, csv2lp.MultiCloser(closers...), nil
Expand Down
17 changes: 17 additions & 0 deletions cmd/influx/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func readLines(reader io.Reader) []string {

func createTempFile(suffix string, contents []byte) string {
file, err := ioutil.TempFile("", "influx_writeTest*."+suffix)
file.Close() // Close immediatelly, since we need only a file name
if err != nil {
log.Fatal(err)
return "unknown.file"
Expand Down Expand Up @@ -545,3 +546,19 @@ func Test_fluxWriteF(t *testing.T) {
require.Equal(t, "stdin3 i=stdin1,j=stdin2,k=stdin4", strings.Trim(string(lineData), "\n"))
})
}

// Test_writeFlags_errorsFile tests that rejected rows are written to errors file
func Test_writeFlags_errorsFile(t *testing.T) {
defer removeTempFiles()
errorsFile := createTempFile("errors", []byte{})
stdInContents := "_measurement,a|long:strict\nm,1\nm,1.1"
out := bytes.Buffer{}
command := cmdWrite(&globalFlags{}, genericCLIOpts{in: strings.NewReader(stdInContents), w: bufio.NewWriter(&out)})
command.SetArgs([]string{"dryrun", "--format", "csv", "--errors-file", errorsFile})
err := command.Execute()
require.Nil(t, err)
require.Equal(t, "m a=1i", strings.Trim(out.String(), "\n"))
errorLines, err := ioutil.ReadFile(errorsFile)
require.Nil(t, err)
require.Equal(t, "# error : line 3: column 'a': '1.1' cannot fit into long data type\nm,1.1", strings.Trim(string(errorLines), "\n"))
}
8 changes: 8 additions & 0 deletions pkg/csv2lp/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,11 @@ Existing [data types](https://v2.docs.influxdata.com/v2.0/reference/syntax/annot
- `#constant` annotation adds a constant column to the data, so you can set measurement, time, field or tag of every row you import
- the format of a constant annotation row is `#constant,datatype,name,value`', it contains supported datatype, a column name, and a constant value
- _column name_ can be omitted for _dateTime_ or _measurement_ columns, so the annotation can be simply `#constant,measurement,cpu`
- `#concat` annotation adds a new column that is concatenated from existing columns according to a template
- the format of a concat annotation row is `#concat,datatype,name,template`', it contains supported datatype, a column name, and a template value
- the `template` is a string with `${columnName}` placeholders, in which the placeholders are replaced by values of existing columns
- for example: `#concat,string,fullName,${firstName} ${lastName}`
- _column name_ can be omitted for _dateTime_ or _measurement_ columns
- `#timezone` annotation specifies the time zone of the data using an offset, which is either `+hhmm` or `-hhmm` or `Local` to use the local/computer time zone. Examples: _#timezone,+0100_ _#timezone -0500_ _#timezone Local_

#### Data type with data format
Expand All @@ -158,6 +163,9 @@ All data types can include the format that is used to parse column data. It is t
- note that you have to quote column delimiters whenever they appear in a CSV column value, for example:
- `#constant,"double:,.",myColumn,"1.234,011"`
- `long:format` and `unsignedLong:format` support the same format as `double`, but everything after and including a fraction character is ignored
- the format can be prepended with `strict` to fail when a fraction digit is present, for example:
- `1000.000` is `1000` when parsed as `long`, but fails when parsed as `long:strict`
- `1_000,000` is `1000` when parsed as `long:,_`, but fails when parsed as `long:strict,_`
- `boolean:truthy:falsy`
- `truthy` and `falsy` are comma-separated lists of values, they can be empty to assume all values as truthy/falsy; for example `boolean:sí,yes,ja,oui,ano,да:no,nein,non,ne,нет`
- a `boolean` data type (without the format) parses column values that start with any of _tTyY1_ as `true` values, _fFnN0_ as `false` values and fails on other values
Expand Down
29 changes: 27 additions & 2 deletions pkg/csv2lp/csv2lp.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,21 @@ type CsvLineError struct {
}

func (e CsvLineError) Error() string {
return fmt.Sprintf("line %d: %v", e.Line, e.Err)
if e.Line > 0 {
return fmt.Sprintf("line %d: %v", e.Line, e.Err)
}
return fmt.Sprintf("%v", e.Err)
}

// CreateRowColumnError wraps an existing error to add line and column coordinates
func CreateRowColumnError(line int, columnLabel string, err error) CsvLineError {
return CsvLineError{
Line: line,
Err: CsvColumnError{
Column: columnLabel,
Err: err,
},
}
}

// CsvToLineReader represents state of transformation from csv data to lien protocol reader
Expand All @@ -34,6 +48,8 @@ type CsvToLineReader struct {
dataRowAdded bool
// log CSV data errors to sterr and continue with CSV processing
skipRowOnError bool
// RowSkipped is called when a row is skipped because of data parsing error
RowSkipped func(source *CsvToLineReader, lineError error, row []string)

// reader results
buffer []byte
Expand All @@ -54,6 +70,11 @@ func (state *CsvToLineReader) SkipRowOnError(val bool) *CsvToLineReader {
return state
}

// Comma returns a field delimiter used in an input CSV file
func (state *CsvToLineReader) Comma() rune {
return state.csv.Comma
}

// Read implements io.Reader that returns protocol lines
func (state *CsvToLineReader) Read(p []byte) (n int, err error) {
// state1: finished
Expand Down Expand Up @@ -98,13 +119,17 @@ func (state *CsvToLineReader) Read(p []byte) (n int, err error) {
if state.Table.AddRow(row) {
var err error
state.lineBuffer = state.lineBuffer[:0] // reuse line buffer
state.lineBuffer, err = state.Table.AppendLine(state.lineBuffer, row)
state.lineBuffer, err = state.Table.AppendLine(state.lineBuffer, row, state.LineNumber)
if !state.dataRowAdded && state.logTableDataColumns {
log.Println(state.Table.DataColumnsInfo())
}
state.dataRowAdded = true
if err != nil {
lineError := CsvLineError{state.LineNumber, err}
if state.RowSkipped != nil {
state.RowSkipped(state, lineError, row)
continue
}
if state.skipRowOnError {
log.Println(lineError)
continue
Expand Down
66 changes: 66 additions & 0 deletions pkg/csv2lp/csv2lp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,68 @@ func Test_CsvToLineProtocol_SkipRowOnError(t *testing.T) {
require.Equal(t, messages, 2)
}

// Test_CsvToLineProtocol_RowSkipped tests that error rows are reported to configured RowSkipped listener
func Test_CsvToLineProtocol_RowSkipped(t *testing.T) {
var buf bytes.Buffer
log.SetOutput(&buf)
oldFlags := log.Flags()
log.SetFlags(0)
defer func() {
log.SetOutput(os.Stderr)
log.SetFlags(oldFlags)
}()

type ActualArguments = struct {
src *CsvToLineReader
err error
row []string
}
type ExpectedArguments = struct {
errorString string
row []string
}

csv := "sep=;\n_measurement;a|long:strict\n;1\ncpu;2.1\ncpu;3a\n"
calledArgs := []ActualArguments{}
expectedArgs := []ExpectedArguments{
{
"line 3: column '_measurement': no measurement supplied",
[]string{"", "1"},
},
{
"line 4: column 'a': '2.1' cannot fit into long data type",
[]string{"cpu", "2.1"},
},
{
"line 5: column 'a': strconv.ParseInt:",
[]string{"cpu", "3a"},
},
}

reader := CsvToLineProtocol(strings.NewReader(csv)).SkipRowOnError(true)
reader.RowSkipped = func(src *CsvToLineReader, err error, _row []string) {
// make a copy of _row
row := make([]string, len(_row))
copy(row, _row)
// remember for comparison
calledArgs = append(calledArgs, ActualArguments{
src, err, row,
})
}
// read all the data
ioutil.ReadAll(reader)

out := buf.String()
require.Empty(t, out, "No log messages expected because RowSkipped handler is set")

require.Len(t, calledArgs, 3)
for i, expected := range expectedArgs {
require.Equal(t, reader, calledArgs[i].src)
require.Contains(t, calledArgs[i].err.Error(), expected.errorString)
require.Equal(t, expected.row, calledArgs[i].row)
}
}

// Test_CsvLineError tests CsvLineError error format
func Test_CsvLineError(t *testing.T) {
var tests = []struct {
Expand All @@ -218,6 +280,10 @@ func Test_CsvLineError(t *testing.T) {
CsvLineError{Line: 2, Err: CsvColumnError{"a", errors.New("cause")}},
"line 2: column 'a': cause",
},
{
CsvLineError{Line: -1, Err: CsvColumnError{"a", errors.New("cause")}},
"column 'a': cause",
},
}
for _, test := range tests {
require.Equal(t, test.value, test.err.Error())
Expand Down
60 changes: 54 additions & 6 deletions pkg/csv2lp/csv_annotations.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package csv2lp

import (
"fmt"
"log"
"regexp"
"strconv"
"strings"
Expand Down Expand Up @@ -33,9 +34,8 @@ func (a annotationComment) matches(comment string) bool {
return strings.HasPrefix(strings.ToLower(comment), a.prefix)
}

// constantSetupTable setups the supplied CSV table from #constant annotation
func constantSetupTable(table *CsvTable, row []string) error {
// adds a virtual column with contsant value to all data rows
func createConstantOrConcatColumn(table *CsvTable, row []string, annotationName string) CsvTableColumn {
// adds a virtual column with constant value to all data rows
// supported types of constant annotation rows are:
// 1. "#constant,datatype,label,defaultValue"
// 2. "#constant,measurement,value"
Expand Down Expand Up @@ -72,17 +72,61 @@ func constantSetupTable(table *CsvTable, row []string) error {
if col.DefaultValue == "" && col.Label != "" {
// type 2,3,5,6
col.DefaultValue = col.Label
col.Label = "#constant " + col.DataType
col.Label = annotationName + " " + col.DataType
} else if col.Label == "" {
// setup a label if no label is supplied fo focused error messages
col.Label = "#constant " + col.DataType
// setup a label if no label is supplied for focused error messages
col.Label = annotationName + " " + col.DataType
}
}
// add a virtual column to the table
return col
}

// constantSetupTable setups the supplied CSV table from #constant annotation
func constantSetupTable(table *CsvTable, row []string) error {
col := createConstantOrConcatColumn(table, row, "#constant")
// add a virtual column to the table
table.extraColumns = append(table.extraColumns, &col)
return nil
}

// computedReplacer is used to replace value in computed columns
var computedReplacer *regexp.Regexp = regexp.MustCompile(`\$\{[^}]+\}`)

// concatSetupTable setups the supplied CSV table from #concat annotation
func concatSetupTable(table *CsvTable, row []string) error {
col := createConstantOrConcatColumn(table, row, "#concat")
template := col.DefaultValue
col.ComputeValue = func(row []string) string {
return computedReplacer.ReplaceAllStringFunc(template, func(text string) string {
columnLabel := text[2 : len(text)-1] // ${columnLabel}
if placeholderColumn := table.Column(columnLabel); placeholderColumn != nil {
return placeholderColumn.Value(row)
}
log.Printf("WARNING: column %s: column '%s' cannot be replaced, no such column available", col.Label, columnLabel)
return ""
})
}
// add a virtual column to the table
table.extraColumns = append(table.extraColumns, &col)
// add validator to report error when no placeholder column is available
table.validators = append(table.validators, func(table *CsvTable) error {
placeholders := computedReplacer.FindAllString(template, len(template))
for _, placeholder := range placeholders {
columnLabel := placeholder[2 : len(placeholder)-1] // ${columnLabel}
if placeholderColumn := table.Column(columnLabel); placeholderColumn == nil {
return CsvColumnError{
Column: col.Label,
Err: fmt.Errorf("'%s' references an uknown column '%s', available columns are: %v",
template, columnLabel, strings.Join(table.ColumnLabels(), ",")),
}
}
}
return nil
})
return nil
}

// supportedAnnotations contains all supported CSV annotations comments
var supportedAnnotations = []annotationComment{
{
Expand Down Expand Up @@ -131,6 +175,10 @@ var supportedAnnotations = []annotationComment{
return nil
},
},
{
prefix: "#concat",
setupTable: concatSetupTable,
},
}

// ignoreLeadingComment returns a value without '#anyComment ' prefix
Expand Down
Loading

0 comments on commit be8b2a9

Please sign in to comment.