Skip to content
This repository has been archived by the owner on Feb 9, 2024. It is now read-only.

Commit

Permalink
Merge pull request #61 from streamdal/blinktag/dynamic_transforms
Browse files Browse the repository at this point in the history
ENG-1458 - Dynamic Transforms
  • Loading branch information
blinktag authored Jan 17, 2024
2 parents 22d95a1 + 71852ec commit 2efd4c7
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 20 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/onsi/gomega v1.30.0
github.com/pkg/errors v0.9.1
github.com/relistan/go-director v0.0.0-20200406104025-dbbf5d95248d
github.com/streamdal/streamdal/libs/protos v0.1.12
github.com/streamdal/streamdal/libs/protos v0.1.14
github.com/tetratelabs/wazero v1.6.0
golang.org/x/time v0.5.0
google.golang.org/grpc v1.60.1
Expand All @@ -25,6 +25,6 @@ require (
golang.org/x/sys v0.16.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/tools v0.16.1 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240108191215-35c7eff3a6b1 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240116215550-a9fa1716bcac // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
14 changes: 4 additions & 10 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,15 @@ github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykE
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s=
github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
github.com/streamdal/streamdal/libs/protos v0.1.11 h1:PgGS2YHI3X7sHeazQKWjh1VMENW3klMF5NFcKtpsL3M=
github.com/streamdal/streamdal/libs/protos v0.1.11/go.mod h1:WZ6qCqzJu/9Vn+P5EUMN1hcOWlxN2EHgcsxehylhgJQ=
github.com/streamdal/streamdal/libs/protos v0.1.12 h1:U0301m8/EYRHW6tCIk7lIJM86iMQVcoXAqIWdOayKkQ=
github.com/streamdal/streamdal/libs/protos v0.1.12/go.mod h1:WZ6qCqzJu/9Vn+P5EUMN1hcOWlxN2EHgcsxehylhgJQ=
github.com/streamdal/streamdal/libs/protos v0.1.14 h1:DK1vPmBrX6fLy6uCmNJsonZdqp3zs3zXDCOaRwo3s6o=
github.com/streamdal/streamdal/libs/protos v0.1.14/go.mod h1:WZ6qCqzJu/9Vn+P5EUMN1hcOWlxN2EHgcsxehylhgJQ=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/tetratelabs/wazero v1.6.0 h1:z0H1iikCdP8t+q341xqepY4EWvHEw8Es7tlqiVzlP3g=
github.com/tetratelabs/wazero v1.6.0/go.mod h1:0U0G41+ochRKoPKCJlh0jMg1CHkyfK8kDqiirMmKY8A=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.19.0 h1:zTwKpTd2XuCqf8huc7Fo2iSy+4RHPd10s4KzeTnVr1c=
golang.org/x/net v0.19.0/go.mod h1:CfAk/cbD4CthTvqiEl8NpboMuiuOYsAr/7NOjZJtv1U=
golang.org/x/net v0.20.0 h1:aCL9BSgETF1k+blQaYUBx9hJ9LOGP3gAVemcZlf1Kpo=
golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand All @@ -60,10 +56,8 @@ golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3
golang.org/x/tools v0.16.1 h1:TLyB3WofjdOEepBHAU20JdNC1Zbg87elYofWYAY5oZA=
golang.org/x/tools v0.16.1/go.mod h1:kYVVN6I1mBNoB1OX+noeBjbRk4IUEPa7JJ+TJMEooJ0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240102182953-50ed04b92917 h1:6G8oQ016D88m1xAKljMlBOOGWDZkes4kMhgGFlf8WcQ=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240102182953-50ed04b92917/go.mod h1:xtjpI3tXFPP051KaWnhvxkiubL/6dJ18vLVf7q2pTOU=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240108191215-35c7eff3a6b1 h1:gphdwh0npgs8elJ4T6J+DQJHPVF7RsuJHCfwztUb4J4=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240108191215-35c7eff3a6b1/go.mod h1:daQN87bsDqDoe316QbbvX60nMoJQa4r6Ds0ZuoAe5yA=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240116215550-a9fa1716bcac h1:nUQEQmH/csSvFECKYRv6HWEyypysidKl2I6Qpsglq/0=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240116215550-a9fa1716bcac/go.mod h1:daQN87bsDqDoe316QbbvX60nMoJQa4r6Ds0ZuoAe5yA=
google.golang.org/grpc v1.60.1 h1:26+wFr+cNqSGFcOXcabYC0lUVJVRa2Sb2ortSK7VrEU=
google.golang.org/grpc v1.60.1/go.mod h1:OlCHIeLYqSSsLi6i49B5QGdzaMZK9+M7LXN2FKz4eGM=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
Expand Down
13 changes: 9 additions & 4 deletions go_sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ func (s *Streamdal) heartbeat(loop *director.TimedLooper) {
})
}

