// Copyright (C) MongoDB, Inc. 2014-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0

// Package mongostat provides an overview of the status of a currently
// running mongod or mongos instance.
package mongostat

import (
	"context"
	"fmt"
	"net/url"
	"strings"
	"sync"
	"time"

	"github.com/mongodb/mongo-tools/common/db"
	"github.com/mongodb/mongo-tools/common/log"
	"github.com/mongodb/mongo-tools/common/options"
	"github.com/mongodb/mongo-tools/mongostat/stat_consumer"
	"github.com/mongodb/mongo-tools/mongostat/stat_consumer/line"
	"github.com/mongodb/mongo-tools/mongostat/status"
	"go.mongodb.org/mongo-driver/bson"
)

// MongoStat is a container for the user-specified options and
// internal cluster state used for running mongostat.
type MongoStat struct {
	// Generic mongo tool options.
	Options *options.ToolOptions

	// Mongostat-specific output options.
	StatOptions *StatOptions

	// How long to sleep between printing the rows, and polling the server.
	SleepInterval time.Duration

	// New nodes can be "discovered" by any other node by sending a hostname
	// on this channel.
	Discovered chan string

	// A map of hostname -> NodeMonitor for all the hosts that
	// are being monitored.
	Nodes map[string]*NodeMonitor

	// ClusterMonitor to manage collecting and printing the stats from all nodes.
	Cluster ClusterMonitor

	// Mutex to handle safe concurrent adding to or looping over discovered nodes.
	nodesLock sync.RWMutex
}
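
// A minimal wiring sketch (illustrative only, not taken from this file):
// toolOptions, statOptions, and clusterMonitor are assumed to be built
// elsewhere, e.g. by the mongostat command's entry point.
//
//	stat := &MongoStat{
//		Options:       toolOptions,    // *options.ToolOptions
//		StatOptions:   statOptions,    // *StatOptions
//		SleepInterval: time.Second,
//		Discovered:    make(chan string, 128),
//		Nodes:         map[string]*NodeMonitor{},
//		Cluster:       clusterMonitor, // e.g. an *AsyncClusterMonitor
//	}
//	if err := stat.AddNewNode("localhost:27017"); err != nil {
//		return err
//	}
//	return stat.Run()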

// ConfigShard holds a mapping for the format of shard hosts as they
// appear in the config.shards collection.
type ConfigShard struct {
	Id   string `bson:"_id"`
	Host string `bson:"host"`
}

// NodeMonitor contains the connection pool for a single host and collects the
// mongostat data for that host on a regular interval.
type NodeMonitor struct {
	host, alias     string
	sessionProvider *db.SessionProvider

	// The time at which the node monitor last processed an update successfully.
	LastUpdate time.Time

	// The most recent error encountered when collecting stats for this node.
	Err error
}

// SyncClusterMonitor is an implementation of ClusterMonitor that writes output
// synchronized with the timing of when the polling samples are collected.
// Only works with a single host at a time.
type SyncClusterMonitor struct {
	// Channel to listen for incoming stat data
	ReportChan chan *status.ServerStatus

	// Channel to listen for incoming errors
	ErrorChan chan *status.NodeError

	// Creates and consumes StatLines using ServerStatuses
	Consumer *stat_consumer.StatConsumer
}

// ClusterMonitor maintains an internal representation of a cluster's state,
// which can be refreshed with calls to Update(), and dumps output representing
// this internal state on an interval.
type ClusterMonitor interface {
	// Monitor starts monitoring and dumping output; sleep is the interval
	// to sleep between output dumps. It returns an error if it fails, and
	// nil when monitoring ends.
	Monitor(sleep time.Duration) error

	// Update signals the ClusterMonitor implementation to refresh its internal
	// state using the data contained in the provided ServerStatus.
	Update(stat *status.ServerStatus, err *status.NodeError)
}

// AsyncClusterMonitor is an implementation of ClusterMonitor that writes output
// derived from polling samples collected asynchronously from one or more servers.
type AsyncClusterMonitor struct {
	Discover bool

	// Channel to listen for incoming stat data
	ReportChan chan *status.ServerStatus

	// Channel to listen for incoming errors
	ErrorChan chan *status.NodeError

	// Map of hostname -> latest stat data for the host
	LastStatLines map[string]*line.StatLine

	// Mutex to protect access to LastStatLines
	mapLock sync.RWMutex

	// Creates and consumes StatLines using ServerStatuses
	Consumer *stat_consumer.StatConsumer
}

// Update refreshes the internal state of the cluster monitor with the data in
// the given ServerStatus. SyncClusterMonitor's implementation of Update blocks
// until it has written out its state, so that output is always dumped exactly
// once for each poll.
func (cluster *SyncClusterMonitor) Update(stat *status.ServerStatus, err *status.NodeError) {
	if err != nil {
		cluster.ErrorChan <- err
		return
	}
	cluster.ReportChan <- stat
}

// Monitor waits for data on the cluster's report channel. Once new data comes
// in, it formats and then displays it to stdout.
func (cluster *SyncClusterMonitor) Monitor(_ time.Duration) error {
	receivedData := false
	for {
		var statLine *line.StatLine
		var ok bool
		select {
		case stat := <-cluster.ReportChan:
			statLine, ok = cluster.Consumer.Update(stat)
			if !ok {
				continue
			}
		case err := <-cluster.ErrorChan:
			if !receivedData {
				return err
			}
			statLine = &line.StatLine{
				Error:  err,
				Fields: map[string]string{"host": err.Host},
			}
		}
		receivedData = true
		if cluster.Consumer.FormatLines([]*line.StatLine{statLine}) {
			return nil
		}
	}
}

// updateHostInfo updates the internal map with the given StatLine data.
// Safe for concurrent access.
func (cluster *AsyncClusterMonitor) updateHostInfo(stat *line.StatLine) {
	cluster.mapLock.Lock()
	defer cluster.mapLock.Unlock()
	host := stat.Fields["host"]
	cluster.LastStatLines[host] = stat
}

// printSnapshot formats and dumps the current state of all the stats collected.
// Returns whether the program should now exit.
func (cluster *AsyncClusterMonitor) printSnapshot() bool {
	cluster.mapLock.RLock()
	defer cluster.mapLock.RUnlock()
	lines := make([]*line.StatLine, 0, len(cluster.LastStatLines))
	for _, stat := range cluster.LastStatLines {
		lines = append(lines, stat)
	}
	if len(lines) == 0 {
		return false
	}
	return cluster.Consumer.FormatLines(lines)
}

// Update sends a new ServerStatus on the cluster's report channel, or forwards
// the error on the error channel if one occurred.
func (cluster *AsyncClusterMonitor) Update(stat *status.ServerStatus, err *status.NodeError) {
	if err != nil {
		cluster.ErrorChan <- err
		return
	}
	cluster.ReportChan <- stat
}

// The async implementation of Monitor starts a goroutine that listens for
// incoming stat data, and dumps snapshots of the cluster's state at a
// regular interval.
func (cluster *AsyncClusterMonitor) Monitor(sleep time.Duration) error {
	select {
	case stat := <-cluster.ReportChan:
		cluster.Consumer.Update(stat)
	case err := <-cluster.ErrorChan:
		// error out if the first result is an error
		return err
	}

	go func() {
		for {
			select {
			case stat := <-cluster.ReportChan:
				statLine, ok := cluster.Consumer.Update(stat)
				if ok {
					cluster.updateHostInfo(statLine)
				}
			case err := <-cluster.ErrorChan:
				cluster.updateHostInfo(&line.StatLine{
					Error:  err,
					Fields: map[string]string{"host": err.Host},
				})
			}
		}
	}()

	ticker := time.NewTicker(sleep)
	for range ticker.C {
		if cluster.printSnapshot() {
			break
		}
	}
	return nil
}

// NewNodeMonitor copies the same connection settings from an instance of
// ToolOptions, but monitors fullHost.
func NewNodeMonitor(opts options.ToolOptions, fullHost string) (*NodeMonitor, error) {
	optsCopy := opts
	host, port := parseHostPort(fullHost)
	optsCopy.Connection.Host = host
	optsCopy.Connection.Port = port
	uriCopy := *opts.URI
	newCS, err := rewriteURI(uriCopy.ConnectionString, fullHost)
	if err != nil {
		return nil, err
	}
	uriCopy.ConnectionString = newCS
	optsCopy.URI = &uriCopy
	optsCopy.Direct = true
	optsCopy.ConnString.Hosts = []string{fullHost}
	sessionProvider, err := db.NewSessionProvider(optsCopy)
	if err != nil {
		return nil, err
	}
	return &NodeMonitor{
		host:            fullHost,
		sessionProvider: sessionProvider,
		LastUpdate:      time.Now(),
		Err:             nil,
	}, nil
}
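
// rewriteURI replaces the host portion of oldURI with newAddress, leaving the
// scheme, credentials, and any path or query options intact. For example
// (hypothetical values), rewriting "mongodb://user:pass@seedhost:27017/admin?replicaSet=rs0"
// with newAddress "db2.example.net:27018" yields
// "mongodb://user:pass@db2.example.net:27018/admin?replicaSet=rs0".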
func rewriteURI(oldURI, newAddress string) (string, error) {
	u, err := url.Parse(oldURI)
	if err != nil {
		return "", err
	}
	u.Host = newAddress
	return u.String(), nil
}
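
// Disconnect closes the node's session provider, releasing its connections to
// the server.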
func (node *NodeMonitor) Disconnect() {
	node.sessionProvider.Close()
}

// Poll collects the stat info for a single node and sends found hostnames on
// the "discover" channel if checkShards is true.
func (node *NodeMonitor) Poll(
	discover chan string,
	checkShards bool,
) (*status.ServerStatus, error) {
	stat := &status.ServerStatus{}
	log.Logvf(log.DebugHigh, "getting session on server: %v", node.host)
	session, err := node.sessionProvider.GetSession()
	if err != nil {
		log.Logvf(log.DebugLow, "got error getting session to server %v", node.host)
		return nil, err
	}
	log.Logvf(log.DebugHigh, "got session on server: %v", node.host)

	result := session.Database("admin").
		RunCommand(context.TODO(), bson.D{{"serverStatus", 1}, {"recordStats", 0}})
	err = result.Err()
	if err != nil {
		log.Logvf(log.DebugLow, "got error calling serverStatus against server %v", node.host)
		return nil, err
	}
	tempBson, err := result.DecodeBytes()
	if err != nil {
		log.Logvf(log.Always, "Encountered error decoding serverStatus: %v\n", err)
		return nil, fmt.Errorf("Error decoding serverStatus: %v\n", err)
	}
	err = bson.Unmarshal(tempBson, &stat)
	if err != nil {
		log.Logvf(log.Always, "Encountered error reading serverStatus: %v\n", err)
		return nil, fmt.Errorf("Error reading serverStatus: %v\n", err)
	}

	// The flattened version is required by some lookup functions
	statMap := make(map[string]interface{})
	err = result.Decode(&statMap)
	if err != nil {
		return nil, fmt.Errorf("Error flattening serverStatus: %v\n", err)
	}
	stat.Flattened = status.Flatten(statMap)

	node.Err = nil
	stat.SampleTime = time.Now()
	if stat.Repl != nil && discover != nil {
		for _, host := range stat.Repl.Hosts {
			discover <- host
		}
		for _, host := range stat.Repl.Passives {
			discover <- host
		}
	}
	node.alias = stat.Host
	stat.Host = node.host
	if discover != nil && stat != nil && status.IsMongos(stat) && checkShards {
		log.Logvf(log.DebugLow, "checking config database to discover shards")
		shardCursor, err := session.Database("config").
			Collection("shards").
			Find(context.TODO(), bson.M{}, nil)
		if err != nil {
			return nil, fmt.Errorf("error discovering shards: %v", err)
		}
		shard := ConfigShard{}
		for shardCursor.Next(context.TODO()) {
			if cursorErr := shardCursor.Decode(&shard); cursorErr != nil {
				return nil, fmt.Errorf("error decoding shard info: %v", cursorErr)
			}
			shardHosts := strings.Split(shard.Host, ",")
			for _, shardHost := range shardHosts {
				discover <- shardHost
			}
		}
		if closeErr := shardCursor.Close(context.TODO()); closeErr != nil {
			return nil, fmt.Errorf("error closing shard discovery cursor: %v", closeErr)
		}
	}
	return stat, nil
}

// Watch continuously collects and processes stats for a single node on a
// regular interval. At each interval, it triggers the node's Poll function
// with the 'discover' channel.
func (node *NodeMonitor) Watch(sleep time.Duration, discover chan string, cluster ClusterMonitor) {
	var cycle uint64
	ticker := time.NewTicker(sleep)
	for range ticker.C {
		log.Logvf(log.DebugHigh, "polling server: %v", node.host)
		stat, err := node.Poll(discover, cycle%10 == 0)
		if stat != nil {
			log.Logvf(log.DebugHigh, "successfully got statline from host: %v", node.host)
		}
		var nodeError *status.NodeError
		if err != nil {
			nodeError = status.NewNodeError(node.host, err)
		}
		cluster.Update(stat, nodeError)
		cycle++
	}
}
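
// parseHostPort splits a "host:port" address into its host and port strings,
// defaulting the port to "27017" when none is given. For example,
// "db1.example.net:27018" yields ("db1.example.net", "27018"), while a bare
// "db1.example.net" yields ("db1.example.net", "27017").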
func parseHostPort(fullHostName string) (string, string) {
	if colon := strings.LastIndex(fullHostName, ":"); colon >= 0 {
		return fullHostName[0:colon], fullHostName[colon+1:]
	}
	return fullHostName, "27017"
}

// AddNewNode adds a new host name to be monitored and spawns the necessary
// goroutine to collect data from it.
func (mstat *MongoStat) AddNewNode(fullhost string) error {
	mstat.nodesLock.Lock()
	defer mstat.nodesLock.Unlock()

	// Remove the 'shardXX/' prefix from the hostname, if applicable
	pieces := strings.Split(fullhost, "/")
	fullhost = pieces[len(pieces)-1]
	if _, hasKey := mstat.Nodes[fullhost]; hasKey {
		return nil
	}
	for _, node := range mstat.Nodes {
		if node.alias == fullhost {
			return nil
		}
	}
	log.Logvf(log.DebugLow, "adding new host to monitoring: %v", fullhost)

	// Create a new node monitor for this host
	node, err := NewNodeMonitor(*mstat.Options, fullhost)
	if err != nil {
		return err
	}
	mstat.Nodes[fullhost] = node
	go node.Watch(mstat.SleepInterval, mstat.Discovered, mstat.Cluster)
	return nil
}

// Run is the top-level function that starts the monitoring
// and discovery goroutines.
func (mstat *MongoStat) Run() error {
	if mstat.Discovered != nil {
		go func() {
			for {
				newHost := <-mstat.Discovered
				err := mstat.AddNewNode(newHost)
				if err != nil {
					log.Logvf(log.Always, "can't add discovered node %v: %v", newHost, err)
				}
			}
		}()
	}
	return mstat.Cluster.Monitor(mstat.SleepInterval)
}