Skip to content

Commit

Permalink
fix: mysql: type conversion follow-up (influxdata#9966)
Browse files Browse the repository at this point in the history
  • Loading branch information
fxedel authored and VladislavSenkevich committed Nov 23, 2021
1 parent c9f8458 commit 4a4c324
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 75 deletions.
103 changes: 60 additions & 43 deletions plugins/inputs/mysql/mysql.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package mysql

import (
"bytes"
"database/sql"
"fmt"
"strconv"
Expand Down Expand Up @@ -638,7 +637,12 @@ func (m *Mysql) gatherGlobalVariables(db *sql.DB, serv string, acc telegraf.Accu

value, err := m.parseGlobalVariables(key, val)
if err != nil {
m.Log.Debugf("Error parsing global variable %q: %v", key, err)
errString := fmt.Errorf("error parsing mysql global variable %q=%q: %v", key, string(val), err)
if m.MetricVersion < 2 {
m.Log.Debug(errString)
} else {
acc.AddError(errString)
}
} else {
fields[key] = value
}
Expand All @@ -658,11 +662,7 @@ func (m *Mysql) gatherGlobalVariables(db *sql.DB, serv string, acc telegraf.Accu

func (m *Mysql) parseGlobalVariables(key string, value sql.RawBytes) (interface{}, error) {
if m.MetricVersion < 2 {
v, ok := v1.ParseValue(value)
if ok {
return v, nil
}
return v, fmt.Errorf("could not parse value: %q", string(value))
return v1.ParseValue(value)
}
return v2.ConvertGlobalVariables(key, value)
}
Expand Down Expand Up @@ -693,35 +693,58 @@ func (m *Mysql) gatherSlaveStatuses(db *sql.DB, serv string, acc telegraf.Accumu
// scanning keys and values separately

// get columns names, and create an array with its length
cols, err := rows.Columns()
cols, err := rows.ColumnTypes()
if err != nil {
return err
}
vals := make([]interface{}, len(cols))
vals := make([]sql.RawBytes, len(cols))
valPtrs := make([]interface{}, len(cols))
// fill the array with sql.Rawbytes
for i := range vals {
vals[i] = &sql.RawBytes{}
vals[i] = sql.RawBytes{}
valPtrs[i] = &vals[i]
}
if err = rows.Scan(vals...); err != nil {
if err = rows.Scan(valPtrs...); err != nil {
return err
}

// range over columns, and try to parse values
for i, col := range cols {
colName := col.Name()

if m.MetricVersion >= 2 {
col = strings.ToLower(col)
colName = strings.ToLower(colName)
}

colValue := vals[i]

if m.GatherAllSlaveChannels &&
(strings.ToLower(col) == "channel_name" || strings.ToLower(col) == "connection_name") {
(strings.ToLower(colName) == "channel_name" || strings.ToLower(colName) == "connection_name") {
// Since the default channel name is empty, we need this block
channelName := "default"
if len(*vals[i].(*sql.RawBytes)) > 0 {
channelName = string(*vals[i].(*sql.RawBytes))
if len(colValue) > 0 {
channelName = string(colValue)
}
tags["channel"] = channelName
} else if value, ok := m.parseValue(*vals[i].(*sql.RawBytes)); ok {
fields["slave_"+col] = value
continue
}

if colValue == nil || len(colValue) == 0 {
continue
}

value, err := m.parseValueByDatabaseTypeName(colValue, col.DatabaseTypeName())
if err != nil {
errString := fmt.Errorf("error parsing mysql slave status %q=%q: %v", colName, string(colValue), err)
if m.MetricVersion < 2 {
m.Log.Debug(errString)
} else {
acc.AddError(errString)
}
continue
}

fields["slave_"+colName] = value
}
acc.AddFields("mysql", fields, tags)

Expand Down Expand Up @@ -877,7 +900,7 @@ func (m *Mysql) gatherGlobalStatuses(db *sql.DB, serv string, acc telegraf.Accum
key = strings.ToLower(key)
value, err := v2.ConvertGlobalStatus(key, val)
if err != nil {
m.Log.Debugf("Error parsing global status: %v", err)
acc.AddError(fmt.Errorf("error parsing mysql global status %q=%q: %v", key, string(val), err))
} else {
fields[key] = value
}
Expand Down Expand Up @@ -1346,10 +1369,16 @@ func (m *Mysql) gatherInnoDBMetrics(db *sql.DB, serv string, acc telegraf.Accumu
if err := rows.Scan(&key, &val); err != nil {
return err
}

key = strings.ToLower(key)
if value, ok := m.parseValue(val); ok {
fields[key] = value
value, err := m.parseValueByDatabaseTypeName(val, "BIGINT")
if err != nil {
acc.AddError(fmt.Errorf("error parsing mysql InnoDB metric %q=%q: %v", key, string(val), err))
continue
}

fields[key] = value

// Send 20 fields at a time
if len(fields) >= 20 {
acc.AddFields("mysql_innodb", fields, tags)
Expand Down Expand Up @@ -1914,34 +1943,22 @@ func (m *Mysql) gatherSchemaForDB(db *sql.DB, database string, servtag string, a
return nil
}

func (m *Mysql) parseValue(value sql.RawBytes) (interface{}, bool) {
func (m *Mysql) parseValueByDatabaseTypeName(value sql.RawBytes, databaseTypeName string) (interface{}, error) {
if m.MetricVersion < 2 {
return v1.ParseValue(value)
}
return parseValue(value)
}

// parseValue can be used to convert values such as "ON","OFF","Yes","No" to 0,1
func parseValue(value sql.RawBytes) (interface{}, bool) {
if bytes.EqualFold(value, []byte("YES")) || bytes.Equal(value, []byte("ON")) {
return 1, true
}

if bytes.EqualFold(value, []byte("NO")) || bytes.Equal(value, []byte("OFF")) {
return 0, true
}

if val, err := strconv.ParseInt(string(value), 10, 64); err == nil {
return val, true
}
if val, err := strconv.ParseFloat(string(value), 64); err == nil {
return val, true
}

if len(string(value)) > 0 {
return string(value), true
switch databaseTypeName {
case "INT":
return v2.ParseInt(value)
case "BIGINT":
return v2.ParseUint(value)
case "VARCHAR":
return v2.ParseString(value)
default:
m.Log.Debugf("unknown database type name %q in parseValueByDatabaseTypeName", databaseTypeName)
return v2.ParseValue(value)
}
return nil, false
}

// findThreadState can be used to find thread state by command and plain state
Expand Down
27 changes: 1 addition & 26 deletions plugins/inputs/mysql/mysql_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package mysql

import (
"database/sql"
"fmt"
"testing"

Expand Down Expand Up @@ -178,31 +177,7 @@ func TestMysqlDNSAddTimeout(t *testing.T) {
}
}
}
func TestParseValue(t *testing.T) {
testCases := []struct {
rawByte sql.RawBytes
output interface{}
boolValue bool
}{
{sql.RawBytes("123"), int64(123), true},
{sql.RawBytes("abc"), "abc", true},
{sql.RawBytes("10.1"), 10.1, true},
{sql.RawBytes("ON"), 1, true},
{sql.RawBytes("OFF"), 0, true},
{sql.RawBytes("NO"), 0, true},
{sql.RawBytes("YES"), 1, true},
{sql.RawBytes("No"), 0, true},
{sql.RawBytes("Yes"), 1, true},
{sql.RawBytes("-794"), int64(-794), true},
{sql.RawBytes("18446744073709552333"), float64(18446744073709552000), true},
{sql.RawBytes(""), nil, false},
}
for _, cases := range testCases {
if got, ok := parseValue(cases.rawByte); got != cases.output && ok != cases.boolValue {
t.Errorf("for %s wanted %t, got %t", string(cases.rawByte), cases.output, got)
}
}
}

func TestNewNamespace(t *testing.T) {
testCases := []struct {
words []string
Expand Down
8 changes: 4 additions & 4 deletions plugins/inputs/mysql/v1/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,14 +182,14 @@ var Mappings = []*Mapping{
},
}

func ParseValue(value sql.RawBytes) (float64, bool) {
func ParseValue(value sql.RawBytes) (float64, error) {
if bytes.Equal(value, []byte("Yes")) || bytes.Equal(value, []byte("ON")) {
return 1, true
return 1, nil
}

if bytes.Equal(value, []byte("No")) || bytes.Equal(value, []byte("OFF")) {
return 0, true
return 0, nil
}
n, err := strconv.ParseFloat(string(value), 64)
return n, err == nil
return n, err
}
12 changes: 10 additions & 2 deletions plugins/inputs/mysql/v2/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ func ParseUint(value sql.RawBytes) (interface{}, error) {
return strconv.ParseUint(string(value), 10, 64)
}

func ParseFloat(value sql.RawBytes) (interface{}, error) {
return strconv.ParseFloat(string(value), 64)
}

func ParseBoolAsInteger(value sql.RawBytes) (interface{}, error) {
if bytes.EqualFold(value, []byte("YES")) || bytes.EqualFold(value, []byte("ON")) {
return int64(1), nil
Expand Down Expand Up @@ -86,11 +90,15 @@ var GlobalStatusConversions = map[string]ConversionFunc{
"innodb_data_pending_fsyncs": ParseUint,
"ssl_ctx_verify_depth": ParseUint,
"ssl_verify_depth": ParseUint,

// see https://galeracluster.com/library/documentation/galera-status-variables.html
"wsrep_local_index": ParseUint,
"wsrep_local_send_queue_avg": ParseFloat,
}

// see https://dev.mysql.com/doc/refman/5.7/en/server-system-variables.html
// see https://dev.mysql.com/doc/refman/8.0/en/server-system-variables.html
var GlobalVariableConversions = map[string]ConversionFunc{
// see https://dev.mysql.com/doc/refman/5.7/en/server-system-variables.html
// see https://dev.mysql.com/doc/refman/8.0/en/server-system-variables.html
"delay_key_write": ParseString, // ON, OFF, ALL
"enforce_gtid_consistency": ParseString, // ON, OFF, WARN
"event_scheduler": ParseString, // YES, NO, DISABLED
Expand Down
41 changes: 41 additions & 0 deletions plugins/inputs/mysql/v2/convert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package v2

import (
"database/sql"
"strings"
"testing"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -84,3 +85,43 @@ func TestCovertGlobalVariables(t *testing.T) {
})
}
}

func TestParseValue(t *testing.T) {
testCases := []struct {
rawByte sql.RawBytes
output interface{}
err string
}{
{sql.RawBytes("123"), int64(123), ""},
{sql.RawBytes("abc"), "abc", ""},
{sql.RawBytes("10.1"), 10.1, ""},
{sql.RawBytes("ON"), 1, ""},
{sql.RawBytes("OFF"), 0, ""},
{sql.RawBytes("NO"), 0, ""},
{sql.RawBytes("YES"), 1, ""},
{sql.RawBytes("No"), 0, ""},
{sql.RawBytes("Yes"), 1, ""},
{sql.RawBytes("-794"), int64(-794), ""},
{sql.RawBytes("2147483647"), int64(2147483647), ""}, // max int32
{sql.RawBytes("2147483648"), int64(2147483648), ""}, // too big for int32
{sql.RawBytes("9223372036854775807"), int64(9223372036854775807), ""}, // max int64
{sql.RawBytes("9223372036854775808"), uint64(9223372036854775808), ""}, // too big for int64
{sql.RawBytes("18446744073709551615"), uint64(18446744073709551615), ""}, // max uint64
{sql.RawBytes("18446744073709551616"), float64(18446744073709552000), ""}, // too big for uint64
{sql.RawBytes("18446744073709552333"), float64(18446744073709552000), ""}, // too big for uint64
{sql.RawBytes(""), nil, "unconvertible value"},
}
for _, cases := range testCases {
got, err := ParseValue(cases.rawByte)

if err != nil && cases.err == "" {
t.Errorf("for %q got unexpected error: %q", string(cases.rawByte), err.Error())
} else if err != nil && !strings.HasPrefix(err.Error(), cases.err) {
t.Errorf("for %q wanted error %q, got %q", string(cases.rawByte), cases.err, err.Error())
} else if err == nil && cases.err != "" {
t.Errorf("for %q did not get expected error: %s", string(cases.rawByte), cases.err)
} else if got != cases.output {
t.Errorf("for %q wanted %#v (%T), got %#v (%T)", string(cases.rawByte), cases.output, cases.output, got, got)
}
}
}

0 comments on commit 4a4c324

Please sign in to comment.