-
Notifications
You must be signed in to change notification settings - Fork 5.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Handle variations in the user_statistics collecting #4306
Changes from 1 commit
d639493
b5bac9f
b534e15
921262f
aa5ded1
bf14c86
0c5209a
33ca597
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
MariaDB has two different table definitions: - https://mariadb.com/kb/en/library/information-schema-user_statistics-table/ Percona has it's own: - https://www.percona.com/doc/percona-server/LATEST/diagnostics/user_stats.html#USER_STATISTICS MySQL doesn't have the table, but there may be a patch: - https://www.byte.nl/blog/enhancing-mysql-with-user-statistics Resolves #2910 Closes #3382
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -282,9 +282,9 @@ const ( | |
GROUP BY command,state | ||
ORDER BY null` | ||
infoSchemaUserStatisticsQuery = ` | ||
SELECT *,count(*) | ||
SELECT * | ||
FROM information_schema.user_statistics | ||
GROUP BY user` | ||
GROUP BY user` | ||
infoSchemaAutoIncQuery = ` | ||
SELECT table_schema, table_name, column_name, auto_increment, | ||
CAST(pow(2, case data_type | ||
|
@@ -790,72 +790,13 @@ func (m *Mysql) gatherGlobalStatuses(db *sql.DB, serv string, acc telegraf.Accum | |
|
||
// gather connection metrics from user_statistics for each user | ||
if m.GatherUserStatistics { | ||
conn_rows, err := db.Query("select user, total_connections, concurrent_connections, connected_time, busy_time, cpu_time, bytes_received, bytes_sent, binlog_bytes_written, rows_fetched, rows_updated, table_rows_read, select_commands, update_commands, other_commands, commit_transactions, rollback_transactions, denied_connections, lost_connections, access_denied, empty_queries, total_ssl_connections FROM INFORMATION_SCHEMA.USER_STATISTICS GROUP BY user") | ||
err = m.GatherUserStatisticsStatuses(db, serv, acc) | ||
if err != nil { | ||
log.Printf("E! MySQL Error gathering user stats: %s", err) | ||
} else { | ||
for conn_rows.Next() { | ||
var user string | ||
var total_connections int64 | ||
var concurrent_connections int64 | ||
var connected_time int64 | ||
var busy_time int64 | ||
var cpu_time int64 | ||
var bytes_received int64 | ||
var bytes_sent int64 | ||
var binlog_bytes_written int64 | ||
var rows_fetched int64 | ||
var rows_updated int64 | ||
var table_rows_read int64 | ||
var select_commands int64 | ||
var update_commands int64 | ||
var other_commands int64 | ||
var commit_transactions int64 | ||
var rollback_transactions int64 | ||
var denied_connections int64 | ||
var lost_connections int64 | ||
var access_denied int64 | ||
var empty_queries int64 | ||
var total_ssl_connections int64 | ||
|
||
err = conn_rows.Scan(&user, &total_connections, &concurrent_connections, | ||
&connected_time, &busy_time, &cpu_time, &bytes_received, &bytes_sent, &binlog_bytes_written, | ||
&rows_fetched, &rows_updated, &table_rows_read, &select_commands, &update_commands, &other_commands, | ||
&commit_transactions, &rollback_transactions, &denied_connections, &lost_connections, &access_denied, | ||
&empty_queries, &total_ssl_connections, | ||
) | ||
|
||
if err != nil { | ||
return err | ||
} | ||
|
||
tags := map[string]string{"server": servtag, "user": user} | ||
fields := map[string]interface{}{ | ||
"total_connections": total_connections, | ||
"concurrent_connections": concurrent_connections, | ||
"connected_time": connected_time, | ||
"busy_time": busy_time, | ||
"cpu_time": cpu_time, | ||
"bytes_received": bytes_received, | ||
"bytes_sent": bytes_sent, | ||
"binlog_bytes_written": binlog_bytes_written, | ||
"rows_fetched": rows_fetched, | ||
"rows_updated": rows_updated, | ||
"table_rows_read": table_rows_read, | ||
"select_commands": select_commands, | ||
"update_commands": update_commands, | ||
"other_commands": other_commands, | ||
"commit_transactions": commit_transactions, | ||
"rollback_transactions": rollback_transactions, | ||
"denied_connections": denied_connections, | ||
"lost_connections": lost_connections, | ||
"access_denied": access_denied, | ||
"empty_queries": empty_queries, | ||
"total_ssl_connections": total_ssl_connections, | ||
} | ||
|
||
acc.AddFields("mysql_user_stats", fields, tags) | ||
// disable collecting if table is not found (suppresses repeat errors) | ||
if strings.Contains(err.Error(), "nknown table 'user_statistics'") { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. typo There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I generally leave out capital letters when matching error strings in case the error changes. ( There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Okay, personally I would probably just ToLower it, but ideally this type of error matching will be uncommon. I did find an interesting link off of http://go-database-sql.org/errors.html#identifying-specific-database-errors to a package of error codes for mysql, this might be helpful, though I'm unsure if these work on all mysql flavors. |
||
m.GatherUserStatistics = false | ||
} | ||
log.Printf("E! MySQL Error gathering user stats: %s", err) | ||
} | ||
} | ||
|
||
|
@@ -920,74 +861,210 @@ func (m *Mysql) GatherUserStatisticsStatuses(db *sql.DB, serv string, acc telegr | |
return err | ||
} | ||
defer rows.Close() | ||
var ( | ||
user string | ||
total_connections int64 | ||
concurrent_connections int64 | ||
connected_time int64 | ||
busy_time int64 | ||
cpu_time int64 | ||
bytes_received int64 | ||
bytes_sent int64 | ||
binlog_bytes_written int64 | ||
rows_fetched int64 | ||
rows_updated int64 | ||
table_rows_read int64 | ||
select_commands int64 | ||
update_commands int64 | ||
other_commands int64 | ||
commit_transactions int64 | ||
rollback_transactions int64 | ||
denied_connections int64 | ||
lost_connections int64 | ||
access_denied int64 | ||
empty_queries int64 | ||
total_ssl_connections int64 | ||
count uint32 | ||
) | ||
|
||
cols, err := convertColumns(rows.Columns()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is just an observation. I never knew you could pass multiple arguments like this, interesting. I'm not sure I like how it becomes the outer functions job to deal with an upstream error, but I like how it can compress the error handling needed in this function. |
||
if err != nil { | ||
return err | ||
} | ||
|
||
read, err := getColSlice(len(cols)) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
if len(read) != len(cols) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would probably remove this check, since in theory it is impossible unless there is a programming error and it should. be picked up by the rows.Scan( There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 |
||
return fmt.Errorf("Results (%d) don't match up with columns (%d)!", len(read), len(cols)) | ||
} | ||
|
||
servtag := getDSNTag(serv) | ||
for rows.Next() { | ||
err = rows.Scan(&user, &total_connections, &concurrent_connections, | ||
&connected_time, &busy_time, &cpu_time, &bytes_received, &bytes_sent, &binlog_bytes_written, | ||
&rows_fetched, &rows_updated, &table_rows_read, &select_commands, &update_commands, &other_commands, | ||
&commit_transactions, &rollback_transactions, &denied_connections, &lost_connections, &access_denied, | ||
&empty_queries, &total_ssl_connections, &count, | ||
) | ||
err = rows.Scan(read...) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
tags := map[string]string{"server": servtag, "user": user} | ||
fields := map[string]interface{}{ | ||
tags := map[string]string{"server": servtag, "user": *read[0].(*string)} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. User is not always index 0 (percona) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'll make it more robust, but in all the docs I checked it would be index 0. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What about in this list of columns? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Alias for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. AH! I was referencing |
||
fields := map[string]interface{}{} | ||
|
||
"total_connections": total_connections, | ||
"concurrent_connections": concurrent_connections, | ||
"connected_time": connected_time, | ||
"busy_time": busy_time, | ||
"cpu_time": cpu_time, | ||
"bytes_received": bytes_received, | ||
"bytes_sent": bytes_sent, | ||
"binlog_bytes_written": binlog_bytes_written, | ||
"rows_fetched": rows_fetched, | ||
"rows_updated": rows_updated, | ||
"table_rows_read": table_rows_read, | ||
"select_commands": select_commands, | ||
"update_commands": update_commands, | ||
"other_commands": other_commands, | ||
"commit_transactions": commit_transactions, | ||
"rollback_transactions": rollback_transactions, | ||
"denied_connections": denied_connections, | ||
"lost_connections": lost_connections, | ||
"access_denied": access_denied, | ||
"empty_queries": empty_queries, | ||
"total_ssl_connections": total_ssl_connections, | ||
for i := range cols { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In this case, I think leaving it is best. I'll still need the iterator for the |
||
if i == 0 { | ||
continue // skip "user" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Percona index 0 comment applies here. |
||
} | ||
switch v := read[i].(type) { | ||
case *bool: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Only include expected types and the default case can handle anything else. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good call. Is it stylistically ok to leave out the default case since it is immediately followed by the function's return? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I usually leave the default in to catch any programming mistakes. |
||
fields[cols[i]] = *v | ||
case *int: | ||
fields[cols[i]] = *v | ||
case *int8: | ||
fields[cols[i]] = *v | ||
case *int16: | ||
fields[cols[i]] = *v | ||
case *int32: | ||
fields[cols[i]] = *v | ||
case *int64: | ||
fields[cols[i]] = *v | ||
case *uint: | ||
fields[cols[i]] = *v | ||
case *uint8: | ||
fields[cols[i]] = *v | ||
case *uint16: | ||
fields[cols[i]] = *v | ||
case *uint32: | ||
fields[cols[i]] = *v | ||
case *uint64: | ||
fields[cols[i]] = *v | ||
case *uintptr: | ||
fields[cols[i]] = *v | ||
case *float32: | ||
fields[cols[i]] = *v | ||
case *float64: | ||
fields[cols[i]] = *v | ||
case *string: | ||
fields[cols[i]] = *v | ||
default: | ||
return fmt.Errorf("Unknown column type - %T", v) | ||
} | ||
} | ||
acc.AddFields("mysql_user_stats", fields, tags) | ||
} | ||
return nil | ||
} | ||
|
||
// convertColumns converts selected column names to lowercase. | ||
func convertColumns(s []string, e error) ([]string, error) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe rename There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Great idea |
||
if e != nil { | ||
return nil, e | ||
} | ||
d := make([]string, len(s)) | ||
|
||
for i := range s { | ||
d[i] = strings.ToLower(s[i]) | ||
} | ||
return d, nil | ||
} | ||
|
||
// getColSlice returns an in interface slice that can be used in the row.Scan(). | ||
func getColSlice(l int) ([]interface{}, error) { | ||
// list of all possible column names | ||
var ( | ||
user string | ||
total_connections int64 | ||
concurrent_connections int64 | ||
connected_time int64 | ||
busy_time int64 | ||
cpu_time int64 | ||
bytes_received int64 | ||
bytes_sent int64 | ||
binlog_bytes_written int64 | ||
rows_read int64 | ||
rows_sent int64 | ||
rows_deleted int64 | ||
rows_inserted int64 | ||
rows_updated int64 | ||
select_commands int64 | ||
update_commands int64 | ||
other_commands int64 | ||
commit_transactions int64 | ||
rollback_transactions int64 | ||
denied_connections int64 | ||
lost_connections int64 | ||
access_denied int64 | ||
empty_queries int64 | ||
total_ssl_connections int64 | ||
max_statement_time_exceeded int64 | ||
// maria specific | ||
fbusy_time float64 | ||
fcpu_time float64 | ||
// percona specific | ||
client string | ||
rows_fetched int64 | ||
table_rows_read int64 | ||
) | ||
|
||
switch l { | ||
case 23: // maria5 | ||
return []interface{}{ | ||
&user, | ||
&total_connections, | ||
&concurrent_connections, | ||
&connected_time, | ||
&fbusy_time, | ||
&fcpu_time, | ||
&bytes_received, | ||
&bytes_sent, | ||
&binlog_bytes_written, | ||
&rows_read, | ||
&rows_sent, | ||
&rows_deleted, | ||
&rows_inserted, | ||
&rows_updated, | ||
&select_commands, | ||
&update_commands, | ||
&other_commands, | ||
&commit_transactions, | ||
&rollback_transactions, | ||
&denied_connections, | ||
&lost_connections, | ||
&access_denied, | ||
&empty_queries, | ||
}, nil | ||
case 25: // maria10 | ||
return []interface{}{ | ||
&user, | ||
&total_connections, | ||
&concurrent_connections, | ||
&connected_time, | ||
&fbusy_time, | ||
&fcpu_time, | ||
&bytes_received, | ||
&bytes_sent, | ||
&binlog_bytes_written, | ||
&rows_read, | ||
&rows_sent, | ||
&rows_deleted, | ||
&rows_inserted, | ||
&rows_updated, | ||
&select_commands, | ||
&update_commands, | ||
&other_commands, | ||
&commit_transactions, | ||
&rollback_transactions, | ||
&denied_connections, | ||
&lost_connections, | ||
&access_denied, | ||
&empty_queries, | ||
&total_ssl_connections, | ||
&max_statement_time_exceeded, | ||
}, nil | ||
case 21: // percona | ||
return []interface{}{ | ||
&client, | ||
&total_connections, | ||
&concurrent_connections, | ||
&connected_time, | ||
&busy_time, | ||
&cpu_time, | ||
&bytes_received, | ||
&bytes_sent, | ||
&binlog_bytes_written, | ||
&rows_fetched, | ||
&rows_updated, | ||
&table_rows_read, | ||
&select_commands, | ||
&update_commands, | ||
&other_commands, | ||
&commit_transactions, | ||
&rollback_transactions, | ||
&denied_connections, | ||
&lost_connections, | ||
&access_denied, | ||
&empty_queries, | ||
}, nil | ||
} | ||
|
||
return nil, fmt.Errorf("Not Supported - %d columns", l) | ||
} | ||
|
||
// gatherPerfTableIOWaits can be used to get total count and time | ||
// of I/O wait event for each table and process | ||
func (m *Mysql) gatherPerfTableIOWaits(db *sql.DB, serv string, acc telegraf.Accumulator) error { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will this result in this function being called twice per interval, once from
gatherServer
and once here?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It didn't appear to in testing (unless I missed it), but all the logic was already happening at this location anyways.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about L455 though?