Skip to content

Commit

Permalink
Enqueue owner on launchplan terminal state (#118)
Browse files Browse the repository at this point in the history
## Overview
This PR enqueues the owner workflow for evaluation when the launchplan auto refresh cache detects a launchplan in a terminal state.

## Test Plan
Tested locally and on load-test-gcp, found significant improvements on test workflow with 2 tiers of launchplans (1m3s - >12s).

## Rollout Plan (if applicable)
This can be rolled out immediately.

## Upstream Changes
Unsure?
- [ ] To be upstreamed

## Jira Issue
https://unionai.atlassian.net/browse/EXO-81

## Checklist
* [x] Added tests
* [ ] Ran a deploy dry run and shared the terraform plan
* [ ] Added logging and metrics
* [ ] Updated [dashboards](https://unionai.grafana.net/dashboards) and [alerts](https://unionai.grafana.net/alerting/list)
* [ ] Updated documentation
  • Loading branch information
hamersaw authored Mar 8, 2024
1 parent 42caed0 commit 80303bc
Show file tree
Hide file tree
Showing 11 changed files with 106 additions and 59 deletions.
34 changes: 17 additions & 17 deletions flytepropeller/pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,23 +335,6 @@ func New(ctx context.Context, cfg *config.Config, kubeClientset kubernetes.Inter
return nil, errors.Wrapf(err, "Failed to create Metadata storage")
}

var launchPlanActor launchplan.FlyteAdmin
if cfg.EnableAdminLauncher {
launchPlanActor, err = launchplan.NewAdminLaunchPlanExecutor(ctx, adminClient,
launchplan.GetAdminConfig(), scope.NewSubScope("admin_launcher"), store)
if err != nil {
logger.Errorf(ctx, "failed to create Admin workflow Launcher, err: %v", err.Error())
return nil, err
}

if err := launchPlanActor.Initialize(ctx); err != nil {
logger.Errorf(ctx, "failed to initialize Admin workflow Launcher, err: %v", err.Error())
return nil, err
}
} else {
launchPlanActor = launchplan.NewFailFastLaunchPlanExecutor()
}

logger.Info(ctx, "Setting up event sink and recorder")
eventSink, err := events.ConstructEventSink(ctx, events.GetConfig(ctx), scope.NewSubScope("event_sink"))
if err != nil {
Expand Down Expand Up @@ -431,6 +414,23 @@ func New(ctx context.Context, cfg *config.Config, kubeClientset kubernetes.Inter

controller.levelMonitor = NewResourceLevelMonitor(scope.NewSubScope("collector"), flyteworkflowInformer.Lister())

var launchPlanActor launchplan.FlyteAdmin
if cfg.EnableAdminLauncher {
launchPlanActor, err = launchplan.NewAdminLaunchPlanExecutor(ctx, adminClient, launchplan.GetAdminConfig(),
scope.NewSubScope("admin_launcher"), store, controller.enqueueWorkflowForNodeUpdates)
if err != nil {
logger.Errorf(ctx, "failed to create Admin workflow Launcher, err: %v", err.Error())
return nil, err
}

if err := launchPlanActor.Initialize(ctx); err != nil {
logger.Errorf(ctx, "failed to initialize Admin workflow Launcher, err: %v", err.Error())
return nil, err
}
} else {
launchPlanActor = launchplan.NewFailFastLaunchPlanExecutor()
}

recoveryClient := recovery.NewClient(adminClient)
nodeHandlerFactory, err := factory.NewHandlerFactory(ctx, launchPlanActor, launchPlanActor,
kubeClient, kubeClientset, cacheClient, recoveryClient, &cfg.EventConfig, cfg.ClusterID, signalClient, scope)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,10 @@ func TestWorkflowNodeHandler_StartNode_Launchplan(t *testing.T) {
Version: "v",
ResourceType: core.ResourceType_LAUNCH_PLAN,
}
k8sWorkflowID := types.NamespacedName{
Namespace: "namespace",
Name: "name",
}
mockWfNode := &mocks2.ExecutableWorkflowNode{}
mockWfNode.OnGetLaunchPlanRefID().Return(&v1alpha1.Identifier{
Identifier: lpID,
Expand Down Expand Up @@ -179,6 +183,7 @@ func TestWorkflowNodeHandler_StartNode_Launchplan(t *testing.T) {
}),
mock.MatchedBy(func(o *core.Identifier) bool { return lpID == o }),
mock.MatchedBy(func(o *core.LiteralMap) bool { return o.Literals == nil }),
mock.MatchedBy(func(o string) bool { return o == k8sWorkflowID.String() }),
).Return(nil)

nCtx := createNodeContext(v1alpha1.WorkflowNodePhaseUndefined, mockNode, mockNodeStatus)
Expand All @@ -204,6 +209,7 @@ func TestWorkflowNodeHandler_StartNode_Launchplan(t *testing.T) {
}),
mock.MatchedBy(func(o *core.Identifier) bool { return lpID == o }),
mock.MatchedBy(func(o *core.LiteralMap) bool { return o.Literals == nil }),
mock.MatchedBy(func(o string) bool { return o == k8sWorkflowID.String() }),
).Return(nil)

nCtx := createNodeContextV1(v1alpha1.WorkflowNodePhaseUndefined, mockNode, mockNodeStatus)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ func (l *launchPlanHandler) StartLaunchPlan(ctx context.Context, nCtx interfaces
}
}
}
err = l.launchPlan.Launch(ctx, launchCtx, childID, nCtx.Node().GetWorkflowNode().GetLaunchPlanRefID().Identifier, nodeInputs)
err = l.launchPlan.Launch(ctx, launchCtx, childID, nCtx.Node().GetWorkflowNode().GetLaunchPlanRefID().Identifier,
nodeInputs, nCtx.NodeExecutionMetadata().GetOwnerID().String())
if err != nil {
if launchplan.IsAlreadyExists(err) {
logger.Infof(ctx, "Execution already exists [%s].", childID.Name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/service"
evtErr "github.com/flyteorg/flyte/flytepropeller/events/errors"
"github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
"github.com/flyteorg/flyte/flytestdlib/cache"
stdErr "github.com/flyteorg/flyte/flytestdlib/errors"
"github.com/flyteorg/flyte/flytestdlib/logger"
Expand All @@ -31,16 +32,18 @@ func IsWorkflowTerminated(p core.WorkflowExecution_Phase) bool {

// Executor for Launchplans that executes on a remote FlyteAdmin service (if configured)
type adminLaunchPlanExecutor struct {
adminClient service.AdminServiceClient
cache cache.AutoRefresh
store *storage.DataStore
adminClient service.AdminServiceClient
cache cache.AutoRefresh
store *storage.DataStore
enqueueWorkflow v1alpha1.EnqueueWorkflow
}

type executionCacheItem struct {
core.WorkflowExecutionIdentifier
ExecutionClosure *admin.ExecutionClosure
SyncError error
ExecutionOutputs *core.LiteralMap
ParentWorkflowID v1alpha1.WorkflowID
}

func (e executionCacheItem) IsTerminal() bool {
Expand Down Expand Up @@ -77,8 +80,9 @@ func (a *adminLaunchPlanExecutor) handleLaunchError(ctx context.Context, isRecov
}
}

func (a *adminLaunchPlanExecutor) Launch(ctx context.Context, launchCtx LaunchContext,
executionID *core.WorkflowExecutionIdentifier, launchPlanRef *core.Identifier, inputs *core.LiteralMap) error {
func (a *adminLaunchPlanExecutor) Launch(ctx context.Context, launchCtx LaunchContext, executionID *core.WorkflowExecutionIdentifier,
launchPlanRef *core.Identifier, inputs *core.LiteralMap, parentWorkflowID v1alpha1.WorkflowID) error {

var err error
if launchCtx.RecoveryExecution != nil {
_, err = a.adminClient.RecoverExecution(ctx, &admin.ExecutionRecoverRequest{
Expand Down Expand Up @@ -145,7 +149,7 @@ func (a *adminLaunchPlanExecutor) Launch(ctx context.Context, launchCtx LaunchCo
}
}

_, err = a.cache.GetOrCreate(executionID.String(), executionCacheItem{WorkflowExecutionIdentifier: *executionID})
_, err = a.cache.GetOrCreate(executionID.String(), executionCacheItem{WorkflowExecutionIdentifier: *executionID, ParentWorkflowID: parentWorkflowID})
if err != nil {
logger.Infof(ctx, "Failed to add ExecID [%v] to auto refresh cache", executionID)
}
Expand Down Expand Up @@ -252,6 +256,7 @@ func (a *adminLaunchPlanExecutor) syncItem(ctx context.Context, batch cache.Batc
Item: executionCacheItem{
WorkflowExecutionIdentifier: exec.WorkflowExecutionIdentifier,
SyncError: err,
ParentWorkflowID: exec.ParentWorkflowID,
},
Action: cache.Update,
})
Expand Down Expand Up @@ -282,6 +287,7 @@ func (a *adminLaunchPlanExecutor) syncItem(ctx context.Context, batch cache.Batc
Item: executionCacheItem{
WorkflowExecutionIdentifier: exec.WorkflowExecutionIdentifier,
SyncError: err,
ParentWorkflowID: exec.ParentWorkflowID,
},
Action: cache.Update,
})
Expand All @@ -301,19 +307,30 @@ func (a *adminLaunchPlanExecutor) syncItem(ctx context.Context, batch cache.Batc
WorkflowExecutionIdentifier: exec.WorkflowExecutionIdentifier,
ExecutionClosure: res.Closure,
ExecutionOutputs: outputs,
ParentWorkflowID: exec.ParentWorkflowID,
},
Action: cache.Update,
})
}

// wait until all responses have been processed to enqueue parent workflows. if we do it
// prematurely, there is a chance the parent workflow evaluates before the cache is updated.
for _, itemSyncResponse := range resp {
exec := itemSyncResponse.Item.(executionCacheItem)
if exec.ExecutionClosure != nil && IsWorkflowTerminated(exec.ExecutionClosure.Phase) {
a.enqueueWorkflow(exec.ParentWorkflowID)
}
}

return resp, nil
}

func NewAdminLaunchPlanExecutor(_ context.Context, client service.AdminServiceClient,
cfg *AdminConfig, scope promutils.Scope, store *storage.DataStore) (FlyteAdmin, error) {
cfg *AdminConfig, scope promutils.Scope, store *storage.DataStore, enqueueWorkflow v1alpha1.EnqueueWorkflow) (FlyteAdmin, error) {
exec := &adminLaunchPlanExecutor{
adminClient: client,
store: store,
adminClient: client,
store: store,
enqueueWorkflow: enqueueWorkflow,
}

rateLimiter := &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(cfg.TPS), cfg.Burst)}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestAdminLaunchPlanExecutor_GetStatus(t *testing.T) {

t.Run("happy", func(t *testing.T) {
mockClient := &mocks.AdminServiceClient{}
exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, adminConfig, promutils.NewTestScope(), memStore)
exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, adminConfig, promutils.NewTestScope(), memStore, func(string) {})
assert.NoError(t, err)
mockClient.On("GetExecution",
ctx,
Expand All @@ -69,7 +69,7 @@ func TestAdminLaunchPlanExecutor_GetStatus(t *testing.T) {
mock.MatchedBy(func(o *admin.WorkflowExecutionGetRequest) bool { return true }),
).Return(nil, status.Error(codes.NotFound, ""))

exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, adminConfig, promutils.NewTestScope(), memStore)
exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, adminConfig, promutils.NewTestScope(), memStore, func(string) {})
assert.NoError(t, err)

assert.NoError(t, exec.Initialize(ctx))
Expand All @@ -88,6 +88,7 @@ func TestAdminLaunchPlanExecutor_GetStatus(t *testing.T) {
id,
&core.Identifier{},
nil,
"",
)
assert.NoError(t, err)

Expand Down Expand Up @@ -115,7 +116,7 @@ func TestAdminLaunchPlanExecutor_GetStatus(t *testing.T) {
mock.MatchedBy(func(o *admin.WorkflowExecutionGetRequest) bool { return true }),
).Return(nil, status.Error(codes.Canceled, ""))

exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, adminConfig, promutils.NewTestScope(), memStore)
exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, adminConfig, promutils.NewTestScope(), memStore, func(string) {})
assert.NoError(t, err)