func (s *Streamdal) runStep(ctx context.Context, aud *protos.Audience, step *protos.PipelineStep, data []byte) (*protos.WASMResponse, error) {
func (s *Streamdal) runStep(ctx context.Context, aud *protos.Audience, step *protos.PipelineStep, data []byte, isr *protos.InterStepResult) (*protos.WASMResponse, error) {
s.config.Logger.Debugf("Running step '%s'", step.Name)

// Get WASM module
Expand All @@ -477,8 +477,9 @@ func (s *Streamdal) runStep(ctx context.Context, aud *protos.Audience, step *pro
step.XWasmBytes = nil

req := &protos.WASMRequest{
InputPayload: data,
Step: step,
InputPayload: data,
Step: step,
InterStepResult: isr,
}

reqBytes, err := proto.Marshal(req)
Expand Down Expand Up @@ -607,6 +608,8 @@ func (s *Streamdal) Process(ctx context.Context, req *ProcessRequest) *ProcessRe

PIPELINE:
for _, p := range pipelines {
var isr *protos.InterStepResult

pipelineTimeoutCtx, pipelineTimeoutCxl := context.WithTimeout(ctx, s.config.PipelineTimeout)

pipelineStatus := &protos.PipelineStatus{
Expand Down Expand Up @@ -644,7 +647,7 @@ PIPELINE:
// NOOP
}

wasmResp, err := s.runStep(stepTimeoutCtx, aud, step, resp.Data)
wasmResp, err := s.runStep(stepTimeoutCtx, aud, step, resp.Data, isr)
if err != nil {
stepTimeoutCxl()

Expand Down Expand Up @@ -684,6 +687,8 @@ PIPELINE:
resp.Data = wasmResp.OutputPayload
}

isr = wasmResp.InterStepResult // Pass inter-step result to next step

// Check on success and on-failures
switch wasmResp.ExitCode {
case protos.WASMExitCode_WASM_EXIT_CODE_SUCCESS:
Expand Down
125 changes: 121 additions & 4 deletions wasm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"os"
"sync"
"time"

"github.com/google/uuid"
. "github.com/onsi/ginkgo/v2"
Expand All @@ -13,6 +14,10 @@ import (

"github.com/streamdal/streamdal/libs/protos/build/go/protos"
"github.com/streamdal/streamdal/libs/protos/build/go/protos/steps"

"github.com/streamdal/go-sdk/logger"
"github.com/streamdal/go-sdk/metrics/metricsfakes"
"github.com/streamdal/go-sdk/server/serverfakes"
)

var _ = Describe("WASM Modules", func() {
Expand Down Expand Up @@ -396,7 +401,7 @@ var _ = Describe("WASM Modules", func() {
Expect(wasmResp.OutputPayload).Should(MatchJSON(`{"object": {"type": "str", "cc_num": "1234"}}`))
})

It("truncates a field by total length", func() {
It("truncates a field by percent length", func() {
wasmData, err := os.ReadFile("test-assets/wasm/transform.wasm")
Expect(err).ToNot(HaveOccurred())

Expand All @@ -408,8 +413,8 @@ var _ = Describe("WASM Modules", func() {
Options: &steps.TransformStep_TruncateOptions{
TruncateOptions: &steps.TransformTruncateOptions{
Path: "object.type",
Type: steps.TransformTruncateType_TRANSFORM_TRUNCATE_TYPE_LENGTH,
Value: 3,
Type: steps.TransformTruncateType_TRANSFORM_TRUNCATE_TYPE_PERCENTAGE,
Value: 50,
},
},
},
Expand All @@ -432,7 +437,7 @@ var _ = Describe("WASM Modules", func() {
Expect(err).ToNot(HaveOccurred())
Expect(wasmResp).ToNot(BeNil())
Expect(wasmResp.ExitCode).To(Equal(protos.WASMExitCode_WASM_EXIT_CODE_SUCCESS))
Expect(wasmResp.OutputPayload).Should(MatchJSON(`{"object": {"type": "str", "cc_num": "1234"}}`))
Expect(wasmResp.OutputPayload).Should(MatchJSON(`{"object": {"type": "stre", "cc_num": "1234"}}`))
})
})

Expand Down Expand Up @@ -619,4 +624,116 @@ var _ = Describe("WASM Modules", func() {

// TODO: additional tests for each transform type
})

Context("inter step result", func() {
It("finds and transforms PII in a payload without a path", func() {
payload := []byte(`{
"users": [
{
"name": "Bob",
"email": "[email protected]"
},
{
"name": "Mary",
"email": "[email protected]"
}
]
}`)
detectiveWASM, err := os.ReadFile("test-assets/wasm/detective.wasm")
Expect(err).ToNot(HaveOccurred())
transformWASM, err := os.ReadFile("test-assets/wasm/transform.wasm")
Expect(err).ToNot(HaveOccurred())

pipeline := &protos.Pipeline{
Id: uuid.New().String(),
Name: "Test Pipeline",
Steps: []*protos.PipelineStep{
{
Name: "Find Email",
XWasmId: stringPtr(uuid.New().String()),
XWasmBytes: detectiveWASM,
XWasmFunction: stringPtr("f"),
OnSuccess: make([]protos.PipelineStepCondition, 0),
OnFailure: []protos.PipelineStepCondition{protos.PipelineStepCondition_PIPELINE_STEP_CONDITION_ABORT_ALL},
Step: &protos.PipelineStep_Detective{
Detective: &steps.DetectiveStep{
Path: stringPtr(""), // No path, we're searching the entire payload
Negate: boolPtr(false),
Type: steps.DetectiveType_DETECTIVE_TYPE_PII_EMAIL,
},
},
},
{
Dynamic: true,
Name: "Transform Email",
XWasmId: stringPtr(uuid.New().String()),
XWasmBytes: transformWASM,
XWasmFunction: stringPtr("f"),
OnSuccess: make([]protos.PipelineStepCondition, 0),
OnFailure: []protos.PipelineStepCondition{protos.PipelineStepCondition_PIPELINE_STEP_CONDITION_ABORT_ALL},
Step: &protos.PipelineStep_Transform{
Transform: &steps.TransformStep{
Type: steps.TransformType_TRANSFORM_TYPE_REPLACE_VALUE,
Options: &steps.TransformStep_ReplaceValueOptions{
ReplaceValueOptions: &steps.TransformReplaceValueOptions{
Path: "", // No path, we're getting the result from the detective step
Value: `"REDACTED"`,
},
},
},
},
},
},
}

aud := &protos.Audience{
ServiceName: "mysvc1",
ComponentName: "kafka",
OperationType: protos.OperationType_OPERATION_TYPE_PRODUCER,
OperationName: "mytopic",
}

s := &Streamdal{
serverClient: &serverfakes.FakeIServerClient{},
functionsMtx: &sync.RWMutex{},
functions: map[string]*function{},
audiencesMtx: &sync.RWMutex{},
audiences: map[string]struct{}{},
tails: map[string]map[string]*Tail{},
tailsMtx: &sync.RWMutex{},
config: &Config{
ServiceName: "mysvc1",
Logger: &logger.TinyLogger{},
StepTimeout: time.Millisecond * 1000,
PipelineTimeout: time.Millisecond * 1000,
},
metrics: &metricsfakes.FakeIMetrics{},
pipelinesMtx: &sync.RWMutex{},
pipelines: map[string]map[string]*protos.Command{
audToStr(aud): {
pipeline.Id: {
Audience: aud,
Command: &protos.Command_AttachPipeline{
AttachPipeline: &protos.AttachPipelineCommand{
Pipeline: pipeline,
},
},
},
},
},
}

resp := s.Process(context.Background(), &ProcessRequest{
ComponentName: aud.ComponentName,
OperationType: OperationType(aud.OperationType),
OperationName: aud.OperationName,
Data: payload,
})

Expect(resp.Error).To(BeFalse())
Expect(resp.ErrorMessage).To(Equal(""))
Expect(resp.Data).To(MatchJSON(`{"users": [{"name":"Bob","email":"REDACTED"},{"name":"Mary","email":"REDACTED"}]}`))
})

})
})

0 comments on commit 2efd4c7

Please sign in to comment.