Skip to content

Commit

Permalink
Merge pull request #3 from logrhythm/pubsubbeat-heartbeat
Browse files Browse the repository at this point in the history
heartbeat added
  • Loading branch information
rajanjoshigl authored Mar 21, 2020
2 parents a5e1c8f + 4d03cec commit bab4564
Show file tree
Hide file tree
Showing 5 changed files with 196 additions and 1 deletion.
19 changes: 18 additions & 1 deletion beater/pubsubbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (

"cloud.google.com/go/pubsub"
"github.com/logrhythm/pubsubbeat/config"
"github.com/logrhythm/pubsubbeat/heartbeat"
"google.golang.org/api/option"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand All @@ -48,9 +49,13 @@ type Pubsubbeat struct {
pubsubClient *pubsub.Client
subscription *pubsub.Subscription
logger *logp.Logger
StopChan chan struct{}
}

const cycleTime = 10 //will be in seconds
const (
cycleTime = 10 //will be in seconds
ServiceName = "pubsubbeat"
)

var (
receivedLogsInCycle int64
Expand Down Expand Up @@ -109,6 +114,17 @@ func (bt *Pubsubbeat) Run(b *beat.Beat) error {
bt.pubsubClient.Close()
}()

// Self-reporting heartbeat
bt.StopChan = make(chan struct{})
hb := heartbeat.NewHeartbeatConfig(bt.config.HeartbeatInterval, bt.config.HeartbeatDisabled)
heartbeater, err := hb.CreateEnabled(bt.StopChan, ServiceName)
if err != nil {
logp.Info("Error while creating new heartbeat object: %v", err)
}
if heartbeater != nil {
heartbeater.Start(bt.StopChan, bt.client.Publish)
}

go cycleRoutine(time.Duration(cycleTime))

err = bt.subscription.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
Expand Down Expand Up @@ -185,6 +201,7 @@ func (bt *Pubsubbeat) Run(b *beat.Beat) error {
func (bt *Pubsubbeat) Stop() {
bt.client.Close()
close(stopCh)
bt.StopChan <- struct{}{}
close(bt.done)
}

Expand Down
5 changes: 5 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"os"

"github.com/elastic/beats/libbeat/common"
hb "github.com/logrhythm/pubsubbeat/heartbeat"
)

type Config struct {
Expand All @@ -42,12 +43,16 @@ type Config struct {
FieldsTimestampName string `config:"fields_timestamp_name"`
FieldsTimestampFormat string `config:"fields_timestamp_format"`
}
HeartbeatInterval time.Duration `config:"heartbeatinterval"`
HeartbeatDisabled bool `config:"heartbeatdisabled"`
}

func GetDefaultConfig() Config {
config := Config{}
config.Subscription.Create = true
config.Json.FieldsTimestampName = "@timestamp"
config.HeartbeatInterval = hb.IntervalValue
config.HeartbeatDisabled = false
return config
}

Expand Down
45 changes: 45 additions & 0 deletions heartbeat/enable.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package heartbeat

import (
"fmt"
"time"

"github.com/elastic/beats/libbeat/logp"
)

// Config is a structure for heartbeat
type Config struct {
Interval time.Duration
Disabled bool
}

// IntervalValue is a default value for heartbeat interval
var IntervalValue = 5 * time.Minute

// NewHeartbeatConfig is a constructor to return the object of heartbeatConfig structure
func NewHeartbeatConfig(interval time.Duration, disabled bool) *Config {
return &Config{
Interval: interval,
Disabled: disabled,
}
}

// CreateEnabled will create all miscellaneous components
func (config *Config) CreateEnabled(doneChan chan struct{}, serviceName string) (*StatusBeater, error) {
if config == nil {
return nil, fmt.Errorf("no heartbeat specified. To disable, specify 'disabled: true' in the heartbeat configuration")
}

if config.Disabled {
// Customer has explicitly disabled heart beating
return nil, nil
}

if config.Interval <= 0 {
// Shouldn't happen in regular code path because of our defaults / validation
logp.Warn("Heartbeat interval can not be less than zero. Setting to default 5 minute")
config.Interval = IntervalValue
}

return NewStatusBeater(serviceName, config.Interval, doneChan), nil
}
122 changes: 122 additions & 0 deletions heartbeat/status_beater.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package heartbeat

import (
"encoding/json"
"time"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/golang/protobuf/ptypes/timestamp"
)

// Heartbeat is a structure for heartbeat
type Heartbeat struct {
// Service name
ServiceName string `json:"service_name"`
// Current version of the service
ServiceVersion string `json:"service_version"`

Time timestamp.Timestamp `json:"time"`

Status Status `json:"status"`
}

const (
//ServiceStarted is a code for starting a particular service
ServiceStarted = 1
//ServiceRunning is a code for running instance a particular service
ServiceRunning = 2
//ServiceStopped is a code for stopping a particular service
ServiceStopped = 3
)

// Status is used for status of heartbeat1
type Status struct {
Code int64 `json:"code"`
Description string `json:"description"`
}

// IntervalFunc is a function that can trigger a timing event based on a duration
type IntervalFunc func() <-chan time.Time

// StatusBeater reports simple service information
type StatusBeater struct {
Name string
Version string

IntervalFunc IntervalFunc
doneChan chan struct{}
}

// Start will begin reporting heartbeats through the beats
func (sb *StatusBeater) Start(stopChan chan struct{}, publish func(event beat.Event)) {
go func() {
sb.Beat(ServiceStarted, "Service started", publish)
for {
select {
case <-sb.IntervalFunc():
sb.Beat(ServiceRunning, "Service is Running", publish)
case <-stopChan:
sb.Beat(ServiceStopped, "Service is Stopped", publish)
sb.doneChan <- struct{}{}
return
}
}
}()
}

// Beat will send a beat containing simple service status information
func (sb *StatusBeater) Beat(status int64, description string, publish func(event beat.Event)) {
now := time.Now().UnixNano()
msg := Heartbeat{
ServiceName: sb.Name,
ServiceVersion: sb.Version,
Time: timestamp.Timestamp{
Seconds: now / time.Nanosecond.Nanoseconds(),
},
Status: Status{
Code: status,
Description: description,
},
}
msgJSON, err := json.Marshal(msg)
if err != nil {
logp.Warn("internal heartbeat message json conversion failed %s", err)
return
}
sb.PublishEvent(msgJSON, publish)

}

// PublishEvent will publish passed Log
func (sb *StatusBeater) PublishEvent(logData []byte, publish func(event beat.Event)) {
event := beat.Event{
Timestamp: time.Now(),
Fields: common.MapStr{
"heartbeat": string(logData),
},
}
publish(event)
logp.Info("heartbeat sent")
}

// NewStatusBeater will return a new StatusBeater with the provided base information
func NewStatusBeater(serviceName string, interval time.Duration, doneChan chan struct{}) *StatusBeater {
return NewStatusBeaterWithFunc(
serviceName,
func() <-chan time.Time {
return time.After(interval)
},
doneChan,
)
}

// NewStatusBeaterWithFunc returns a new StatusBeater that uses the provided func as a trigger for sending beats
func NewStatusBeaterWithFunc(serviceName string, intervalFunc IntervalFunc, doneChan chan struct{}) *StatusBeater {
return &StatusBeater{
Name: serviceName,
IntervalFunc: intervalFunc,
doneChan: doneChan,
}
}
6 changes: 6 additions & 0 deletions pubsubbeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ pubsubbeat:
# The Pub/Sub subscription name.
subscription.name: my-subscription

#time interval to check heartbeat
heartbeatinterval : 5s

#flag to check heartbeat status
heartbeatdisabled: false

# Attempt to create the subscription.
subscription.create: true # Defaults to true

Expand Down

0 comments on commit bab4564

Please sign in to comment.