assert.NoError(t, exec.Initialize(ctx))
Expand All @@ -134,6 +135,7 @@ func TestAdminLaunchPlanExecutor_GetStatus(t *testing.T) {
id,
&core.Identifier{},
nil,
"",
)
assert.NoError(t, err)

Expand Down Expand Up @@ -163,7 +165,7 @@ func TestAdminLaunchPlanExecutor_Launch(t *testing.T) {
t.Run("happy", func(t *testing.T) {

mockClient := &mocks.AdminServiceClient{}
exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, adminConfig, promutils.NewTestScope(), memStore)
exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, adminConfig, promutils.NewTestScope(), memStore, func(string) {})
mockClient.On("CreateExecution",
ctx,
mock.MatchedBy(func(o *admin.ExecutionCreateRequest) bool {
Expand All @@ -186,6 +188,7 @@ func TestAdminLaunchPlanExecutor_Launch(t *testing.T) {
id,
&core.Identifier{},
nil,
"",
)
assert.NoError(t, err)
})
Expand All @@ -201,7 +204,7 @@ func TestAdminLaunchPlanExecutor_Launch(t *testing.T) {
Name: "orig",
},
}
exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, adminConfig, promutils.NewTestScope(), memStore)
exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, adminConfig, promutils.NewTestScope(), memStore, func(string) {})
mockClient.On("RecoverExecution",
ctx,
mock.MatchedBy(func(o *admin.ExecutionRecoverRequest) bool {
Expand All @@ -222,6 +225,7 @@ func TestAdminLaunchPlanExecutor_Launch(t *testing.T) {
id,
&core.Identifier{},
nil,
"",
)
assert.NoError(t, err)
})
Expand All @@ -237,7 +241,7 @@ func TestAdminLaunchPlanExecutor_Launch(t *testing.T) {
Name: "orig",
},
}
exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, adminConfig, promutils.NewTestScope(), memStore)
exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, adminConfig, promutils.NewTestScope(), memStore, func(string) {})
assert.NoError(t, err)

