Skip to content

Commit

Permalink
making pubsub idempotent and other fixes (#91)
Browse files Browse the repository at this point in the history
  • Loading branch information
jhsinger-klotho authored Jan 18, 2023
1 parent be8d7d7 commit 4f7f9d1
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 30 deletions.
4 changes: 2 additions & 2 deletions pkg/infra/pulumi_aws/deploylib.ts
Original file line number Diff line number Diff line change
Expand Up @@ -615,12 +615,12 @@ export class CloudCCLib {

createTopic(
path: string,
varName: string,
name: string,
event: string,
publishers: string[],
subscribers: string[]
): aws.sns.Topic {
let topic = `${this.name}_${path.replace(/[^0-9a-zA-Z_-]/g, '-')}_${varName}_${event}`
let topic = `${this.name}_${name}_${event}`
if (topic.length > 256) {
const hash = crypto.createHash('sha256')
hash.update(topic)
Expand Down
2 changes: 1 addition & 1 deletion pkg/infra/pulumi_aws/index.ts.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ export = async () => {

{{range $event := .PubSubs}}
cloudLib.createTopic(
"{{$event.Path}}", "{{$event.VarName}}", "{{$event.EventName}}",
"{{$event.Path}}", "{{$event.Name}}", "{{$event.EventName}}",
{{if $event.Publishers }}
{{json (keyNames $event.Publishers)}},
{{else}}
Expand Down
64 changes: 40 additions & 24 deletions pkg/lang/javascript/plugin_pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ type (
result *core.CompilationResult
deps *core.Dependencies

emitters map[VarSpec]*emitterValue
emitters map[VarSpec]*emitterValue
proxyGenerationCalls []proxyGenerationCall
}

emitterValue struct {
Expand All @@ -36,6 +37,12 @@ type (
emitterUsage struct {
filePath string
event string
unitId string
}

proxyGenerationCall struct {
filepath string
emitters []EmitterSubscriberProxyEntry
}
)

Expand All @@ -61,41 +68,46 @@ func (p Pubsub) Transform(result *core.CompilationResult, deps *core.Dependencie
if value.Annotation.Capability.ID == "" {
errs.Append(core.NewCompilerError(value.File, value.Annotation, errors.New("'id' is required")))
}
resource := core.PubSub{
Path: spec.DefinedIn,
Name: value.Annotation.Capability.ID,
}
p.emitters[spec] = &emitterValue{
Resource: &resource,
if _, ok := p.emitters[spec]; !ok {
resource := core.PubSub{
Path: spec.DefinedIn,
Name: value.Annotation.Capability.ID,
}
p.emitters[spec] = &emitterValue{
Resource: &resource,
}
}
}

err := p.findProxiesNeeded(unit)
if err != nil {
errs.Append(err)
continue
}

varsByFile := vars.SplitByFile()
if len(vars) == 0 {
continue
}

err := p.rewriteEmitters(unit, varsByFile)
err = p.rewriteEmitters(unit, varsByFile)
if err != nil {
errs.Append(err)
}
}

for _, v := range p.emitters {
p.result.Add(v.Resource)
}

for _, res := range result.Resources() {
unit, ok := res.(*core.ExecutionUnit)
if !ok {
continue
}

err := p.processFiles(unit)
for _, call := range p.proxyGenerationCalls {
err := p.generateProxies(call.filepath, call.emitters)
if err != nil {
errs.Append(err)
}
}

for _, v := range p.emitters {
p.result.Add(v.Resource)
}

err := p.generateEmitterDefinitions()
errs.Append(err)

Expand Down Expand Up @@ -150,7 +162,7 @@ func (p *Pubsub) rewriteFileEmitters(f *core.SourceFile, vars VarDeclarations) e
return errs.ErrOrNil()
}

func (p *Pubsub) processFiles(unit *core.ExecutionUnit) error {
func (p *Pubsub) findProxiesNeeded(unit *core.ExecutionUnit) error {
var errs multierr.Error
for _, f := range unit.Files() {
var emitters []EmitterSubscriberProxyEntry
Expand All @@ -161,11 +173,12 @@ func (p *Pubsub) processFiles(unit *core.ExecutionUnit) error {
if !ok {
continue
}

pEvents := p.findPublisherTopics(js, spec)
if len(pEvents) > 0 {
for _, event := range pEvents {
value.Resource.AddPublisher(event, unit.Key())
value.Publishers = append(value.Publishers, emitterUsage{filePath: f.Path(), event: event})
value.Publishers = append(value.Publishers, emitterUsage{filePath: f.Path(), event: event, unitId: unit.Name})
log.Debugf("Adding publisher to '%s'", event)
}
p.deps.Add(unit.Key(), value.Resource.Key())
Expand All @@ -175,7 +188,7 @@ func (p *Pubsub) processFiles(unit *core.ExecutionUnit) error {
if len(sEvents) > 0 {
for _, event := range sEvents {
value.Resource.AddSubscriber(event, unit.Key())
value.Subscribers = append(value.Subscribers, emitterUsage{filePath: f.Path(), event: event})
value.Subscribers = append(value.Subscribers, emitterUsage{filePath: f.Path(), event: event, unitId: unit.Name})
log.Debugf("Adding subscriber to '%s'", event)
}
p.deps.Add(value.Resource.Key(), unit.Key())
Expand Down Expand Up @@ -203,9 +216,10 @@ func (p *Pubsub) processFiles(unit *core.ExecutionUnit) error {
}
}

if err := p.generateProxies(f.Path(), emitters); err != nil {
errs.Append(err)
}
p.proxyGenerationCalls = append(p.proxyGenerationCalls, proxyGenerationCall{
filepath: f.Path(),
emitters: emitters,
})
}
return errs.ErrOrNil()
}
Expand All @@ -214,6 +228,7 @@ func (p *Pubsub) generateProxies(filepath string, emitters []EmitterSubscriberPr
if len(emitters) == 0 {
return nil
}

tData := EmitterSubscriberProxy{
Path: filepath,
Entries: emitters,
Expand Down Expand Up @@ -243,6 +258,7 @@ func (p *Pubsub) generateProxies(filepath string, emitters []EmitterSubscriberPr
if !ok {
continue
}

if existing := unit.Get(filepath); existing != nil {
existing := existing.(*core.SourceFile)
if bytes.Contains(existing.Program(), []byte(emitters[0].VarName)) {
Expand Down
7 changes: 6 additions & 1 deletion pkg/lang/javascript/var_finder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package javascript

import (
"fmt"
"path/filepath"
"strings"

"github.com/klothoplatform/klotho/pkg/filter"
Expand Down Expand Up @@ -206,12 +207,16 @@ func (vf *varFinder) parseNode(f *core.SourceFile, annot *core.Annotation) (inte
// findVarName finds the internal name for the given `varSpec` within the given file. Returns "" if the var isn't defined in the file.
func findVarName(f *core.SourceFile, spec VarSpec) string {
log := zap.L().With(logging.FileField(f)).Sugar()
relPath, err := filepath.Rel(filepath.Dir(f.Path()), spec.DefinedIn)
if err != nil {
panic(err)
}

varName := spec.InternalName
if f.Path() != spec.DefinedIn {
filteredImports := FindImportsInFile(f).Filter(
filter.NewSimpleFilter(
IsRelativeImportOfModule(spec.DefinedIn),
IsRelativeImportOfModule(relPath),
predicate.AnyOf(
IsImportOfType(ImportTypeNamespace),
predicate.AllOf(
Expand Down
2 changes: 1 addition & 1 deletion pkg/provider/aws/infra_template.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func (a *AWS) Transform(result *core.CompilationResult, deps *core.Dependencies)
Publishers: event.Publishers,
Subscribers: event.Subscribers,
Path: res.Path,
VarName: res.Name,
Name: res.Name,
EventName: name,
}
data.PubSubs = append(data.PubSubs, ps)
Expand Down
2 changes: 1 addition & 1 deletion pkg/provider/infra_template.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type (
Subscribers []core.ResourceKey
Path string
EventName string
VarName string
Name string
Params config.InfraParams
}

Expand Down

0 comments on commit 4f7f9d1

Please sign in to comment.