Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Version graph fixes #3287

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
197 changes: 146 additions & 51 deletions service/matching/version_graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
package matching

import (
"bytes"
"encoding/binary"
"fmt"

Expand Down Expand Up @@ -109,23 +110,26 @@ func depthLimiter(g *persistence.VersioningData, maxDepth int, noMutate bool) *w
// See the API docs for more detail. In short, the graph looks like one long line of default versions, each of which
// is incompatible with the previous, optionally with branching compatibility branches. Like so:
//
// ─┬─1.0───2.0─┬─3.0───4.0
// │ ├─3.1
// │ └─3.2
// ├─1.1
// ├─1.2
// └─1.3
// ─┬─1.0───2.0─┬─3.0───4.0
// │ ├─3.1
// │ └─3.2
// ├─1.1
// ├─1.2
// └─1.3
//
// In the above graph, 4.0 is the current default, and [1.3, 3.2] is the set of current compatible leaves. Links
// going left are incompatible relationships, and links going up are compatible relationships.
//
// A request may:
// 1. Add a new version to the graph, as a default version
// 2. Add a new version to the graph, compatible with some existing version.
// 3. Add a new version to the graph, compatible with some existing version and as the new default.
// 4. Unset a version as a default. It will be dropped and its previous incompatible version becomes default.
// 5. Unset a version as a compatible. It will be dropped and its previous compatible version will become the new
// compatible leaf for that branch.
// 1. Add a new version to the graph, as a default version
// 2. Add a new version to the graph, compatible with some existing version.
// 3. Add a new version to the graph, compatible with some existing version and as the new default.
// 4. Reorder an existing version (and it's whole compatible branch). We allow moving a node in the incompatible line
// to the front (along with its entire compatible branch), as long as the user has targeted the most recent
// version in that compatible branch.
//
// Deletions are not allowed, as it leads to confusion about what to do with open workflows who were operating on that
// version. It's better to simply add a new version (possibly with no associated workers) instead.
func UpdateVersionsGraph(existingData *persistence.VersioningData, req *workflowservice.UpdateWorkerBuildIdOrderingRequest, maxSize int) error {
if req.GetVersionId().GetWorkerBuildId() == "" {
return serviceerror.NewInvalidArgument(
Expand All @@ -151,10 +155,23 @@ func updateImpl(existingData *persistence.VersioningData, req *workflowservice.U
// It does not make sense to introduce a version which is the new overall default, but is somehow also
// supposed to be compatible with some existing version, as that would necessarily imply that the newly
// added version is somehow both compatible and incompatible with the same target version.
return serviceerror.NewInvalidArgument("adding a new default version which is compatible " +
return serviceerror.NewInvalidArgument("adding a new default version which is compatible" +
" with any version other than the existing default is not allowed.")
}
if curDefault != nil {
if req.GetVersionId() == curDefault.GetVersion() {
// User is setting current default as... current default. Do nothing.
return nil
}

didReorder, err := reorderExistingNodeIfNeeded(existingData, req.VersionId)
if err != nil {
return err
}
if didReorder {
return nil
}

// If the current default is going to be the previous compat version with the one we're adding,
// then we need to skip over it when setting the previous *incompatible* version.
if isCompatWithCurDefault {
Expand Down Expand Up @@ -197,37 +214,14 @@ func updateImpl(existingData *persistence.VersioningData, req *workflowservice.U
fmt.Sprintf("previous compatible version %v not found", req.GetPreviousCompatible()))
}
} else {
// Check if the version is already a default, and remove it from being one if it is.
curDefault := existingData.GetCurrentDefault()
if curDefault.GetVersion().Equal(req.GetVersionId()) {
existingData.CurrentDefault = nil
if curDefault.GetPreviousCompatible() != nil {
existingData.CurrentDefault = curDefault.GetPreviousCompatible()
} else if curDefault.GetPreviousIncompatible() != nil {
existingData.CurrentDefault = curDefault.GetPreviousIncompatible()
}
return nil
}
// Check if it's a compatible leaf, and remove it from being one if it is.
for i, def := range existingData.GetCompatibleLeaves() {
if def.GetVersion().Equal(req.GetVersionId()) {
existingData.CompatibleLeaves =
append(existingData.CompatibleLeaves[:i], existingData.CompatibleLeaves[i+1:]...)
if def.GetPreviousCompatible() != nil {
existingData.CompatibleLeaves =
append(existingData.CompatibleLeaves, def.GetPreviousCompatible())
}
return nil
}
}
return serviceerror.NewInvalidArgument(
"requests to update build id ordering cannot create a new non-default version with no links")
}
}
return nil
}

// Finds the node that the provided version should point at, given that it says it's compatible with the provided
// Finds the node that some new version should point at, given that it says it's compatible with the provided
// version. Note that this does not necessary mean *that* node. If the version being targeted as compatible has nodes
// which already point at it as their previous compatible version, that chain will be followed out to the leaf, which
// will be returned.
Expand All @@ -238,37 +232,138 @@ func findCompatibleNode(
// First search down from all existing compatible leaves, as if any of those chains point at the desired version,
// we will need to return that leaf.
for ix, node := range existingData.GetCompatibleLeaves() {
if node.GetVersion().Equal(versionId) {
return node, ix
}
if findInNode(node, versionId) != nil {
if found, _, _ := findInNode(node, versionId, searchModeBoth); found != nil {
return node, ix
}
}
// Otherwise, this must be targeting some version in the default/incompatible chain, and it will become a new leaf
curDefault := existingData.GetCurrentDefault()
if curDefault.GetVersion().Equal(versionId) {
return curDefault, -1
}
if nn := findInNode(curDefault, versionId); nn != nil {
if nn, _, _ := findInNode(curDefault, versionId, searchModeBoth); nn != nil {
return nn, -1
}

return nil, -1
}

// The provided node wants to become the default, and thus move its compatible branch (which may be just itself) to the
// front of the incompatible list. If the node is the head of its compatible branch, this is acceptable, otherwise it is
// an error.
func reorderExistingNodeIfNeeded(existingData *persistence.VersioningData, versionId *taskqueuepb.VersionId) (bool, error) {
// First find if this node is inside a compatible branch. It could never be in more than one, since we
// do not allow forks.
for _, node := range existingData.GetCompatibleLeaves() {
if node.GetVersion().Equal(versionId) {
// We've got to find the node in the compatible branch which is pointed to by some node in the incompatible
// branch
entireCompatBranch := make(map[string]struct{})
curCompatNode := node
for curCompatNode != nil {
entireCompatBranch[curCompatNode.GetVersion().GetWorkerBuildId()] = struct{}{}
curCompatNode = curCompatNode.GetPreviousCompatible()
}
found, nextIncompat, _ := findAnyInNode(existingData.GetCurrentDefault(), entireCompatBranch, searchModeIncompat)
if found != nil && nextIncompat != nil {
nextIncompat.PreviousIncompatible = found.PreviousIncompatible
found.PreviousIncompatible = nil
node.PreviousIncompatible = nextIncompat
existingData.CurrentDefault = node
return true, nil
}
return false, nil
}
if found, _, _ := findInNode(node, versionId, searchModeCompat); found != nil {
return false, serviceerror.NewInvalidArgument("A node which is already inside a compatible branch, but " +
"is not the leaf node of that branch, cannot be made default")
}
}

// Now check in the incompatible branch, and if this node is found in it, shuffle its branch to the front.
found, nextIncompat, _ := findInNode(existingData.GetCurrentDefault(), versionId, searchModeIncompat)
if found != nil && nextIncompat != nil {
nextIncompat.PreviousIncompatible = found.PreviousIncompatible
found.PreviousIncompatible = existingData.GetCurrentDefault()
existingData.CurrentDefault = found
return true, nil
}

return false, nil
}

type searchMode int

const (
searchModeBoth searchMode = iota
searchModeCompat
searchModeIncompat
)

func findInNode(
node *taskqueuepb.VersionIdNode,
versionId *taskqueuepb.VersionId,
) *taskqueuepb.VersionIdNode {
if node.GetVersion().Equal(versionId) {
return node
mode searchMode,
) (*taskqueuepb.VersionIdNode, *taskqueuepb.VersionIdNode, *taskqueuepb.VersionIdNode) {
s := make(map[string]struct{})
s[versionId.GetWorkerBuildId()] = struct{}{}
return _findInNode(node, s, mode, nil, nil)
}

func findAnyInNode(
node *taskqueuepb.VersionIdNode,
versionIds map[string]struct{},
mode searchMode,
) (*taskqueuepb.VersionIdNode, *taskqueuepb.VersionIdNode, *taskqueuepb.VersionIdNode) {
return _findInNode(node, versionIds, mode, nil, nil)
}

func _findInNode(
node *taskqueuepb.VersionIdNode,
versionIds map[string]struct{},
mode searchMode,
nextIncompat *taskqueuepb.VersionIdNode,
nextCompat *taskqueuepb.VersionIdNode,
) (*taskqueuepb.VersionIdNode, *taskqueuepb.VersionIdNode, *taskqueuepb.VersionIdNode) {
if _, ok := versionIds[node.GetVersion().GetWorkerBuildId()]; ok {
return node, nextIncompat, nextCompat
}
if (mode == searchModeBoth || mode == searchModeCompat) && node.GetPreviousCompatible() != nil {
return _findInNode(node.GetPreviousCompatible(), versionIds, mode, nextIncompat, node)
}
if (mode == searchModeBoth || mode == searchModeIncompat) && node.GetPreviousIncompatible() != nil {
return _findInNode(node.GetPreviousIncompatible(), versionIds, mode, node, nextCompat)
}
return nil, nextIncompat, nextCompat
}

// For graph visualization purposes while debugging
//
//lint:ignore U1000 Only needed for debugging purposes
func toDot(data *persistence.VersioningData) string {
var buf bytes.Buffer
buf.WriteString("digraph {\n")
buf.WriteString("// Default\n")
node := data.GetCurrentDefault()
buf.WriteString(fmt.Sprintf(" %s [label=\"%s\"];\n", node.GetVersion().GetWorkerBuildId(), node.GetVersion().GetWorkerBuildId()))
buf.WriteString("// Compatible leafs\n")
for _, node := range data.GetCompatibleLeaves() {
buf.WriteString(fmt.Sprintf(" %s [label=\"%s\"];\n", node.GetVersion().GetWorkerBuildId(), node.GetVersion().GetWorkerBuildId()))
}

for _, node := range data.GetCompatibleLeaves() {
writeEdgesForNode(&buf, node)
}
writeEdgesForNode(&buf, data.GetCurrentDefault())

buf.WriteString("}\n")
return buf.String()
}

func writeEdgesForNode(buf *bytes.Buffer, node *taskqueuepb.VersionIdNode) {
if node.GetPreviousCompatible() != nil {
return findInNode(node.GetPreviousCompatible(), versionId)
buf.WriteString(fmt.Sprintf(" %s -> %s [label=\"compat\"];\n", node.GetVersion().GetWorkerBuildId(), node.GetPreviousCompatible().GetVersion().GetWorkerBuildId()))
writeEdgesForNode(buf, node.GetPreviousCompatible())
}
if node.GetPreviousIncompatible() != nil {
return findInNode(node.GetPreviousIncompatible(), versionId)
buf.WriteString(fmt.Sprintf(" %s -> %s [label=\"incompat\"];\n", node.GetVersion().GetWorkerBuildId(), node.GetPreviousIncompatible().GetVersion().GetWorkerBuildId()))
writeEdgesForNode(buf, node.GetPreviousIncompatible())
}
return nil
}
Loading