Skip to content

Commit

Permalink
Add workflow and task sending (#9)
Browse files Browse the repository at this point in the history
  • Loading branch information
wild-endeavor authored Sep 7, 2023
1 parent a2d4735 commit 462343a
Show file tree
Hide file tree
Showing 7 changed files with 160 additions and 56 deletions.
4 changes: 4 additions & 0 deletions flyteadmin/pkg/artifacts/artifact_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ func NewArtifactConnection(_ context.Context, cfg *Config, opts ...grpc.DialOpti
}

func InitializeArtifactClient(ctx context.Context, cfg *Config, opts ...grpc.DialOption) artifact.ArtifactRegistryClient {
if cfg == nil {
logger.Warningf(ctx, "Artifact config is not set, skipping creation of client...")
return nil
}
conn, err := NewArtifactConnection(ctx, cfg, opts...)
if err != nil {
logger.Panicf(ctx, "failed to initialize Artifact connection. Err: %s", err.Error())
Expand Down
61 changes: 61 additions & 0 deletions flyteadmin/pkg/artifacts/registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package artifacts

import (
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/artifact"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flytestdlib/logger"
"google.golang.org/grpc"

"context"
)

// ArtifactRegistry contains a client to talk to an Artifact service and has helper methods
type ArtifactRegistry struct {
client artifact.ArtifactRegistryClient
}

func (a *ArtifactRegistry) RegisterArtifactProducer(ctx context.Context, id *core.Identifier, ti core.TypedInterface) {
if a.client == nil {
logger.Debugf(ctx, "Artifact client not configured, skipping registration for task [%+v]", id)
return
}
ap := &artifact.ArtifactProducer{
EntityId: id,
Outputs: ti.Outputs,
}
_, err := a.client.RegisterProducer(ctx, &artifact.RegisterProducerRequest{
Producers: []*artifact.ArtifactProducer{ap},
})
if err != nil {
logger.Errorf(ctx, "Failed to register artifact producer for task [%+v] with err: %v", id, err)
}
logger.Debugf(ctx, "Registered artifact producer [%+v]", id)
}

func (a *ArtifactRegistry) RegisterArtifactConsumer(ctx context.Context, id *core.Identifier, pm core.ParameterMap) {
if a.client == nil {
logger.Debugf(ctx, "Artifact client not configured, skipping registration for consumer [%+v]", id)
return
}
ac := &artifact.ArtifactConsumer{
EntityId: id,
Inputs: &pm,
}
_, err := a.client.RegisterConsumer(ctx, &artifact.RegisterConsumerRequest{
Consumers: []*artifact.ArtifactConsumer{ac},
})
if err != nil {
logger.Errorf(ctx, "Failed to register artifact consumer for entity [%+v] with err: %v", id, err)
}
logger.Debugf(ctx, "Registered artifact consumer [%+v]", id)
}

func (a *ArtifactRegistry) GetClient() artifact.ArtifactRegistryClient {
return a.client
}

func NewArtifactRegistry(ctx context.Context, config *Config, opts ...grpc.DialOption) ArtifactRegistry {
return ArtifactRegistry{
client: InitializeArtifactClient(ctx, config, opts...),
}
}
15 changes: 7 additions & 8 deletions flyteadmin/pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package impl
import (
"context"
"fmt"
"github.com/flyteorg/flyteadmin/pkg/artifacts"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event"
"regexp"
"strconv"
Expand Down Expand Up @@ -103,7 +104,7 @@ type ExecutionManager struct {
cloudEventPublisher notificationInterfaces.Publisher
dbEventWriter eventWriter.WorkflowExecutionEventWriter
pluginRegistry *plugins.Registry
artifactClient *artifact.ArtifactRegistryClient
artifactRegistry artifacts.ArtifactRegistry
}

func getExecutionContext(ctx context.Context, id *core.WorkflowExecutionIdentifier) context.Context {
Expand Down Expand Up @@ -523,7 +524,6 @@ func (m *ExecutionManager) launchSingleTaskExecution(
requestSpec.Metadata = &admin.ExecutionMetadata{}
}
requestSpec.Metadata.Principal = getUser(ctx)
//requestSpec.Metadata.ArtifactIds = resolvedArtifactMap

// Get the node execution (if any) that launched this execution
var parentNodeExecutionID uint
Expand Down Expand Up @@ -887,7 +887,6 @@ func (m *ExecutionManager) ResolveParameterMapArtifacts(ctx context.Context, inp
return nil, artifactIDs, nil
}
outputs := map[string]*core.Parameter{}
client := *m.artifactClient

for k, v := range inputs.Parameters {
if inputsForQueryTemplating != nil {
Expand All @@ -902,7 +901,7 @@ func (m *ExecutionManager) ResolveParameterMapArtifacts(ctx context.Context, inp
}
if v.GetArtifactQuery() != nil {
// This case handles when an Artifact query is specified as a default value.
if m.artifactClient == nil {
if m.artifactRegistry.GetClient() == nil {
return nil, nil, errors.NewFlyteAdminErrorf(codes.Internal, "artifact client is not initialized, can't resolve queries")
}
filledInQuery, err := m.fillInTemplateArgs(ctx, *v.GetArtifactQuery(), inputsForQueryTemplating, metadata)
Expand All @@ -915,7 +914,7 @@ func (m *ExecutionManager) ResolveParameterMapArtifacts(ctx context.Context, inp
Details: false,
}

resp, err := client.GetArtifact(ctx, req)
resp, err := m.artifactRegistry.GetClient().GetArtifact(ctx, req)
if err != nil {
return nil, nil, err
}
Expand All @@ -935,7 +934,7 @@ func (m *ExecutionManager) ResolveParameterMapArtifacts(ctx context.Context, inp
},
Details: false,
}
resp, err := client.GetArtifact(ctx, req)
resp, err := m.artifactRegistry.GetClient().GetArtifact(ctx, req)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -1962,7 +1961,7 @@ func NewExecutionManager(db repositoryInterfaces.Repository, pluginRegistry *plu
publisher notificationInterfaces.Publisher, urlData dataInterfaces.RemoteURLInterface,
workflowManager interfaces.WorkflowInterface, namedEntityManager interfaces.NamedEntityInterface,
eventPublisher notificationInterfaces.Publisher, cloudEventPublisher cloudeventInterfaces.Publisher,
eventWriter eventWriter.WorkflowExecutionEventWriter, artifactClient *artifact.ArtifactRegistryClient) interfaces.ExecutionInterface {
eventWriter eventWriter.WorkflowExecutionEventWriter, artifactRegistry artifacts.ArtifactRegistry) interfaces.ExecutionInterface {

queueAllocator := executions.NewQueueAllocator(config, db)
systemMetrics := newExecutionSystemMetrics(systemScope)
Expand Down Expand Up @@ -1996,7 +1995,7 @@ func NewExecutionManager(db repositoryInterfaces.Repository, pluginRegistry *plu
cloudEventPublisher: cloudEventPublisher,
dbEventWriter: eventWriter,
pluginRegistry: pluginRegistry,
artifactClient: artifactClient,
artifactRegistry: artifactRegistry,
}
}

Expand Down
31 changes: 22 additions & 9 deletions flyteadmin/pkg/manager/impl/launch_plan_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package impl
import (
"bytes"
"context"
"github.com/flyteorg/flyteadmin/pkg/artifacts"
"strconv"

"github.com/flyteorg/flytestdlib/contextutils"
Expand Down Expand Up @@ -37,10 +38,11 @@ type launchPlanMetrics struct {
}

type LaunchPlanManager struct {
db repoInterfaces.Repository
config runtimeInterfaces.Configuration
scheduler scheduleInterfaces.EventScheduler
metrics launchPlanMetrics
db repoInterfaces.Repository
config runtimeInterfaces.Configuration
scheduler scheduleInterfaces.EventScheduler
metrics launchPlanMetrics
artifactRegistry artifacts.ArtifactRegistry
}

func getLaunchPlanContext(ctx context.Context, identifier *core.Identifier) context.Context {
Expand Down Expand Up @@ -114,6 +116,15 @@ func (m *LaunchPlanManager) CreateLaunchPlan(
}
m.metrics.SpecSizeBytes.Observe(float64(len(launchPlanModel.Spec)))
m.metrics.ClosureSizeBytes.Observe(float64(len(launchPlanModel.Closure)))
go func() {
ceCtx := context.TODO()
if launchPlan.Spec.DefaultInputs == nil {
logger.Debugf(ceCtx, "Insufficient fields to submit launchplan interface %v", launchPlan.Id)
return
}
m.artifactRegistry.RegisterArtifactConsumer(ceCtx, launchPlan.Id, *launchPlan.Spec.DefaultInputs)
}()

return &admin.LaunchPlanCreateResponse{}, nil
}

Expand Down Expand Up @@ -552,7 +563,8 @@ func NewLaunchPlanManager(
db repoInterfaces.Repository,
config runtimeInterfaces.Configuration,
scheduler scheduleInterfaces.EventScheduler,
scope promutils.Scope) interfaces.LaunchPlanInterface {
scope promutils.Scope,
artifactRegistry artifacts.ArtifactRegistry) interfaces.LaunchPlanInterface {

metrics := launchPlanMetrics{
Scope: scope,
Expand All @@ -562,9 +574,10 @@ func NewLaunchPlanManager(
ClosureSizeBytes: scope.MustNewSummary("closure_size_bytes", "size in bytes of serialized launch plan closure"),
}
return &LaunchPlanManager{
db: db,
config: config,
scheduler: scheduler,
metrics: metrics,
db: db,
config: config,
scheduler: scheduler,
metrics: metrics,
artifactRegistry: artifactRegistry,
}
}
38 changes: 25 additions & 13 deletions flyteadmin/pkg/manager/impl/task_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ import (

"github.com/golang/protobuf/ptypes"

"github.com/flyteorg/flytestdlib/logger"

"github.com/flyteorg/flyteadmin/pkg/artifacts"
"github.com/flyteorg/flyteadmin/pkg/common"
"github.com/flyteorg/flyteadmin/pkg/errors"
"github.com/flyteorg/flyteadmin/pkg/manager/impl/resources"
Expand All @@ -29,6 +28,7 @@ import (
runtimeInterfaces "github.com/flyteorg/flyteadmin/pkg/runtime/interfaces"
workflowengine "github.com/flyteorg/flyteadmin/pkg/workflowengine/interfaces"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flytestdlib/logger"
"google.golang.org/grpc/codes"
)

Expand All @@ -39,11 +39,12 @@ type taskMetrics struct {
}

type TaskManager struct {
db repoInterfaces.Repository
config runtimeInterfaces.Configuration
compiler workflowengine.Compiler
metrics taskMetrics
resourceManager interfaces.ResourceInterface
db repoInterfaces.Repository
config runtimeInterfaces.Configuration
compiler workflowengine.Compiler
metrics taskMetrics
resourceManager interfaces.ResourceInterface
artifactRegistry artifacts.ArtifactRegistry
}

func getTaskContext(ctx context.Context, identifier *core.Identifier) context.Context {
Expand Down Expand Up @@ -133,6 +134,14 @@ func (t *TaskManager) CreateTask(
contextWithRuntimeMeta, common.RuntimeVersionKey, finalizedRequest.Spec.Template.Metadata.Runtime.Version)
t.metrics.Registered.Inc(contextWithRuntimeMeta)
}
go func() {
ceCtx := context.TODO()
if finalizedRequest.Spec.Template.Interface == nil {
logger.Debugf(ceCtx, "Task [%+v] has no interface, skipping registration", finalizedRequest.Id)
return
}
t.artifactRegistry.RegisterArtifactProducer(ceCtx, finalizedRequest.Id, *finalizedRequest.Spec.Template.Interface)
}()

return &admin.TaskCreateResponse{}, nil
}
Expand Down Expand Up @@ -266,18 +275,21 @@ func (t *TaskManager) ListUniqueTaskIdentifiers(ctx context.Context, request adm
func NewTaskManager(
db repoInterfaces.Repository,
config runtimeInterfaces.Configuration, compiler workflowengine.Compiler,
scope promutils.Scope) interfaces.TaskInterface {
scope promutils.Scope,
artifactRegistry artifacts.ArtifactRegistry) interfaces.TaskInterface {

metrics := taskMetrics{
Scope: scope,
ClosureSizeBytes: scope.MustNewSummary("closure_size_bytes", "size in bytes of serialized task closure"),
Registered: labeled.NewCounter("num_registered", "count of registered tasks", scope),
}
resourceManager := resources.NewResourceManager(db, config.ApplicationConfiguration())
return &TaskManager{
db: db,
config: config,
compiler: compiler,
metrics: metrics,
resourceManager: resourceManager,
db: db,
config: config,
compiler: compiler,
metrics: metrics,
resourceManager: resourceManager,
artifactRegistry: artifactRegistry,
}
}
44 changes: 30 additions & 14 deletions flyteadmin/pkg/manager/impl/workflow_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package impl
import (
"bytes"
"context"
"github.com/flyteorg/flyteadmin/pkg/artifacts"
"strconv"
"time"

Expand Down Expand Up @@ -39,12 +40,13 @@ type workflowMetrics struct {
}

type WorkflowManager struct {
db repoInterfaces.Repository
config runtimeInterfaces.Configuration
compiler workflowengineInterfaces.Compiler
storageClient *storage.DataStore
storagePrefix []string
metrics workflowMetrics
db repoInterfaces.Repository
config runtimeInterfaces.Configuration
compiler workflowengineInterfaces.Compiler
storageClient *storage.DataStore
storagePrefix []string
metrics workflowMetrics
artifactRegistry artifacts.ArtifactRegistry
}

func getWorkflowContext(ctx context.Context, identifier *core.Identifier) context.Context {
Expand Down Expand Up @@ -217,6 +219,17 @@ func (w *WorkflowManager) CreateWorkflow(
}
w.metrics.TypedInterfaceSizeBytes.Observe(float64(len(workflowModel.TypedInterface)))

// Send the interface definition to Artifact service, this is so that it can statically pick up one dimension of
// lineage information
go func() {
ceCtx := context.TODO()
if workflowClosure.CompiledWorkflow == nil || workflowClosure.CompiledWorkflow.Primary == nil {
logger.Debugf(ceCtx, "Insufficient fields to submit workflow interface %v", finalizedRequest.Id)
return
}
w.artifactRegistry.RegisterArtifactProducer(ceCtx, finalizedRequest.Id, *workflowClosure.CompiledWorkflow.Primary.Template.Interface)
}()

return &admin.WorkflowCreateResponse{}, nil
}

Expand All @@ -234,7 +247,7 @@ func (w *WorkflowManager) GetWorkflow(ctx context.Context, request admin.ObjectG
return workflow, nil
}

// Returns workflows *without* a populated workflow closure.
// ListWorkflows returns workflows *without* a populated workflow closure.
func (w *WorkflowManager) ListWorkflows(
ctx context.Context, request admin.ResourceListRequest) (*admin.WorkflowList, error) {
// Check required fields
Expand Down Expand Up @@ -350,7 +363,9 @@ func NewWorkflowManager(
compiler workflowengineInterfaces.Compiler,
storageClient *storage.DataStore,
storagePrefix []string,
scope promutils.Scope) interfaces.WorkflowInterface {
scope promutils.Scope,
artifactRegistry artifacts.ArtifactRegistry) interfaces.WorkflowInterface {

metrics := workflowMetrics{
Scope: scope,
CompilationFailures: scope.MustNewCounter(
Expand All @@ -359,11 +374,12 @@ func NewWorkflowManager(
"size in bytes of serialized workflow TypedInterface"),
}
return &WorkflowManager{
db: db,
config: config,
compiler: compiler,
storageClient: storageClient,
storagePrefix: storagePrefix,
metrics: metrics,
db: db,
config: config,
compiler: compiler,
storageClient: storageClient,
storagePrefix: storagePrefix,
metrics: metrics,
artifactRegistry: artifactRegistry,
}
}
Loading

0 comments on commit 462343a

Please sign in to comment.