Skip to content

Commit

Permalink
Create monitor storage every monitor cycle
Browse files Browse the repository at this point in the history
This functionality is idempotent and is storage has been successfully
created, no attempt will be made to re-create it. This allows the system
to keep trying every cycle, but stop when successful.
  • Loading branch information
otoolep committed Oct 5, 2015
1 parent 22a3850 commit 62fab49
Showing 1 changed file with 36 additions and 22 deletions.
58 changes: 36 additions & 22 deletions monitor/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ type Monitor struct {

diagRegistrations map[string]DiagsClient

storeCreated bool
storeEnabled bool
storeDatabase string
storeRetentionPolicy string
Expand Down Expand Up @@ -294,29 +295,14 @@ func (m *Monitor) Diagnostics() (map[string]*Diagnostic, error) {
return diags, nil
}

// storeStatistics writes the statistics to an InfluxDB system.
func (m *Monitor) storeStatistics() {
defer m.wg.Done()
m.Logger.Printf("Storing statistics in database '%s' retention policy '%s', at interval %s",
m.storeDatabase, m.storeRetentionPolicy, m.storeInterval)

if err := m.MetaStore.WaitForLeader(leaderWaitTimeout); err != nil {
m.Logger.Printf("failed to detect a cluster leader, terminating storage: %s", err.Error())
// createInternalStorage ensures the internal storage has been created.
func (m *Monitor) createInternalStorage() {
if m.storeCreated {
return
}

// Get cluster-level metadata. Nothing different is going to happen if errors occur.
clusterID, _ := m.MetaStore.ClusterID()
nodeID := m.MetaStore.NodeID()
hostname, _ := os.Hostname()
clusterTags := map[string]string{
"clusterID": fmt.Sprintf("%d", clusterID),
"nodeID": fmt.Sprintf("%d", nodeID),
"hostname": hostname,
}

if _, err := m.MetaStore.CreateDatabaseIfNotExists(m.storeDatabase); err != nil {
m.Logger.Printf("failed to create database '%s', terminating storage: %s",
m.Logger.Printf("failed to create database '%s', failed to create storage: %s",
m.storeDatabase, err.Error())
return
}
Expand All @@ -325,27 +311,55 @@ func (m *Monitor) storeStatistics() {
rpi.Duration = MonitorRetentionPolicyDuration
rpi.ReplicaN = 1
if _, err := m.MetaStore.CreateRetentionPolicyIfNotExists(m.storeDatabase, rpi); err != nil {
m.Logger.Printf("failed to create retention policy '%s', terminating storage: %s",
m.Logger.Printf("failed to create retention policy '%s', failed to create internal storage: %s",
rpi.Name, err.Error())
return
}

if err := m.MetaStore.SetDefaultRetentionPolicy(m.storeDatabase, rpi.Name); err != nil {
m.Logger.Printf("failed to set default retention policy on '%s', terminating storage: %s",
m.Logger.Printf("failed to set default retention policy on '%s', failed to create internal storage: %s",
m.storeDatabase, err.Error())
return
}

if err := m.MetaStore.DropRetentionPolicy(m.storeDatabase, "default"); err != nil && err != meta.ErrRetentionPolicyNotFound {
m.Logger.Printf("failed to delete retention policy 'default', terminating storage: %s", err.Error())
m.Logger.Printf("failed to delete retention policy 'default', failed to created internal storage: %s", err.Error())
return
}

// Mark storage creation complete.
m.storeCreated = true
}

// storeStatistics writes the statistics to an InfluxDB system.
func (m *Monitor) storeStatistics() {
defer m.wg.Done()
m.Logger.Printf("Storing statistics in database '%s' retention policy '%s', at interval %s",
m.storeDatabase, m.storeRetentionPolicy, m.storeInterval)

if err := m.MetaStore.WaitForLeader(leaderWaitTimeout); err != nil {
m.Logger.Printf("failed to detect a cluster leader, terminating storage: %s", err.Error())
return
}

// Get cluster-level metadata. Nothing different is going to happen if errors occur.
clusterID, _ := m.MetaStore.ClusterID()
nodeID := m.MetaStore.NodeID()
hostname, _ := os.Hostname()
clusterTags := map[string]string{
"clusterID": fmt.Sprintf("%d", clusterID),
"nodeID": fmt.Sprintf("%d", nodeID),
"hostname": hostname,
}

tick := time.NewTicker(m.storeInterval)
defer tick.Stop()
for {
select {

case <-tick.C:
m.createInternalStorage()

stats, err := m.Statistics(clusterTags)
if err != nil {
m.Logger.Printf("failed to retrieve registered statistics: %s", err)
Expand Down

0 comments on commit 62fab49

Please sign in to comment.