Skip to content

Commit

Permalink
fix: Update Pipeline topics when writable pipeline changed
Browse files Browse the repository at this point in the history
  • Loading branch information
Leonard Goodell committed Oct 24, 2022
1 parent 33f1d54 commit f911664
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 0 deletions.
2 changes: 2 additions & 0 deletions internal/app/configupdates.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,9 @@ func (processor *ConfigUpdateProcessor) processConfigChangedPipeline() {

// Update the pipelines with their new transforms
for _, pipeline := range pipelines {
// TODO: Look at better way to apply pipeline updates
sdk.runtime.SetFunctionsPipelineTransforms(pipeline.Id, pipeline.Transforms)
sdk.runtime.SetFunctionsPipelineTopics(pipeline.Id, pipeline.Topics)
}

sdk.LoggingClient().Info("Configurable Pipeline successfully reloaded from new configuration")
Expand Down
14 changes: 14 additions & 0 deletions internal/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,20 @@ func (gr *GolangRuntime) SetFunctionsPipelineTransforms(id string, transforms []
}
}

// SetFunctionsPipelineTopics sets the topics for an existing function pipeline.
// Non-existent pipelines are ignored
func (gr *GolangRuntime) SetFunctionsPipelineTopics(id string, topics []string) {
pipeline := gr.pipelines[id]
if pipeline != nil {
gr.isBusyCopying.Lock()
pipeline.Topics = topics
gr.isBusyCopying.Unlock()
gr.lc.Infof("Topics set for `%s` pipeline", id)
} else {
gr.lc.Warnf("Unable to set topica for `%s` pipeline: Pipeline not found", id)
}
}

// ClearAllFunctionsPipelineTransforms clears the transforms for all existing function pipelines.
func (gr *GolangRuntime) ClearAllFunctionsPipelineTransforms() {
gr.isBusyCopying.Lock()
Expand Down

0 comments on commit f911664

Please sign in to comment.