-
Notifications
You must be signed in to change notification settings - Fork 5.6k
/
Copy pathpostgresql.go
166 lines (134 loc) · 3.69 KB
/
postgresql.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
//go:generate ../../../tools/readme_config_includer/generator
package postgresql
import (
"bytes"
"database/sql"
_ "embed"
"fmt"
"strings"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/common/postgresql"
"github.com/influxdata/telegraf/plugins/inputs"
)
//go:embed sample.conf
var sampleConfig string
var ignoredColumns = map[string]bool{"stats_reset": true}
type Postgresql struct {
Databases []string `toml:"databases"`
IgnoredDatabases []string `toml:"ignored_databases"`
PreparedStatements bool `toml:"prepared_statements"`
postgresql.Config
service *postgresql.Service
}
func (*Postgresql) SampleConfig() string {
return sampleConfig
}
func (p *Postgresql) Init() error {
p.IsPgBouncer = !p.PreparedStatements
service, err := p.Config.CreateService()
if err != nil {
return err
}
p.service = service
return nil
}
func (p *Postgresql) Start(_ telegraf.Accumulator) error {
return p.service.Start()
}
func (p *Postgresql) Gather(acc telegraf.Accumulator) error {
var query string
if len(p.Databases) == 0 && len(p.IgnoredDatabases) == 0 {
query = `SELECT * FROM pg_stat_database`
} else if len(p.IgnoredDatabases) != 0 {
query = fmt.Sprintf(`SELECT * FROM pg_stat_database WHERE datname NOT IN ('%s')`,
strings.Join(p.IgnoredDatabases, "','"))
} else {
query = fmt.Sprintf(`SELECT * FROM pg_stat_database WHERE datname IN ('%s')`,
strings.Join(p.Databases, "','"))
}
rows, err := p.service.DB.Query(query)
if err != nil {
return err
}
defer rows.Close()
// grab the column information from the result
columns, err := rows.Columns()
if err != nil {
return err
}
for rows.Next() {
err = p.accRow(rows, acc, columns)
if err != nil {
return err
}
}
query = `SELECT * FROM pg_stat_bgwriter`
bgWriterRow, err := p.service.DB.Query(query)
if err != nil {
return err
}
defer bgWriterRow.Close()
// grab the column information from the result
if columns, err = bgWriterRow.Columns(); err != nil {
return err
}
for bgWriterRow.Next() {
if err := p.accRow(bgWriterRow, acc, columns); err != nil {
return err
}
}
return bgWriterRow.Err()
}
func (p *Postgresql) Stop() {
p.service.Stop()
}
func (p *Postgresql) accRow(row *sql.Rows, acc telegraf.Accumulator, columns []string) error {
var dbname bytes.Buffer
// this is where we'll store the column name with its *interface{}
columnMap := make(map[string]*interface{})
for _, column := range columns {
columnMap[column] = new(interface{})
}
columnVars := make([]interface{}, 0, len(columnMap))
// populate the array of interface{} with the pointers in the right order
for i := 0; i < len(columnMap); i++ {
columnVars = append(columnVars, columnMap[columns[i]])
}
// deconstruct array of variables and send to Scan
if err := row.Scan(columnVars...); err != nil {
return err
}
if columnMap["datname"] != nil {
// extract the database name from the column map
if dbNameStr, ok := (*columnMap["datname"]).(string); ok {
dbname.WriteString(dbNameStr)
} else {
// PG 12 adds tracking of global objects to pg_stat_database
dbname.WriteString("postgres_global")
}
} else {
dbname.WriteString(p.service.ConnectionDatabase)
}
tagAddress := p.service.SanitizedAddress
tags := map[string]string{"server": tagAddress, "db": dbname.String()}
fields := make(map[string]interface{})
for col, val := range columnMap {
_, ignore := ignoredColumns[col]
if !ignore {
fields[col] = *val
}
}
acc.AddFields("postgresql", fields, tags)
return nil
}
func init() {
inputs.Add("postgresql", func() telegraf.Input {
return &Postgresql{
Config: postgresql.Config{
MaxIdle: 1,
MaxOpen: 1,
},
PreparedStatements: true,
}
})
}