diff --git a/beater/pubsubbeat.go b/beater/pubsubbeat.go index e11478679..a4e5a89ea 100644 --- a/beater/pubsubbeat.go +++ b/beater/pubsubbeat.go @@ -50,10 +50,15 @@ type Pubsubbeat struct { logger *logp.Logger } -var cycleTime int64 = 10 //will be in seconds -var receivedLogs int64 -var counterLock sync.RWMutex -var ch = make(chan bool, 1) +const cycleTime = 10 //will be in seconds + +var ( + receivedLogsInCycle int64 + counterLock sync.RWMutex + logsReceived int64 +) + +var stopCh = make(chan struct{}) func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) { config, err := config.GetAndValidateConfig(cfg) @@ -163,7 +168,7 @@ func (bt *Pubsubbeat) Run(b *beat.Beat) error { }) counterLock.Lock() - receivedLogs = receivedLogs + 1 + receivedLogsInCycle = receivedLogsInCycle + 1 counterLock.Unlock() // TODO: Evaluate using AckHandler. @@ -179,7 +184,7 @@ func (bt *Pubsubbeat) Run(b *beat.Beat) error { func (bt *Pubsubbeat) Stop() { bt.client.Close() - ch <- true + close(stopCh) close(bt.done) } @@ -249,21 +254,22 @@ func getOrCreateSubscription(client *pubsub.Client, config *config.Config) (*pub func cycleRoutine(n time.Duration) { for { select { - case <-ch: + case <-stopCh: break default: } time.Sleep(n * time.Second) counterLock.Lock() - logsReceived := receivedLogs + logsReceived = logsReceived + receivedLogsInCycle var recordsPerSecond int64 - if logsReceived > 0 { - recordsPerSecond = logsReceived / int64(cycleTime) + if receivedLogsInCycle > 0 { + recordsPerSecond = receivedLogsInCycle / int64(cycleTime) } - receivedLogs = 0 + logp.Info("Total number of logs received in current cycle : %d", receivedLogsInCycle) + receivedLogsInCycle = 0 counterLock.Unlock() logp.Info("Total number of logs received : %d", logsReceived) - logp.Info("Events Flush Rate: %v per second", recordsPerSecond) + logp.Info("Events Flush Rate: %v messages per second", recordsPerSecond) } }