From 1c87ce12f33a9fa007943d2169d92f2b149e8733 Mon Sep 17 00:00:00 2001 From: Mark G Date: Mon, 4 Dec 2023 10:10:48 -0600 Subject: [PATCH 1/3] Sampling --- go.mod | 2 +- go.sum | 4 ++-- tail.go | 30 ++++++++++++++++++++++++++++++ 3 files changed, 33 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 3a3e97f..5ef9d81 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/onsi/gomega v1.28.0 github.com/pkg/errors v0.9.1 github.com/relistan/go-director v0.0.0-20200406104025-dbbf5d95248d - github.com/streamdal/protos v0.0.123 + github.com/streamdal/protos v0.0.124-0.20231204153745-a985d68d7dc5 github.com/tetratelabs/wazero v1.5.0 google.golang.org/grpc v1.59.0 google.golang.org/protobuf v1.31.0 diff --git a/go.sum b/go.sum index a83ce98..242c84b 100644 --- a/go.sum +++ b/go.sum @@ -37,8 +37,8 @@ github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykE github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s= github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= -github.com/streamdal/protos v0.0.123 h1:NJ5oqM5mHadehcmL/68cx+jUjQ8WbZRmIMUNspKm010= -github.com/streamdal/protos v0.0.123/go.mod h1:ciPOA0/x5PG4yxOdwBtLFPwyAKwecMkuEitO3csbB7A= +github.com/streamdal/protos v0.0.124-0.20231204153745-a985d68d7dc5 h1:xpj0Ai7HH3BFS1aJXUTLLqeQuNs0WyobykHaMjMbSiM= +github.com/streamdal/protos v0.0.124-0.20231204153745-a985d68d7dc5/go.mod h1:ciPOA0/x5PG4yxOdwBtLFPwyAKwecMkuEitO3csbB7A= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= diff --git a/tail.go b/tail.go index 1cba6c7..64ee826 100644 --- a/tail.go +++ b/tail.go @@ -3,6 +3,7 @@ package streamdal import ( "context" "io" + "math/rand" "strings" "time" @@ -41,6 +42,25 @@ type Tail struct { lastMsg time.Time log logger.Logger active bool + lastSample time.Time + sampleInterval time.Duration +} + +func (t *Tail) ShouldSend() bool { + opts := t.Request.GetTail().Request.GetSampleOptions() + if opts == nil { + // No sampling, send everything + return true + } + + elapsed := time.Since(t.lastSample) + + if elapsed >= t.sampleInterval && rand.Float64() < float64(opts.SampleRate/100) { + t.lastSample = time.Now() + return true + } + + return false } func (s *Streamdal) sendTail(aud *protos.Audience, pipelineID string, originalData []byte, postPipelineData []byte) { @@ -64,6 +84,10 @@ func (s *Streamdal) sendTail(aud *protos.Audience, pipelineID string, originalDa tail.active = true } + if !tail.ShouldSend() { + continue + } + tr := &protos.TailResponse{ Type: protos.TailResponseType_TAIL_RESPONSE_TYPE_PAYLOAD, TailRequestId: tailID, @@ -196,6 +220,12 @@ func (s *Streamdal) startTailHandler(_ context.Context, cmd *protos.Command) err active: false, } + // Convert sample interval from seconds to time.Duration + if sampleOpts := cmd.GetTail().Request.GetSampleOptions(); sampleOpts != nil { + // Convert seconds to nanoseconds + t.sampleInterval = time.Duration(sampleOpts.SampleIntervalSeconds / 1_000_000) + } + // Save entry in tail map s.setActiveTail(t) From abaf498d60a448749f19dd1bb915fb9720dc03d9 Mon Sep 17 00:00:00 2001 From: Mark G Date: Mon, 4 Dec 2023 14:07:16 -0600 Subject: [PATCH 2/3] Using x/time/rate package --- go.mod | 1 + go.sum | 2 ++ tail.go | 22 +++++++--------------- 3 files changed, 10 insertions(+), 15 deletions(-) diff --git a/go.mod b/go.mod index 5ef9d81..179003d 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( github.com/relistan/go-director v0.0.0-20200406104025-dbbf5d95248d github.com/streamdal/protos v0.0.124-0.20231204153745-a985d68d7dc5 github.com/tetratelabs/wazero v1.5.0 + golang.org/x/time v0.5.0 google.golang.org/grpc v1.59.0 google.golang.org/protobuf v1.31.0 ) diff --git a/go.sum b/go.sum index 242c84b..63ecb13 100644 --- a/go.sum +++ b/go.sum @@ -56,6 +56,8 @@ golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= +golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.12.0 h1:YW6HUoUmYBpwSgyaGaZq1fHjrBjX1rlpZ54T6mu2kss= golang.org/x/tools v0.12.0/go.mod h1:Sc0INKfu04TlqNoRA1hgpFZbhYXHPr4V5DzpSBTPqQM= diff --git a/tail.go b/tail.go index 64ee826..93f3ebb 100644 --- a/tail.go +++ b/tail.go @@ -3,12 +3,12 @@ package streamdal import ( "context" "io" - "math/rand" "strings" "time" "github.com/pkg/errors" "github.com/relistan/go-director" + "golang.org/x/time/rate" "github.com/streamdal/protos/build/go/protos" @@ -42,25 +42,16 @@ type Tail struct { lastMsg time.Time log logger.Logger active bool - lastSample time.Time - sampleInterval time.Duration + limiter *rate.Limiter } func (t *Tail) ShouldSend() bool { - opts := t.Request.GetTail().Request.GetSampleOptions() - if opts == nil { - // No sampling, send everything + // If no rate limit, allow all messages + if t.limiter == nil { return true } - elapsed := time.Since(t.lastSample) - - if elapsed >= t.sampleInterval && rand.Float64() < float64(opts.SampleRate/100) { - t.lastSample = time.Now() - return true - } - - return false + return t.limiter.Allow() } func (s *Streamdal) sendTail(aud *protos.Audience, pipelineID string, originalData []byte, postPipelineData []byte) { @@ -223,7 +214,8 @@ func (s *Streamdal) startTailHandler(_ context.Context, cmd *protos.Command) err // Convert sample interval from seconds to time.Duration if sampleOpts := cmd.GetTail().Request.GetSampleOptions(); sampleOpts != nil { // Convert seconds to nanoseconds - t.sampleInterval = time.Duration(sampleOpts.SampleIntervalSeconds / 1_000_000) + interval := time.Duration(sampleOpts.SampleIntervalSeconds) * time.Second + t.limiter = rate.NewLimiter(rate.Every(interval), int(sampleOpts.SampleRate)) } // Save entry in tail map From 5c307931c9d7b6b72aaf04e2c14566de6455ef6d Mon Sep 17 00:00:00 2001 From: Mark G Date: Mon, 4 Dec 2023 14:13:10 -0600 Subject: [PATCH 3/3] Bump protos version --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 179003d..af24b03 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/onsi/gomega v1.28.0 github.com/pkg/errors v0.9.1 github.com/relistan/go-director v0.0.0-20200406104025-dbbf5d95248d - github.com/streamdal/protos v0.0.124-0.20231204153745-a985d68d7dc5 + github.com/streamdal/protos v0.0.124 github.com/tetratelabs/wazero v1.5.0 golang.org/x/time v0.5.0 google.golang.org/grpc v1.59.0 diff --git a/go.sum b/go.sum index 63ecb13..138b301 100644 --- a/go.sum +++ b/go.sum @@ -37,8 +37,8 @@ github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykE github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s= github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= -github.com/streamdal/protos v0.0.124-0.20231204153745-a985d68d7dc5 h1:xpj0Ai7HH3BFS1aJXUTLLqeQuNs0WyobykHaMjMbSiM= -github.com/streamdal/protos v0.0.124-0.20231204153745-a985d68d7dc5/go.mod h1:ciPOA0/x5PG4yxOdwBtLFPwyAKwecMkuEitO3csbB7A= +github.com/streamdal/protos v0.0.124 h1:vI6Qj/ySJRRaG2IzfvKhrqA0NcUDGtqinCZYiatNf6k= +github.com/streamdal/protos v0.0.124/go.mod h1:ciPOA0/x5PG4yxOdwBtLFPwyAKwecMkuEitO3csbB7A= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=