Skip to content
This repository has been archived by the owner on Feb 9, 2024. It is now read-only.

Commit

Permalink
Merge pull request #55 from streamdal/blinktag/sampling
Browse files Browse the repository at this point in the history
Tail Sampling
  • Loading branch information
blinktag authored Dec 6, 2023
2 parents e4a1404 + 5c30793 commit d00b479
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 3 deletions.
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ require (
github.com/onsi/gomega v1.28.0
github.com/pkg/errors v0.9.1
github.com/relistan/go-director v0.0.0-20200406104025-dbbf5d95248d
github.com/streamdal/protos v0.0.123
github.com/streamdal/protos v0.0.124
github.com/tetratelabs/wazero v1.5.0
golang.org/x/time v0.5.0
google.golang.org/grpc v1.59.0
google.golang.org/protobuf v1.31.0
)
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykE
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s=
github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
github.com/streamdal/protos v0.0.123 h1:NJ5oqM5mHadehcmL/68cx+jUjQ8WbZRmIMUNspKm010=
github.com/streamdal/protos v0.0.123/go.mod h1:ciPOA0/x5PG4yxOdwBtLFPwyAKwecMkuEitO3csbB7A=
github.com/streamdal/protos v0.0.124 h1:vI6Qj/ySJRRaG2IzfvKhrqA0NcUDGtqinCZYiatNf6k=
github.com/streamdal/protos v0.0.124/go.mod h1:ciPOA0/x5PG4yxOdwBtLFPwyAKwecMkuEitO3csbB7A=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
Expand All @@ -56,6 +56,8 @@ golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.12.0 h1:YW6HUoUmYBpwSgyaGaZq1fHjrBjX1rlpZ54T6mu2kss=
golang.org/x/tools v0.12.0/go.mod h1:Sc0INKfu04TlqNoRA1hgpFZbhYXHPr4V5DzpSBTPqQM=
Expand Down
22 changes: 22 additions & 0 deletions tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/pkg/errors"
"github.com/relistan/go-director"
"golang.org/x/time/rate"

"github.com/streamdal/protos/build/go/protos"

Expand Down Expand Up @@ -41,6 +42,16 @@ type Tail struct {
lastMsg time.Time
log logger.Logger
active bool
limiter *rate.Limiter
}

func (t *Tail) ShouldSend() bool {
// If no rate limit, allow all messages
if t.limiter == nil {
return true
}

return t.limiter.Allow()
}

func (s *Streamdal) sendTail(aud *protos.Audience, pipelineID string, originalData []byte, postPipelineData []byte) {
Expand All @@ -64,6 +75,10 @@ func (s *Streamdal) sendTail(aud *protos.Audience, pipelineID string, originalDa
tail.active = true
}

if !tail.ShouldSend() {
continue
}

tr := &protos.TailResponse{
Type: protos.TailResponseType_TAIL_RESPONSE_TYPE_PAYLOAD,
TailRequestId: tailID,
Expand Down Expand Up @@ -196,6 +211,13 @@ func (s *Streamdal) startTailHandler(_ context.Context, cmd *protos.Command) err
active: false,
}

// Convert sample interval from seconds to time.Duration
if sampleOpts := cmd.GetTail().Request.GetSampleOptions(); sampleOpts != nil {
// Convert seconds to nanoseconds
interval := time.Duration(sampleOpts.SampleIntervalSeconds) * time.Second
t.limiter = rate.NewLimiter(rate.Every(interval), int(sampleOpts.SampleRate))
}

// Save entry in tail map
s.setActiveTail(t)

Expand Down

0 comments on commit d00b479

Please sign in to comment.