Skip to content

Commit

Permalink
Move statistics to separate module
Browse files Browse the repository at this point in the history
  • Loading branch information
martin-majlis-s1 committed Nov 14, 2023
1 parent 481237d commit ad85b54
Show file tree
Hide file tree
Showing 4 changed files with 270 additions and 29 deletions.
42 changes: 23 additions & 19 deletions pkg/client/add_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,13 +226,13 @@ func (client *DataSetClient) listenAndSendBundlesForKey(key string, ch chan inte
// isProcessingBuffers returns True if there are still some unprocessed buffers.
// False otherwise.
func (client *DataSetClient) isProcessingBuffers() bool {
return client.statistics.buffersEnqueued.Load() > (client.statistics.buffersProcessed.Load() + client.statistics.buffersDropped.Load() + client.statistics.buffersBroken.Load())
return client.statistics.BuffersEnqueued() > (client.statistics.BuffersProcessed() + client.statistics.BuffersDropped() + client.statistics.BuffersBroken())
}

// isProcessingEvents returns True if there are still some unprocessed events.
// False otherwise.
func (client *DataSetClient) isProcessingEvents() bool {
return client.statistics.eventsEnqueued.Load() > (client.statistics.eventsProcessed.Load() + client.statistics.eventsDropped.Load() + client.statistics.eventsBroken.Load())
return client.statistics.EventsEnqueued() > (client.statistics.EventsProcessed() + client.statistics.EventsDropped() + client.statistics.EventsBroken())
}

// Shutdown takes care of shutdown of client. It does following steps
Expand Down Expand Up @@ -273,7 +273,7 @@ func (client *DataSetClient) Shutdown() error {
// try (with timeout) to process (add into buffers) events,
retryNum := 0
expBackoff.Reset()
initialEventsDropped := client.statistics.eventsDropped.Load()
initialEventsDropped := client.statistics.EventsDropped()
for client.isProcessingEvents() {
// log statistics
client.logStatistics()
Expand All @@ -283,8 +283,8 @@ func (client *DataSetClient) Shutdown() error {
"Shutting down - processing events",
zap.Int("retryNum", retryNum),
zap.Duration("backoffDelay", backoffDelay),
zap.Uint64("eventsEnqueued", client.statistics.eventsEnqueued.Load()),
zap.Uint64("eventsProcessed", client.statistics.eventsProcessed.Load()),
zap.Uint64("eventsEnqueued", client.statistics.EventsEnqueued()),
zap.Uint64("eventsProcessed", client.statistics.EventsProcessed()),
zap.Duration("elapsedTime", time.Since(processingStart)),
zap.Duration("maxElapsedTime", maxElapsedTime),
)
Expand Down Expand Up @@ -325,7 +325,7 @@ func (client *DataSetClient) Shutdown() error {
// do wait (with timeout) for all buffers to be sent to the server
retryNum = 0
expBackoff.Reset()
initialBuffersDropped := client.statistics.buffersDropped.Load()
initialBuffersDropped := client.statistics.BuffersDropped()
for client.isProcessingBuffers() {
// log statistics
client.logStatistics()
Expand All @@ -335,9 +335,9 @@ func (client *DataSetClient) Shutdown() error {
"Shutting down - processing buffers",
zap.Int("retryNum", retryNum),
zap.Duration("backoffDelay", backoffDelay),
zap.Uint64("buffersEnqueued", client.statistics.buffersEnqueued.Load()),
zap.Uint64("buffersProcessed", client.statistics.buffersProcessed.Load()),
zap.Uint64("buffersDropped", client.statistics.buffersDropped.Load()),
zap.Uint64("buffersEnqueued", client.statistics.BuffersEnqueued()),
zap.Uint64("buffersProcessed", client.statistics.BuffersProcessed()),
zap.Uint64("buffersDropped", client.statistics.BuffersDropped()),
zap.Duration("elapsedTime", time.Since(processingStart)),
zap.Duration("maxElapsedTime", maxElapsedTime),
)
Expand All @@ -352,30 +352,30 @@ func (client *DataSetClient) Shutdown() error {
if client.isProcessingEvents() {
lastError = fmt.Errorf(
"not all events have been processed - %d",
client.statistics.eventsEnqueued.Load()-client.statistics.eventsProcessed.Load(),
client.statistics.EventsEnqueued()-client.statistics.EventsProcessed(),
)
client.Logger.Error(
"Shutting down - not all events have been processed",
zap.Uint64("eventsEnqueued", client.statistics.eventsEnqueued.Load()),
zap.Uint64("eventsProcessed", client.statistics.eventsProcessed.Load()),
zap.Uint64("eventsEnqueued", client.statistics.EventsEnqueued()),
zap.Uint64("eventsProcessed", client.statistics.EventsProcessed()),
)
}

if client.isProcessingBuffers() {
lastError = fmt.Errorf(
"not all buffers have been processed - %d",
client.statistics.buffersEnqueued.Load()-client.statistics.buffersProcessed.Load()-client.statistics.buffersDropped.Load(),
client.statistics.BuffersEnqueued()-client.statistics.BuffersProcessed()-client.statistics.BuffersDropped(),
)
client.Logger.Error(
"Shutting down - not all buffers have been processed",
zap.Int("retryNum", retryNum),
zap.Uint64("buffersEnqueued", client.statistics.buffersEnqueued.Load()),
zap.Uint64("buffersProcessed", client.statistics.buffersProcessed.Load()),
zap.Uint64("buffersDropped", client.statistics.buffersDropped.Load()),
zap.Uint64("buffersEnqueued", client.statistics.BuffersEnqueued()),
zap.Uint64("buffersProcessed", client.statistics.BuffersProcessed()),
zap.Uint64("buffersDropped", client.statistics.BuffersDropped()),
)
}

eventsDropped := client.statistics.eventsDropped.Load() - initialEventsDropped
eventsDropped := client.statistics.EventsDropped() - initialEventsDropped
if eventsDropped > 0 {
lastError = fmt.Errorf(
"some events were dropped during finishing - %d",
Expand All @@ -387,7 +387,7 @@ func (client *DataSetClient) Shutdown() error {
)
}

buffersDropped := client.statistics.buffersDropped.Load() - initialBuffersDropped
buffersDropped := client.statistics.BuffersDropped() - initialBuffersDropped
if buffersDropped > 0 {
lastError = fmt.Errorf(
"some buffers were dropped during finishing - %d",
Expand Down Expand Up @@ -437,6 +437,7 @@ func (client *DataSetClient) sendAddEventsBuffer(buf *buffer.Buffer) (*add_event
)...,
)
resp := &add_events.AddEventsResponse{}
client.statistics.PayloadSizeRecord(int64(len(payload)))

httpRequest, err := request.NewApiRequest(
http.MethodPost, client.addEventsEndpointUrl,
Expand All @@ -445,8 +446,11 @@ func (client *DataSetClient) sendAddEventsBuffer(buf *buffer.Buffer) (*add_event
return nil, len(payload), fmt.Errorf("cannot create request: %w", err)
}

apiCallStart := time.Now()
err = client.apiCall(httpRequest, resp)
client.statistics.bytesAPISent.Add(uint64(len(payload)))
apiCallEnd := time.Now()
client.statistics.ResponseTimeRecord(apiCallEnd.Sub(apiCallStart))
client.statistics.BytesAPISentAdd(uint64(len(payload)))

if strings.HasPrefix(resp.Status, "error") {
client.Logger.Error(
Expand Down
19 changes: 10 additions & 9 deletions pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,13 @@ import (

"github.com/cenkalti/backoff/v4"

_ "net/http/pprof"

"github.com/scalyr/dataset-go/pkg/api/add_events"
"github.com/scalyr/dataset-go/pkg/buffer"
"github.com/scalyr/dataset-go/pkg/config"
"github.com/scalyr/dataset-go/pkg/server_host_config"

_ "net/http/pprof"
"github.com/scalyr/dataset-go/pkg/statistics"

"github.com/cskr/pubsub"
"github.com/google/uuid"
Expand Down Expand Up @@ -96,7 +97,7 @@ type DataSetClient struct {
addEventsEndpointUrl string
userAgent string
serverHost string
statistics *Statistics
statistics *statistics.Statistics
}

func NewClient(
Expand Down Expand Up @@ -153,7 +154,7 @@ func NewClient(
userAgent = userAgent + ";" + *userAgentSuffix
}

statictics, err := NewStatistics(meter)
stats, err := statistics.NewStatistics(meter)
if err != nil {
return nil, fmt.Errorf("it was not possible to create statistics: %w", err)
}
Expand All @@ -180,7 +181,7 @@ func NewClient(
addEventsEndpointUrl: addEventsEndpointUrl,
userAgent: userAgent,
serverHost: serverHost,
statistics: statictics,
statistics: stats,
}

// run buffer sweeper if requested
Expand Down Expand Up @@ -285,9 +286,9 @@ func (client *DataSetClient) listenAndSendBufferForSession(session string, ch ch
client.Logger.Debug("Received Buffer from channel",
zap.String("session", session),
zap.Int("processedMsgCnt", processedMsgCnt),
zap.Uint64("buffersEnqueued", client.statistics.buffersEnqueued.Load()),
zap.Uint64("buffersProcessed", client.statistics.buffersProcessed.Load()),
zap.Uint64("buffersDropped", client.statistics.buffersDropped.Load()),
zap.Uint64("buffersEnqueued", client.statistics.BuffersEnqueued()),
zap.Uint64("buffersProcessed", client.statistics.BuffersProcessed()),
zap.Uint64("buffersDropped", client.statistics.BuffersDropped()),
)
buf, bufferReadSuccess := msg.(*buffer.Buffer)
if bufferReadSuccess {
Expand Down Expand Up @@ -415,7 +416,7 @@ func (client *DataSetClient) statisticsSweeper() {
}

// Statistics returns statistics about events, buffers processing from the start time
func (client *DataSetClient) Statistics() *ExportedStatistics {
func (client *DataSetClient) Statistics() *statistics.ExportedStatistics {
// for how long are events being processed
firstAt := time.Unix(0, client.firstReceivedAt.Load())
lastAt := time.Unix(0, client.lastAcceptedAt.Load())
Expand Down
93 changes: 92 additions & 1 deletion pkg/client/statistics.go → pkg/statistics/statistics.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,28 @@
package client
/*
* Copyright 2023 SentinelOne, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package statistics

import (
"context"
"sync/atomic"
"time"

"github.com/scalyr/dataset-go/pkg/buffer_config"

"go.opentelemetry.io/otel/metric"
)

Expand Down Expand Up @@ -40,6 +58,9 @@ type Statistics struct {

cBytesAPISent metric.Int64UpDownCounter
cBytesAPIAccepted metric.Int64UpDownCounter

hPayloadSize metric.Int64Histogram
hResponseTime metric.Int64Histogram
}

func NewStatistics(meter *metric.Meter) (*Statistics, error) {
Expand Down Expand Up @@ -117,9 +138,67 @@ func (stats *Statistics) initMetrics(meter *metric.Meter) error {
return err
}

var payloadBuckets []float64
for _, r := range [11]float64{0.05, 0.1, 0.2, 0.4, 0.6, 0.8, 0.9, 0.95, 1.0, 1.1, 2} {
payloadBuckets = append(payloadBuckets, r*buffer_config.LimitBufferSize)
}
stats.hPayloadSize, err = (*meter).Int64Histogram(key(
"payloadSize"),
metric.WithExplicitBucketBoundaries(payloadBuckets...),
metric.WithUnit("b"),
)
if err != nil {
return err
}

var responseBuckets []float64
for i := 0; i < 12; i++ {
responseBuckets = append(responseBuckets, float64(4*2^i))
}
stats.hResponseTime, err = (*meter).Int64Histogram(key(
"responseTime"),
metric.WithExplicitBucketBoundaries(responseBuckets...),
metric.WithUnit("ms"),
)
if err != nil {
return err
}

return err
}

func (stats *Statistics) BuffersEnqueued() uint64 {
return stats.buffersEnqueued.Load()
}

func (stats *Statistics) BuffersProcessed() uint64 {
return stats.buffersProcessed.Load()
}

func (stats *Statistics) BuffersDropped() uint64 {
return stats.buffersDropped.Load()
}

func (stats *Statistics) BuffersBroken() uint64 {
return stats.buffersBroken.Load()
}

func (stats *Statistics) EventsEnqueued() uint64 {
return stats.eventsEnqueued.Load()
}

func (stats *Statistics) EventsProcessed() uint64 {
return stats.eventsProcessed.Load()
}

func (stats *Statistics) EventsDropped() uint64 {
return stats.eventsDropped.Load()
}

func (stats *Statistics) EventsBroken() uint64 {
return stats.eventsBroken.Load()
}

func (stats *Statistics) BuffersEnqueuedAdd(i uint64) {
stats.buffersEnqueued.Add(i)
stats.add(stats.cBuffersEnqueued, i)
Expand Down Expand Up @@ -170,6 +249,18 @@ func (stats *Statistics) BytesAPIAcceptedAdd(i uint64) {
stats.add(stats.cBytesAPIAccepted, i)
}

func (stats *Statistics) PayloadSizeRecord(payloadSizeInBytes int64) {
if stats.hPayloadSize != nil {
stats.hPayloadSize.Record(context.Background(), payloadSizeInBytes)
}
}

func (stats *Statistics) ResponseTimeRecord(duration time.Duration) {
if stats.hResponseTime != nil {
stats.hResponseTime.Record(context.Background(), duration.Milliseconds())
}
}

func (stats *Statistics) add(counter metric.Int64UpDownCounter, i uint64) {
if counter != nil {
counter.Add(context.Background(), int64(i))
Expand Down
Loading

0 comments on commit ad85b54

Please sign in to comment.