From 4f7f9d189c68c6e0185d56ff96ac668b59e86910 Mon Sep 17 00:00:00 2001 From: jhsinger-klotho <111291520+jhsinger-klotho@users.noreply.github.com> Date: Wed, 18 Jan 2023 10:48:31 -0600 Subject: [PATCH] making pubsub idempotent and other fixes (#91) --- pkg/infra/pulumi_aws/deploylib.ts | 4 +- pkg/infra/pulumi_aws/index.ts.tmpl | 2 +- pkg/lang/javascript/plugin_pubsub.go | 64 +++++++++++++++++----------- pkg/lang/javascript/var_finder.go | 7 ++- pkg/provider/aws/infra_template.go | 2 +- pkg/provider/infra_template.go | 2 +- 6 files changed, 51 insertions(+), 30 deletions(-) diff --git a/pkg/infra/pulumi_aws/deploylib.ts b/pkg/infra/pulumi_aws/deploylib.ts index e4aa906d2..73a7efa83 100755 --- a/pkg/infra/pulumi_aws/deploylib.ts +++ b/pkg/infra/pulumi_aws/deploylib.ts @@ -615,12 +615,12 @@ export class CloudCCLib { createTopic( path: string, - varName: string, + name: string, event: string, publishers: string[], subscribers: string[] ): aws.sns.Topic { - let topic = `${this.name}_${path.replace(/[^0-9a-zA-Z_-]/g, '-')}_${varName}_${event}` + let topic = `${this.name}_${name}_${event}` if (topic.length > 256) { const hash = crypto.createHash('sha256') hash.update(topic) diff --git a/pkg/infra/pulumi_aws/index.ts.tmpl b/pkg/infra/pulumi_aws/index.ts.tmpl index d380fdc36..fa74f5f7e 100755 --- a/pkg/infra/pulumi_aws/index.ts.tmpl +++ b/pkg/infra/pulumi_aws/index.ts.tmpl @@ -172,7 +172,7 @@ export = async () => { {{range $event := .PubSubs}} cloudLib.createTopic( - "{{$event.Path}}", "{{$event.VarName}}", "{{$event.EventName}}", + "{{$event.Path}}", "{{$event.Name}}", "{{$event.EventName}}", {{if $event.Publishers }} {{json (keyNames $event.Publishers)}}, {{else}} diff --git a/pkg/lang/javascript/plugin_pubsub.go b/pkg/lang/javascript/plugin_pubsub.go index 286c0442a..133118b26 100644 --- a/pkg/lang/javascript/plugin_pubsub.go +++ b/pkg/lang/javascript/plugin_pubsub.go @@ -24,7 +24,8 @@ type ( result *core.CompilationResult deps *core.Dependencies - emitters map[VarSpec]*emitterValue + emitters map[VarSpec]*emitterValue + proxyGenerationCalls []proxyGenerationCall } emitterValue struct { @@ -36,6 +37,12 @@ type ( emitterUsage struct { filePath string event string + unitId string + } + + proxyGenerationCall struct { + filepath string + emitters []EmitterSubscriberProxyEntry } ) @@ -61,41 +68,46 @@ func (p Pubsub) Transform(result *core.CompilationResult, deps *core.Dependencie if value.Annotation.Capability.ID == "" { errs.Append(core.NewCompilerError(value.File, value.Annotation, errors.New("'id' is required"))) } - resource := core.PubSub{ - Path: spec.DefinedIn, - Name: value.Annotation.Capability.ID, - } - p.emitters[spec] = &emitterValue{ - Resource: &resource, + if _, ok := p.emitters[spec]; !ok { + resource := core.PubSub{ + Path: spec.DefinedIn, + Name: value.Annotation.Capability.ID, + } + p.emitters[spec] = &emitterValue{ + Resource: &resource, + } } } + + err := p.findProxiesNeeded(unit) + if err != nil { + errs.Append(err) + continue + } + varsByFile := vars.SplitByFile() if len(vars) == 0 { continue } - err := p.rewriteEmitters(unit, varsByFile) + err = p.rewriteEmitters(unit, varsByFile) if err != nil { errs.Append(err) } - } - for _, v := range p.emitters { - p.result.Add(v.Resource) } - for _, res := range result.Resources() { - unit, ok := res.(*core.ExecutionUnit) - if !ok { - continue - } - - err := p.processFiles(unit) + for _, call := range p.proxyGenerationCalls { + err := p.generateProxies(call.filepath, call.emitters) if err != nil { errs.Append(err) } } + for _, v := range p.emitters { + p.result.Add(v.Resource) + } + err := p.generateEmitterDefinitions() errs.Append(err) @@ -150,7 +162,7 @@ func (p *Pubsub) rewriteFileEmitters(f *core.SourceFile, vars VarDeclarations) e return errs.ErrOrNil() } -func (p *Pubsub) processFiles(unit *core.ExecutionUnit) error { +func (p *Pubsub) findProxiesNeeded(unit *core.ExecutionUnit) error { var errs multierr.Error for _, f := range unit.Files() { var emitters []EmitterSubscriberProxyEntry @@ -161,11 +173,12 @@ func (p *Pubsub) processFiles(unit *core.ExecutionUnit) error { if !ok { continue } + pEvents := p.findPublisherTopics(js, spec) if len(pEvents) > 0 { for _, event := range pEvents { value.Resource.AddPublisher(event, unit.Key()) - value.Publishers = append(value.Publishers, emitterUsage{filePath: f.Path(), event: event}) + value.Publishers = append(value.Publishers, emitterUsage{filePath: f.Path(), event: event, unitId: unit.Name}) log.Debugf("Adding publisher to '%s'", event) } p.deps.Add(unit.Key(), value.Resource.Key()) @@ -175,7 +188,7 @@ func (p *Pubsub) processFiles(unit *core.ExecutionUnit) error { if len(sEvents) > 0 { for _, event := range sEvents { value.Resource.AddSubscriber(event, unit.Key()) - value.Subscribers = append(value.Subscribers, emitterUsage{filePath: f.Path(), event: event}) + value.Subscribers = append(value.Subscribers, emitterUsage{filePath: f.Path(), event: event, unitId: unit.Name}) log.Debugf("Adding subscriber to '%s'", event) } p.deps.Add(value.Resource.Key(), unit.Key()) @@ -203,9 +216,10 @@ func (p *Pubsub) processFiles(unit *core.ExecutionUnit) error { } } - if err := p.generateProxies(f.Path(), emitters); err != nil { - errs.Append(err) - } + p.proxyGenerationCalls = append(p.proxyGenerationCalls, proxyGenerationCall{ + filepath: f.Path(), + emitters: emitters, + }) } return errs.ErrOrNil() } @@ -214,6 +228,7 @@ func (p *Pubsub) generateProxies(filepath string, emitters []EmitterSubscriberPr if len(emitters) == 0 { return nil } + tData := EmitterSubscriberProxy{ Path: filepath, Entries: emitters, @@ -243,6 +258,7 @@ func (p *Pubsub) generateProxies(filepath string, emitters []EmitterSubscriberPr if !ok { continue } + if existing := unit.Get(filepath); existing != nil { existing := existing.(*core.SourceFile) if bytes.Contains(existing.Program(), []byte(emitters[0].VarName)) { diff --git a/pkg/lang/javascript/var_finder.go b/pkg/lang/javascript/var_finder.go index 2cc85d288..4ad0a4e77 100644 --- a/pkg/lang/javascript/var_finder.go +++ b/pkg/lang/javascript/var_finder.go @@ -2,6 +2,7 @@ package javascript import ( "fmt" + "path/filepath" "strings" "github.com/klothoplatform/klotho/pkg/filter" @@ -206,12 +207,16 @@ func (vf *varFinder) parseNode(f *core.SourceFile, annot *core.Annotation) (inte // findVarName finds the internal name for the given `varSpec` within the given file. Returns "" if the var isn't defined in the file. func findVarName(f *core.SourceFile, spec VarSpec) string { log := zap.L().With(logging.FileField(f)).Sugar() + relPath, err := filepath.Rel(filepath.Dir(f.Path()), spec.DefinedIn) + if err != nil { + panic(err) + } varName := spec.InternalName if f.Path() != spec.DefinedIn { filteredImports := FindImportsInFile(f).Filter( filter.NewSimpleFilter( - IsRelativeImportOfModule(spec.DefinedIn), + IsRelativeImportOfModule(relPath), predicate.AnyOf( IsImportOfType(ImportTypeNamespace), predicate.AllOf( diff --git a/pkg/provider/aws/infra_template.go b/pkg/provider/aws/infra_template.go index 58ef5b8ee..022c993a8 100644 --- a/pkg/provider/aws/infra_template.go +++ b/pkg/provider/aws/infra_template.go @@ -143,7 +143,7 @@ func (a *AWS) Transform(result *core.CompilationResult, deps *core.Dependencies) Publishers: event.Publishers, Subscribers: event.Subscribers, Path: res.Path, - VarName: res.Name, + Name: res.Name, EventName: name, } data.PubSubs = append(data.PubSubs, ps) diff --git a/pkg/provider/infra_template.go b/pkg/provider/infra_template.go index 1614953d7..e1e6399f4 100644 --- a/pkg/provider/infra_template.go +++ b/pkg/provider/infra_template.go @@ -34,7 +34,7 @@ type ( Subscribers []core.ResourceKey Path string EventName string - VarName string + Name string Params config.InfraParams }