diff --git a/plugins/inputs/mysql/mysql.go b/plugins/inputs/mysql/mysql.go index 063452b7cbe5c..1aad295c8626f 100644 --- a/plugins/inputs/mysql/mysql.go +++ b/plugins/inputs/mysql/mysql.go @@ -282,9 +282,9 @@ const ( GROUP BY command,state ORDER BY null` infoSchemaUserStatisticsQuery = ` - SELECT *,count(*) + SELECT * FROM information_schema.user_statistics - GROUP BY user` + GROUP BY user` infoSchemaAutoIncQuery = ` SELECT table_schema, table_name, column_name, auto_increment, CAST(pow(2, case data_type @@ -790,72 +790,13 @@ func (m *Mysql) gatherGlobalStatuses(db *sql.DB, serv string, acc telegraf.Accum // gather connection metrics from user_statistics for each user if m.GatherUserStatistics { - conn_rows, err := db.Query("select user, total_connections, concurrent_connections, connected_time, busy_time, cpu_time, bytes_received, bytes_sent, binlog_bytes_written, rows_fetched, rows_updated, table_rows_read, select_commands, update_commands, other_commands, commit_transactions, rollback_transactions, denied_connections, lost_connections, access_denied, empty_queries, total_ssl_connections FROM INFORMATION_SCHEMA.USER_STATISTICS GROUP BY user") + err = m.GatherUserStatisticsStatuses(db, serv, acc) if err != nil { - log.Printf("E! MySQL Error gathering user stats: %s", err) - } else { - for conn_rows.Next() { - var user string - var total_connections int64 - var concurrent_connections int64 - var connected_time int64 - var busy_time int64 - var cpu_time int64 - var bytes_received int64 - var bytes_sent int64 - var binlog_bytes_written int64 - var rows_fetched int64 - var rows_updated int64 - var table_rows_read int64 - var select_commands int64 - var update_commands int64 - var other_commands int64 - var commit_transactions int64 - var rollback_transactions int64 - var denied_connections int64 - var lost_connections int64 - var access_denied int64 - var empty_queries int64 - var total_ssl_connections int64 - - err = conn_rows.Scan(&user, &total_connections, &concurrent_connections, - &connected_time, &busy_time, &cpu_time, &bytes_received, &bytes_sent, &binlog_bytes_written, - &rows_fetched, &rows_updated, &table_rows_read, &select_commands, &update_commands, &other_commands, - &commit_transactions, &rollback_transactions, &denied_connections, &lost_connections, &access_denied, - &empty_queries, &total_ssl_connections, - ) - - if err != nil { - return err - } - - tags := map[string]string{"server": servtag, "user": user} - fields := map[string]interface{}{ - "total_connections": total_connections, - "concurrent_connections": concurrent_connections, - "connected_time": connected_time, - "busy_time": busy_time, - "cpu_time": cpu_time, - "bytes_received": bytes_received, - "bytes_sent": bytes_sent, - "binlog_bytes_written": binlog_bytes_written, - "rows_fetched": rows_fetched, - "rows_updated": rows_updated, - "table_rows_read": table_rows_read, - "select_commands": select_commands, - "update_commands": update_commands, - "other_commands": other_commands, - "commit_transactions": commit_transactions, - "rollback_transactions": rollback_transactions, - "denied_connections": denied_connections, - "lost_connections": lost_connections, - "access_denied": access_denied, - "empty_queries": empty_queries, - "total_ssl_connections": total_ssl_connections, - } - - acc.AddFields("mysql_user_stats", fields, tags) + // disable collecting if table is not found (suppresses repeat errors) + if strings.Contains(err.Error(), "nknown table 'user_statistics'") { + m.GatherUserStatistics = false } + log.Printf("E! MySQL Error gathering user stats: %s", err) } } @@ -920,74 +861,210 @@ func (m *Mysql) GatherUserStatisticsStatuses(db *sql.DB, serv string, acc telegr return err } defer rows.Close() - var ( - user string - total_connections int64 - concurrent_connections int64 - connected_time int64 - busy_time int64 - cpu_time int64 - bytes_received int64 - bytes_sent int64 - binlog_bytes_written int64 - rows_fetched int64 - rows_updated int64 - table_rows_read int64 - select_commands int64 - update_commands int64 - other_commands int64 - commit_transactions int64 - rollback_transactions int64 - denied_connections int64 - lost_connections int64 - access_denied int64 - empty_queries int64 - total_ssl_connections int64 - count uint32 - ) + + cols, err := convertColumns(rows.Columns()) + if err != nil { + return err + } + + read, err := getColSlice(len(cols)) + if err != nil { + return err + } + + if len(read) != len(cols) { + return fmt.Errorf("Results (%d) don't match up with columns (%d)!", len(read), len(cols)) + } servtag := getDSNTag(serv) for rows.Next() { - err = rows.Scan(&user, &total_connections, &concurrent_connections, - &connected_time, &busy_time, &cpu_time, &bytes_received, &bytes_sent, &binlog_bytes_written, - &rows_fetched, &rows_updated, &table_rows_read, &select_commands, &update_commands, &other_commands, - &commit_transactions, &rollback_transactions, &denied_connections, &lost_connections, &access_denied, - &empty_queries, &total_ssl_connections, &count, - ) + err = rows.Scan(read...) if err != nil { return err } - tags := map[string]string{"server": servtag, "user": user} - fields := map[string]interface{}{ + tags := map[string]string{"server": servtag, "user": *read[0].(*string)} + fields := map[string]interface{}{} - "total_connections": total_connections, - "concurrent_connections": concurrent_connections, - "connected_time": connected_time, - "busy_time": busy_time, - "cpu_time": cpu_time, - "bytes_received": bytes_received, - "bytes_sent": bytes_sent, - "binlog_bytes_written": binlog_bytes_written, - "rows_fetched": rows_fetched, - "rows_updated": rows_updated, - "table_rows_read": table_rows_read, - "select_commands": select_commands, - "update_commands": update_commands, - "other_commands": other_commands, - "commit_transactions": commit_transactions, - "rollback_transactions": rollback_transactions, - "denied_connections": denied_connections, - "lost_connections": lost_connections, - "access_denied": access_denied, - "empty_queries": empty_queries, - "total_ssl_connections": total_ssl_connections, + for i := range cols { + if i == 0 { + continue // skip "user" + } + switch v := read[i].(type) { + case *bool: + fields[cols[i]] = *v + case *int: + fields[cols[i]] = *v + case *int8: + fields[cols[i]] = *v + case *int16: + fields[cols[i]] = *v + case *int32: + fields[cols[i]] = *v + case *int64: + fields[cols[i]] = *v + case *uint: + fields[cols[i]] = *v + case *uint8: + fields[cols[i]] = *v + case *uint16: + fields[cols[i]] = *v + case *uint32: + fields[cols[i]] = *v + case *uint64: + fields[cols[i]] = *v + case *uintptr: + fields[cols[i]] = *v + case *float32: + fields[cols[i]] = *v + case *float64: + fields[cols[i]] = *v + case *string: + fields[cols[i]] = *v + default: + return fmt.Errorf("Unknown column type - %T", v) + } } acc.AddFields("mysql_user_stats", fields, tags) } return nil } +// convertColumns converts selected column names to lowercase. +func convertColumns(s []string, e error) ([]string, error) { + if e != nil { + return nil, e + } + d := make([]string, len(s)) + + for i := range s { + d[i] = strings.ToLower(s[i]) + } + return d, nil +} + +// getColSlice returns an in interface slice that can be used in the row.Scan(). +func getColSlice(l int) ([]interface{}, error) { + // list of all possible column names + var ( + user string + total_connections int64 + concurrent_connections int64 + connected_time int64 + busy_time int64 + cpu_time int64 + bytes_received int64 + bytes_sent int64 + binlog_bytes_written int64 + rows_read int64 + rows_sent int64 + rows_deleted int64 + rows_inserted int64 + rows_updated int64 + select_commands int64 + update_commands int64 + other_commands int64 + commit_transactions int64 + rollback_transactions int64 + denied_connections int64 + lost_connections int64 + access_denied int64 + empty_queries int64 + total_ssl_connections int64 + max_statement_time_exceeded int64 + // maria specific + fbusy_time float64 + fcpu_time float64 + // percona specific + client string + rows_fetched int64 + table_rows_read int64 + ) + + switch l { + case 23: // maria5 + return []interface{}{ + &user, + &total_connections, + &concurrent_connections, + &connected_time, + &fbusy_time, + &fcpu_time, + &bytes_received, + &bytes_sent, + &binlog_bytes_written, + &rows_read, + &rows_sent, + &rows_deleted, + &rows_inserted, + &rows_updated, + &select_commands, + &update_commands, + &other_commands, + &commit_transactions, + &rollback_transactions, + &denied_connections, + &lost_connections, + &access_denied, + &empty_queries, + }, nil + case 25: // maria10 + return []interface{}{ + &user, + &total_connections, + &concurrent_connections, + &connected_time, + &fbusy_time, + &fcpu_time, + &bytes_received, + &bytes_sent, + &binlog_bytes_written, + &rows_read, + &rows_sent, + &rows_deleted, + &rows_inserted, + &rows_updated, + &select_commands, + &update_commands, + &other_commands, + &commit_transactions, + &rollback_transactions, + &denied_connections, + &lost_connections, + &access_denied, + &empty_queries, + &total_ssl_connections, + &max_statement_time_exceeded, + }, nil + case 21: // percona + return []interface{}{ + &client, + &total_connections, + &concurrent_connections, + &connected_time, + &busy_time, + &cpu_time, + &bytes_received, + &bytes_sent, + &binlog_bytes_written, + &rows_fetched, + &rows_updated, + &table_rows_read, + &select_commands, + &update_commands, + &other_commands, + &commit_transactions, + &rollback_transactions, + &denied_connections, + &lost_connections, + &access_denied, + &empty_queries, + }, nil + } + + return nil, fmt.Errorf("Not Supported - %d columns", l) +} + // gatherPerfTableIOWaits can be used to get total count and time // of I/O wait event for each table and process func (m *Mysql) gatherPerfTableIOWaits(db *sql.DB, serv string, acc telegraf.Accumulator) error {