Skip to content

Commit

Permalink
mps done
Browse files Browse the repository at this point in the history
  • Loading branch information
Rajan Joshi committed Mar 20, 2020
1 parent c23a4b4 commit a5e1c8f
Showing 1 changed file with 18 additions and 12 deletions.
30 changes: 18 additions & 12 deletions beater/pubsubbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,15 @@ type Pubsubbeat struct {
logger *logp.Logger
}

var cycleTime int64 = 10 //will be in seconds
var receivedLogs int64
var counterLock sync.RWMutex
var ch = make(chan bool, 1)
const cycleTime = 10 //will be in seconds

var (
receivedLogsInCycle int64
counterLock sync.RWMutex
logsReceived int64
)

var stopCh = make(chan struct{})

func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {
config, err := config.GetAndValidateConfig(cfg)
Expand Down Expand Up @@ -163,7 +168,7 @@ func (bt *Pubsubbeat) Run(b *beat.Beat) error {
})

counterLock.Lock()
receivedLogs = receivedLogs + 1
receivedLogsInCycle = receivedLogsInCycle + 1
counterLock.Unlock()

// TODO: Evaluate using AckHandler.
Expand All @@ -179,7 +184,7 @@ func (bt *Pubsubbeat) Run(b *beat.Beat) error {

func (bt *Pubsubbeat) Stop() {
bt.client.Close()
ch <- true
close(stopCh)
close(bt.done)
}

Expand Down Expand Up @@ -249,21 +254,22 @@ func getOrCreateSubscription(client *pubsub.Client, config *config.Config) (*pub
func cycleRoutine(n time.Duration) {
for {
select {
case <-ch:
case <-stopCh:
break
default:
}

time.Sleep(n * time.Second)
counterLock.Lock()
logsReceived := receivedLogs
logsReceived = logsReceived + receivedLogsInCycle
var recordsPerSecond int64
if logsReceived > 0 {
recordsPerSecond = logsReceived / int64(cycleTime)
if receivedLogsInCycle > 0 {
recordsPerSecond = receivedLogsInCycle / int64(cycleTime)
}
receivedLogs = 0
logp.Info("Total number of logs received in current cycle : %d", receivedLogsInCycle)
receivedLogsInCycle = 0
counterLock.Unlock()
logp.Info("Total number of logs received : %d", logsReceived)
logp.Info("Events Flush Rate: %v per second", recordsPerSecond)
logp.Info("Events Flush Rate: %v messages per second", recordsPerSecond)
}
}

0 comments on commit a5e1c8f

Please sign in to comment.