Skip to content

Commit

Permalink
SVR-468: Batch provision execution namespaces and support delete (#246)
Browse files Browse the repository at this point in the history
## Overview
As outlined [here](https://unionai.atlassian.net/wiki/spaces/ENG/pages/655654917/Serverless+User+Execution+Namespace+Provisioning)

This change updates cluster resource controller to

- correctly record already synced namespaces on every round rather than reapplying the same object in some cases
- parallelizes namespaces processing in batches for speed-up
- adds handling for archived projects to delete their associated namespaces and cache successfully deleted values (also in parallelized batches)
- emits a metric on successful completion

## Test Plan
- I ran this with sandbox locally and bootstrapped 100 active projects and set the batch size to 10. Created a few templates locally and verified via logs that they were cached after application and not reapplied
- I archived a project, verified the namespace was deleted appropriately, then activated the project and observed its namespace was correctly reprovisioned
- I updated one of the cluster resource templates for project-quota and verified namespace quotas were indeed updated
- I created a new project and verified its namespace was provisioned correctly

## Rollout Plan (if applicable)
Will not merge until unionai/cloud#7566 is also ready to go


## Upstream Changes
Should this change be upstreamed to OSS (flyteorg/flyte)? If so, please check this box for auditing. Note, this is the responsibility of each developer. See [this guide](https://unionai.atlassian.net/wiki/spaces/ENG/pages/447610883/Flyte+-+Union+Cloud+Development+Runbook/#When-are-versions-updated%3F).
- [ ] To be upstreamed

## Jira Issue
https://unionai.atlassian.net/browse/SVR-499
https://unionai.atlassian.net/browse/SVR-372

## Checklist
* [ ] Added tests
* [ ] Ran a deploy dry run and shared the terraform plan
* [ ] Added logging and metrics
* [ ] Updated [dashboards](https://unionai.grafana.net/dashboards) and [alerts](https://unionai.grafana.net/alerting/list)
* [ ] Updated documentation
  • Loading branch information
katrogan authored May 13, 2024
1 parent cb3942e commit 99e7298
Show file tree
Hide file tree
Showing 13 changed files with 358 additions and 66 deletions.
266 changes: 224 additions & 42 deletions flyteadmin/pkg/clusterresource/controller.go

Large diffs are not rendered by default.

16 changes: 9 additions & 7 deletions flyteadmin/pkg/clusterresource/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"crypto/md5" // #nosec
"io/ioutil"
"os"
"sync"
"testing"

"github.com/stretchr/testify/assert"
Expand All @@ -29,22 +30,23 @@ const domain = "domain-bar"
var testScope = mockScope.NewTestScope()

func TestTemplateAlreadyApplied(t *testing.T) {
ctx := context.TODO()
const namespace = "namespace"
const fileName = "fileName"
testController := controller{
metrics: newMetrics(testScope),
}
checksum1 := md5.Sum([]byte("template1")) // #nosec
checksum2 := md5.Sum([]byte("template2")) // #nosec
assert.False(t, testController.templateAlreadyApplied(namespace, fileName, checksum1))
assert.False(t, testController.templateAlreadyApplied(ctx, namespace, fileName, checksum1))

testController.appliedTemplates = make(map[string]TemplateChecksums)
testController.setTemplateChecksum(namespace, fileName, checksum1)
assert.True(t, testController.templateAlreadyApplied(namespace, fileName, checksum1))
assert.False(t, testController.templateAlreadyApplied(namespace, fileName, checksum2))
testController.appliedNamespaceTemplateChecksums = sync.Map{}
testController.setTemplateChecksum(ctx, namespace, fileName, checksum1)
assert.True(t, testController.templateAlreadyApplied(ctx, namespace, fileName, checksum1))
assert.False(t, testController.templateAlreadyApplied(ctx, namespace, fileName, checksum2))

testController.setTemplateChecksum(namespace, fileName, checksum2)
assert.True(t, testController.templateAlreadyApplied(namespace, fileName, checksum2))
testController.setTemplateChecksum(ctx, namespace, fileName, checksum2)
assert.True(t, testController.templateAlreadyApplied(ctx, namespace, fileName, checksum2))
}

func TestPopulateTemplateValues(t *testing.T) {
Expand Down
31 changes: 23 additions & 8 deletions flyteadmin/pkg/clusterresource/impl/admin_service_data_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,22 +33,29 @@ func (p serviceAdminProvider) GetClusterResourceAttributes(ctx context.Context,
return nil, NewMissingEntityError("cluster resource attributes")
}

// We want both active and system generated projects
var activeProjectsFilter = fmt.Sprintf("ne(state,%d)", admin.Project_ARCHIVED)

var descCreatedAtSortParam = admin.Sort{
var archivedProjectsFilter = fmt.Sprintf("eq(state,%d)", admin.Project_ARCHIVED)

var descUpdatedAtSortParam = admin.Sort{
Direction: admin.Sort_DESCENDING,
Key: "created_at",
Key: "updated_at",
}

var descCreatedAtSortDBParam, _ = common.NewSortParameter(&descCreatedAtSortParam, models.ProjectColumns)
var descCreatedAtSortDBParam, _ = common.NewSortParameter(&descUpdatedAtSortParam, models.ProjectColumns)

func (p serviceAdminProvider) GetProjects(ctx context.Context) (*admin.Projects, error) {
func (p serviceAdminProvider) getProjects(ctx context.Context, useActiveProjectsFilter bool) (*admin.Projects, error) {
projects := make([]*admin.Project, 0)
listReq := &admin.ProjectListRequest{
Limit: 100,
Filters: activeProjectsFilter,
// Prefer to sync projects most newly created to ensure their resources get created first when other resources exist.
SortBy: &descCreatedAtSortParam,
Limit: 100,
// Prefer to sync projects most newly updated to ensure their resources get modified first when other resources exist.
SortBy: &descUpdatedAtSortParam,
}
if useActiveProjectsFilter {
listReq.Filters = activeProjectsFilter
} else {
listReq.Filters = archivedProjectsFilter
}

// Iterate through all pages of projects
Expand All @@ -68,6 +75,14 @@ func (p serviceAdminProvider) GetProjects(ctx context.Context) (*admin.Projects,
}, nil
}

func (p serviceAdminProvider) GetProjects(ctx context.Context) (*admin.Projects, error) {
return p.getProjects(ctx, getActiveProjects)
}

func (p serviceAdminProvider) GetArchivedProjects(ctx context.Context) (*admin.Projects, error) {
return p.getProjects(ctx, getArchivedProjects)
}

func NewAdminServiceDataProvider(
adminClient service.AdminServiceClient) interfaces.FlyteAdminDataProvider {
return &serviceAdminProvider{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ func TestServiceGetProjects(t *testing.T) {
t.Run("happy case", func(t *testing.T) {
mockAdmin := mocks.AdminServiceClient{}
mockAdmin.OnListProjectsMatch(ctx, mock.MatchedBy(func(req *admin.ProjectListRequest) bool {
return req.Limit == 100 && req.Filters == "ne(state,1)" && req.SortBy.Key == "created_at"
res := req.Limit == 100 && req.Filters == "ne(state,1)" && req.SortBy.Key == "updated_at"
return res
})).Return(&admin.Projects{
Projects: []*admin.Project{
{
Expand All @@ -110,7 +111,7 @@ func TestServiceGetProjects(t *testing.T) {
t.Run("admin error", func(t *testing.T) {
mockAdmin := mocks.AdminServiceClient{}
mockAdmin.OnListProjectsMatch(ctx, mock.MatchedBy(func(req *admin.ProjectListRequest) bool {
return req.Limit == 100 && req.Filters == "ne(state,1)" && req.SortBy.Key == "created_at"
return req.Limit == 100 && req.Filters == "ne(state,1)" && req.SortBy.Key == "updated_at"
})).Return(nil, errFoo)
provider := serviceAdminProvider{
adminClient: &mockAdmin,
Expand Down
29 changes: 25 additions & 4 deletions flyteadmin/pkg/clusterresource/impl/db_admin_data_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ import (
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"
)

const (
stateColumn = "state"
)

// Implementation of an interfaces.FlyteAdminDataProvider which fetches data directly from the provided database connection.
type dbAdminProvider struct {
db repositoryInterfaces.Repository
Expand Down Expand Up @@ -47,11 +51,20 @@ func (p dbAdminProvider) getDomains() []*admin.Domain {
return domains
}

func (p dbAdminProvider) GetProjects(ctx context.Context) (*admin.Projects, error) {
filter, err := common.NewSingleValueFilter(common.Project, common.NotEqual, "state", int32(admin.Project_ARCHIVED))
if err != nil {
return nil, err
func (p dbAdminProvider) getProjects(ctx context.Context, useActiveProjectsFilter bool) (projectsList *admin.Projects, err error) {
var filter common.InlineFilter
if useActiveProjectsFilter {
filter, err = common.NewSingleValueFilter(common.Project, common.NotEqual, stateColumn, int32(admin.Project_ARCHIVED))
if err != nil {
return nil, err
}
} else {
filter, err = common.NewSingleValueFilter(common.Project, common.Equal, stateColumn, int32(admin.Project_ARCHIVED))
if err != nil {
return nil, err
}
}

projectModels, err := p.db.ProjectRepo().List(ctx, repositoryInterfaces.ListResourceInput{
SortParameter: descCreatedAtSortDBParam,
InlineFilters: []common.InlineFilter{filter},
Expand All @@ -65,6 +78,14 @@ func (p dbAdminProvider) GetProjects(ctx context.Context) (*admin.Projects, erro
}, nil
}

func (p dbAdminProvider) GetProjects(ctx context.Context) (*admin.Projects, error) {
return p.getProjects(ctx, getActiveProjects)
}

func (p dbAdminProvider) GetArchivedProjects(ctx context.Context) (*admin.Projects, error) {
return p.getProjects(ctx, getArchivedProjects)
}

func NewDatabaseAdminDataProvider(db repositoryInterfaces.Repository, config runtimeInterfaces.Configuration, resourceManager managerInterfaces.ResourceInterface) interfaces.FlyteAdminDataProvider {
return &dbAdminProvider{
db: db,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func TestGetProjects(t *testing.T) {
mockRepo.(*repoMocks.MockRepository).ProjectRepoIface = &repoMocks.MockProjectRepo{
ListProjectsFunction: func(ctx context.Context, input repoInterfaces.ListResourceInput) ([]models.Project, error) {
assert.Len(t, input.InlineFilters, 1)
assert.Equal(t, input.SortParameter.GetGormOrderExpr(), "created_at desc")
assert.Equal(t, input.SortParameter.GetGormOrderExpr(), "updated_at desc")
return []models.Project{
{
Identifier: "flytesnacks",
Expand Down
5 changes: 5 additions & 0 deletions flyteadmin/pkg/clusterresource/impl/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ import (
"github.com/flyteorg/flyte/flyteadmin/pkg/errors"
)

const (
getActiveProjects = true
getArchivedProjects = false
)

func NewMissingEntityError(entity string) error {
return errors.NewFlyteAdminErrorf(codes.NotFound, "Failed to find [%s]", entity)
}
1 change: 1 addition & 0 deletions flyteadmin/pkg/clusterresource/interfaces/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ import (
type FlyteAdminDataProvider interface {
GetClusterResourceAttributes(ctx context.Context, org, project, domain string) (*admin.ClusterResourceAttributes, error)
GetProjects(ctx context.Context) (*admin.Projects, error)
GetArchivedProjects(ctx context.Context) (*admin.Projects, error)
}
41 changes: 41 additions & 0 deletions flyteadmin/pkg/clusterresource/mocks/flyte_admin_data_provider.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions flyteadmin/pkg/clusterresource/sync_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ type ResourceSyncStats struct {
Updated int
AlreadyThere int
Errored int
Deleted int
}

// Add adds the values of the other ResourceSyncStats to this one
Expand All @@ -14,4 +15,5 @@ func (m *ResourceSyncStats) Add(other ResourceSyncStats) {
m.Updated += other.Updated
m.AlreadyThere += other.AlreadyThere
m.Errored += other.Errored
m.Deleted += other.Deleted
}
7 changes: 7 additions & 0 deletions flyteadmin/pkg/runtime/cluster_resource_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ var clusterResourceConfig = config.MustRegisterSection(clusterResourceKey, &inte
Duration: time.Minute,
},
CustomData: make(map[interfaces.DomainName]interfaces.TemplateData),
UnionProjectSyncConfig: interfaces.UnionProjectSyncConfig{
BatchSize: 10,
},
})

// Implementation of an interfaces.ClusterResourceConfiguration
Expand All @@ -41,6 +44,10 @@ func (p *ClusterResourceConfigurationProvider) IsStandaloneDeployment() bool {
return clusterResourceConfig.GetConfig().(*interfaces.ClusterResourceConfig).StandaloneDeployment
}

func (p *ClusterResourceConfigurationProvider) GetUnionProjectSyncConfig() interfaces.UnionProjectSyncConfig {
return clusterResourceConfig.GetConfig().(*interfaces.ClusterResourceConfig).UnionProjectSyncConfig
}

func NewClusterResourceConfigurationProvider() interfaces.ClusterResourceConfiguration {
return &ClusterResourceConfigurationProvider{}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ type DomainName = string

type TemplateData = map[string]DataSource

type UnionProjectSyncConfig struct {
CleanupNamespace bool `json:"cleanupNamespace" pflag:", Whether to clean up resources associated with archived projects"`
BatchSize int `json:"batchSize" pflag:", How many projects to process in parallel (use 0 for serial processing)"`
}

type ClusterResourceConfig struct {
TemplatePath string `json:"templatePath"`
// TemplateData maps template keys e.g. my_super_secret_password to a data source
Expand All @@ -42,8 +47,9 @@ type ClusterResourceConfig struct {
foo:
value: "baz"
*/
CustomData map[DomainName]TemplateData `json:"customData"`
StandaloneDeployment bool `json:"standaloneDeployment" pflag:", Whether the cluster resource sync is running in a standalone deployment and should call flyteadmin service endpoints"`
CustomData map[DomainName]TemplateData `json:"customData"`
StandaloneDeployment bool `json:"standaloneDeployment" pflag:", Whether the cluster resource sync is running in a standalone deployment and should call flyteadmin service endpoints"`
UnionProjectSyncConfig UnionProjectSyncConfig `json:"unionProjectSyncConfig"`
}

type ClusterResourceConfiguration interface {
Expand All @@ -52,4 +58,5 @@ type ClusterResourceConfiguration interface {
GetRefreshInterval() time.Duration
GetCustomTemplateData() map[DomainName]TemplateData
IsStandaloneDeployment() bool
GetUnionProjectSyncConfig() UnionProjectSyncConfig
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ func (c MockClusterResourceConfiguration) IsStandaloneDeployment() bool {
return c.StandaloneDeployment
}

func (c MockClusterResourceConfiguration) GetArchiveProjectConfig() interfaces.UnionProjectSyncConfig {
return interfaces.UnionProjectSyncConfig{}
}

func (c MockClusterResourceConfiguration) GetUnionProjectSyncConfig() interfaces.UnionProjectSyncConfig {
return interfaces.UnionProjectSyncConfig{}
}

func NewMockClusterResourceConfiguration() interfaces.ClusterResourceConfiguration {
return &MockClusterResourceConfiguration{}
}

0 comments on commit 99e7298

Please sign in to comment.