recoveryErr := status.Error(codes.NotFound, "foo")
Expand Down Expand Up @@ -271,6 +275,7 @@ func TestAdminLaunchPlanExecutor_Launch(t *testing.T) {
id,
&core.Identifier{},
nil,
"",
)
assert.NoError(t, err)
assert.True(t, createCalled)
Expand All @@ -279,7 +284,7 @@ func TestAdminLaunchPlanExecutor_Launch(t *testing.T) {
t.Run("notFound", func(t *testing.T) {

mockClient := &mocks.AdminServiceClient{}
exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, adminConfig, promutils.NewTestScope(), memStore)
exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, adminConfig, promutils.NewTestScope(), memStore, func(string) {})
mockClient.On("CreateExecution",
ctx,
mock.MatchedBy(func(o *admin.ExecutionCreateRequest) bool { return true }),
Expand All @@ -299,6 +304,7 @@ func TestAdminLaunchPlanExecutor_Launch(t *testing.T) {
id,
&core.Identifier{},
nil,
"",
)
assert.Error(t, err)
assert.True(t, IsAlreadyExists(err))
Expand All @@ -307,7 +313,7 @@ func TestAdminLaunchPlanExecutor_Launch(t *testing.T) {
t.Run("other", func(t *testing.T) {

mockClient := &mocks.AdminServiceClient{}
exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, adminConfig, promutils.NewTestScope(), memStore)
exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, adminConfig, promutils.NewTestScope(), memStore, func(string) {})
mockClient.On("CreateExecution",
ctx,
mock.MatchedBy(func(o *admin.ExecutionCreateRequest) bool { return true }),
Expand All @@ -327,6 +333,7 @@ func TestAdminLaunchPlanExecutor_Launch(t *testing.T) {
id,
&core.Identifier{},
nil,
"",
)
assert.Error(t, err)
assert.False(t, IsAlreadyExists(err))
Expand All @@ -349,7 +356,7 @@ func TestAdminLaunchPlanExecutor_Kill(t *testing.T) {
t.Run("happy", func(t *testing.T) {

mockClient := &mocks.AdminServiceClient{}
exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, adminConfig, promutils.NewTestScope(), memStore)
exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, adminConfig, promutils.NewTestScope(), memStore, func(string) {})
mockClient.On("TerminateExecution",
ctx,
mock.MatchedBy(func(o *admin.ExecutionTerminateRequest) bool { return o.Id == id && o.Cause == reason }),
Expand All @@ -362,7 +369,7 @@ func TestAdminLaunchPlanExecutor_Kill(t *testing.T) {
t.Run("notFound", func(t *testing.T) {

mockClient := &mocks.AdminServiceClient{}
exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, adminConfig, promutils.NewTestScope(), memStore)
exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, adminConfig, promutils.NewTestScope(), memStore, func(string) {})
mockClient.On("TerminateExecution",
ctx,
mock.MatchedBy(func(o *admin.ExecutionTerminateRequest) bool { return o.Id == id && o.Cause == reason }),
Expand All @@ -375,7 +382,7 @@ func TestAdminLaunchPlanExecutor_Kill(t *testing.T) {
t.Run("other", func(t *testing.T) {

mockClient := &mocks.AdminServiceClient{}
exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, adminConfig, promutils.NewTestScope(), memStore)
exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, adminConfig, promutils.NewTestScope(), memStore, func(string) {})
mockClient.On("TerminateExecution",
ctx,
mock.MatchedBy(func(o *admin.ExecutionTerminateRequest) bool { return o.Id == id && o.Cause == reason }),
Expand Down Expand Up @@ -403,7 +410,7 @@ func TestNewAdminLaunchPlanExecutor_GetLaunchPlan(t *testing.T) {

t.Run("launch plan found", func(t *testing.T) {
mockClient := &mocks.AdminServiceClient{}
exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, adminConfig, promutils.NewTestScope(), memStore)
exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, adminConfig, promutils.NewTestScope(), memStore, func(string) {})
assert.NoError(t, err)
mockClient.OnGetLaunchPlanMatch(
ctx,
Expand All @@ -416,7 +423,7 @@ func TestNewAdminLaunchPlanExecutor_GetLaunchPlan(t *testing.T) {

t.Run("launch plan not found", func(t *testing.T) {
mockClient := &mocks.AdminServiceClient{}
exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, adminConfig, promutils.NewTestScope(), memStore)
exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, adminConfig, promutils.NewTestScope(), memStore, func(string) {})
assert.NoError(t, err)
mockClient.OnGetLaunchPlanMatch(
ctx,
Expand Down Expand Up @@ -559,7 +566,7 @@ func TestAdminLaunchPlanExecutorScenarios(t *testing.T) {
ComposedProtobufStore: pbStore,
ReferenceConstructor: &storageMocks.ReferenceConstructor{},
}
exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, adminConfig, promutils.NewTestScope(), storageClient)
exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, adminConfig, promutils.NewTestScope(), storageClient, func(string) {})
assert.NoError(t, err)

iwMock := &mocks2.ItemWrapper{}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
)

//go:generate mockery -all -case=underscore
Expand Down Expand Up @@ -36,7 +37,8 @@ type LaunchContext struct {
// Executor interface to be implemented by the remote system that can allow workflow launching capabilities
type Executor interface {
// Launch start an execution of a launchplan
Launch(ctx context.Context, launchCtx LaunchContext, executionID *core.WorkflowExecutionIdentifier, launchPlanRef *core.Identifier, inputs *core.LiteralMap) error
Launch(ctx context.Context, launchCtx LaunchContext, executionID *core.WorkflowExecutionIdentifier,
launchPlanRef *core.Identifier, inputs *core.LiteralMap, parentWorkflowID v1alpha1.WorkflowID) error

// GetStatus retrieves status of a LaunchPlan execution
GetStatus(ctx context.Context, executionID *core.WorkflowExecutionIdentifier) (*admin.ExecutionClosure, *core.LiteralMap, error)
Expand Down
Loading

0 comments on commit 80303bc

Please sign in to comment.