-
Notifications
You must be signed in to change notification settings - Fork 40
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
making pubsub idempotent and other fixes #91
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,7 +24,8 @@ type ( | |
result *core.CompilationResult | ||
deps *core.Dependencies | ||
|
||
emitters map[VarSpec]*emitterValue | ||
emitters map[VarSpec]*emitterValue | ||
proxyGenerationCalls []proxyGenerationCall | ||
} | ||
|
||
emitterValue struct { | ||
|
@@ -36,6 +37,12 @@ type ( | |
emitterUsage struct { | ||
filePath string | ||
event string | ||
unitId string | ||
} | ||
|
||
proxyGenerationCall struct { | ||
filepath string | ||
emitters []EmitterSubscriberProxyEntry | ||
} | ||
) | ||
|
||
|
@@ -61,41 +68,45 @@ func (p Pubsub) Transform(result *core.CompilationResult, deps *core.Dependencie | |
if value.Annotation.Capability.ID == "" { | ||
errs.Append(core.NewCompilerError(value.File, value.Annotation, errors.New("'id' is required"))) | ||
} | ||
resource := core.PubSub{ | ||
Path: spec.DefinedIn, | ||
Name: value.Annotation.Capability.ID, | ||
} | ||
p.emitters[spec] = &emitterValue{ | ||
Resource: &resource, | ||
if _, ok := p.emitters[spec]; !ok { | ||
resource := core.PubSub{ | ||
Path: spec.DefinedIn, | ||
Name: value.Annotation.Capability.ID, | ||
} | ||
p.emitters[spec] = &emitterValue{ | ||
Resource: &resource, | ||
} | ||
} | ||
} | ||
|
||
err := p.processFiles(unit) | ||
if err != nil { | ||
errs.Append(err) | ||
gordon-klotho marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
varsByFile := vars.SplitByFile() | ||
if len(vars) == 0 { | ||
continue | ||
} | ||
|
||
err := p.rewriteEmitters(unit, varsByFile) | ||
err = p.rewriteEmitters(unit, varsByFile) | ||
if err != nil { | ||
errs.Append(err) | ||
} | ||
} | ||
|
||
for _, v := range p.emitters { | ||
p.result.Add(v.Resource) | ||
} | ||
|
||
for _, res := range result.Resources() { | ||
unit, ok := res.(*core.ExecutionUnit) | ||
if !ok { | ||
continue | ||
} | ||
|
||
err := p.processFiles(unit) | ||
for _, call := range p.proxyGenerationCalls { | ||
err := p.generateProxies(call.filepath, call.emitters) | ||
if err != nil { | ||
errs.Append(err) | ||
} | ||
} | ||
|
||
for _, v := range p.emitters { | ||
p.result.Add(v.Resource) | ||
} | ||
|
||
err := p.generateEmitterDefinitions() | ||
errs.Append(err) | ||
|
||
|
@@ -161,11 +172,12 @@ func (p *Pubsub) processFiles(unit *core.ExecutionUnit) error { | |
if !ok { | ||
continue | ||
} | ||
|
||
pEvents := p.findPublisherTopics(js, spec) | ||
if len(pEvents) > 0 { | ||
for _, event := range pEvents { | ||
value.Resource.AddPublisher(event, unit.Key()) | ||
value.Publishers = append(value.Publishers, emitterUsage{filePath: f.Path(), event: event}) | ||
value.Publishers = append(value.Publishers, emitterUsage{filePath: f.Path(), event: event, unitId: unit.Name}) | ||
log.Debugf("Adding publisher to '%s'", event) | ||
} | ||
p.deps.Add(unit.Key(), value.Resource.Key()) | ||
|
@@ -175,7 +187,7 @@ func (p *Pubsub) processFiles(unit *core.ExecutionUnit) error { | |
if len(sEvents) > 0 { | ||
for _, event := range sEvents { | ||
value.Resource.AddSubscriber(event, unit.Key()) | ||
value.Subscribers = append(value.Subscribers, emitterUsage{filePath: f.Path(), event: event}) | ||
value.Subscribers = append(value.Subscribers, emitterUsage{filePath: f.Path(), event: event, unitId: unit.Name}) | ||
log.Debugf("Adding subscriber to '%s'", event) | ||
} | ||
p.deps.Add(value.Resource.Key(), unit.Key()) | ||
|
@@ -203,9 +215,10 @@ func (p *Pubsub) processFiles(unit *core.ExecutionUnit) error { | |
} | ||
} | ||
|
||
if err := p.generateProxies(f.Path(), emitters); err != nil { | ||
errs.Append(err) | ||
} | ||
p.proxyGenerationCalls = append(p.proxyGenerationCalls, proxyGenerationCall{ | ||
gordon-klotho marked this conversation as resolved.
Show resolved
Hide resolved
|
||
filepath: f.Path(), | ||
emitters: emitters, | ||
}) | ||
} | ||
return errs.ErrOrNil() | ||
} | ||
|
@@ -214,6 +227,7 @@ func (p *Pubsub) generateProxies(filepath string, emitters []EmitterSubscriberPr | |
if len(emitters) == 0 { | ||
return nil | ||
} | ||
|
||
tData := EmitterSubscriberProxy{ | ||
Path: filepath, | ||
Entries: emitters, | ||
|
@@ -243,6 +257,7 @@ func (p *Pubsub) generateProxies(filepath string, emitters []EmitterSubscriberPr | |
if !ok { | ||
continue | ||
} | ||
|
||
if existing := unit.Get(filepath); existing != nil { | ||
existing := existing.(*core.SourceFile) | ||
if bytes.Contains(existing.Program(), []byte(emitters[0].VarName)) { | ||
|
@@ -360,7 +375,14 @@ func (p *Pubsub) generateEmitterDefinitions() (err error) { | |
} | ||
|
||
func findTopics(f *core.SourceFile, spec VarSpec, query string, methodName string) (topics []string) { | ||
varName := findVarName(f, spec) | ||
relPath, _ := filepath.Rel(filepath.Dir(f.Path()), spec.DefinedIn) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Error should be checked, even if it's to panic. |
||
newSpec := VarSpec{ | ||
DefinedIn: relPath, | ||
InternalName: spec.InternalName, | ||
VarName: spec.VarName, | ||
} | ||
|
||
varName := findVarName(f, newSpec) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This seems like a shortcoming of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. will move this logic in there and panic on the relPAth error. Otherwise we need a major refactor of finding every method that uses findVarName There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Realistically, relPath only errors when one of the paths is absolute, which we should never have. The There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's been a while, so I forget exactly what all the inputs to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It is indeed language specific, but the var finder is in the javascript package so I'd expect it to be. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🤦♂️ So it is. For some reason I thought it was in a language-agnostic package (and I didn't even think to double check that assumption in the path). Sorry — carry on! :) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yeah i just realized, wouldnt this cause issues for golang? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. oh wait this is js specific |
||
if varName == "" { | ||
return | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why remove the path? Is
varName
not the emitter's variable name anymore (ie, is it the ID)? Didn't this need to match with something in the runtime as well?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so the path isnt in the runtime and i believe that varName is actually the ID and not truly the varName. Let me double check because if it is the varname we could have conflicts. I can also rename the variable to pubsubID so it makes more sense