-
Notifications
You must be signed in to change notification settings - Fork 63
Ensure admin never sets task resource request > limit #126
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,6 +6,8 @@ import ( | |
"strconv" | ||
"time" | ||
|
||
"k8s.io/apimachinery/pkg/api/resource" | ||
|
||
"github.com/lyft/flyteadmin/pkg/manager/impl/resources" | ||
|
||
"github.com/golang/protobuf/ptypes" | ||
|
@@ -263,6 +265,41 @@ func assignResourcesIfUnset(ctx context.Context, identifier *core.Identifier, | |
return resourceEntries | ||
} | ||
|
||
// resolveTaskLimitsAndPlatformRequestDefaults caps every entry in taskResources.Requests at the limit declared for | ||
// the same resource name, then replaces taskResources.Requests with the resulting slice. | ||
// taskResources.Limits is only read, never modified. | ||
// | ||
// ctx and identifier are used only for the log messages emitted below. | ||
// | ||
// NOTE(review): a request whose resource name has no entry in Limits is logged and then skipped via `continue`, | ||
// so it does not appear in the rewritten Requests slice - confirm that dropping it (rather than keeping it) is intended. | ||
// NOTE(review): resource.MustParse presumably panics on a malformed quantity string (usual Must* convention) - | ||
// confirm that request and limit values have been validated before this function is reached. | ||
func resolveTaskLimitsAndPlatformRequestDefaults(ctx context.Context, identifier *core.Identifier, | ||
taskResources *core.Resources) { | ||
// We choose the minimum of the platform request defaults or the limit itself for every resource request. | ||
// Otherwise we can find ourselves in confusing scenarios where the injected platform request defaults exceed a | ||
// user-specified limit | ||
// Index the limit values by resource name so each request can look up its counterpart. | ||
resourceLimits := make(map[core.Resources_ResourceName]string) | ||
for _, resourceEntry := range taskResources.Limits { | ||
resourceLimits[resourceEntry.Name] = resourceEntry.Value | ||
} | ||
|
||
finalizedResourceRequests := make([]*core.Resources_ResourceEntry, 0, len(taskResources.Requests)) | ||
for _, resourceEntry := range taskResources.Requests { | ||
// value is what gets written to the rewritten entry: the requested value unless the limit is lower. | ||
value := resourceEntry.Value | ||
quantity := resource.MustParse(resourceEntry.Value) | ||
limitValue, ok := resourceLimits[resourceEntry.Name] | ||
if !ok { | ||
// Unexpected - at this stage both requests and limits should be populated. | ||
logger.Warningf(ctx, "No limit specified for [%v] resource [%s] although request was set", identifier, | ||
resourceEntry.Name) | ||
continue | ||
} | ||
if quantity.Cmp(resource.MustParse(limitValue)) == 1 { | ||
// The quantity is greater than the limit! Course correct below. | ||
logger.Debugf(ctx, "Updating requested value for task [%+v] resource [%s]. Overriding to [%s] from [%s]", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: can you add what causes this override to the log so that it's clearer when reading the logs? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also I feel the debug level should be warning/info? Not quite sure 🤔 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. updated & upgraded |
||
identifier, resourceEntry.Name, limitValue, value) | ||
value = limitValue | ||
} | ||
// A fresh entry is appended rather than mutating the one in the original Requests slice. | ||
finalizedResourceRequests = append(finalizedResourceRequests, &core.Resources_ResourceEntry{ | ||
Name: resourceEntry.Name, | ||
Value: value, | ||
}) | ||
} | ||
taskResources.Requests = finalizedResourceRequests | ||
} | ||
|
||
// Assumes input contains a compiled task with a valid container resource execConfig. | ||
// | ||
// Note: The system will assign a system-default value for request but for limit it will deduce it from the request | ||
|
@@ -305,6 +342,7 @@ func (m *ExecutionManager) setCompiledTaskDefaults(ctx context.Context, task *co | |
task.Template.GetContainer().Resources.Limits = assignResourcesIfUnset( | ||
ctx, task.Template.Id, createTaskDefaultLimits(ctx, task), task.Template.GetContainer().Resources.Limits, | ||
taskResourceSpec) | ||
resolveTaskLimitsAndPlatformRequestDefaults(ctx, task.Template.Id, task.Template.GetContainer().Resources) | ||
} | ||
|
||
func (m *ExecutionManager) launchSingleTaskExecution( | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2464,6 +2464,111 @@ func TestAssignResourcesIfUnset(t *testing.T) { | |
}, assignedResources) | ||
} | ||
|
||
// TestResolveTaskLimitsAndPlatformRequestDefaults runs resolveTaskLimitsAndPlatformRequestDefaults on two | ||
// request/limit fixtures and compares the mutated resources against the expected message with proto.Equal. | ||
func TestResolveTaskLimitsAndPlatformRequestDefaults(t *testing.T) { | ||
ctx := context.Background() | ||
// project, domain, name and version are not defined in this function; they come from outside this view. | ||
identifier := &core.Identifier{ | ||
ResourceType: core.ResourceType_TASK, | ||
Project: project, | ||
Domain: domain, | ||
Name: name, | ||
Version: version, | ||
} | ||
// MEMORY request "2" exceeds its limit "1" and is expected to be lowered to "1"; | ||
// CPU request "1" is below its limit "2" and is expected to stay as is. | ||
t.Run("use_limit", func(t *testing.T) { | ||
resources := &core.Resources{ | ||
Requests: []*core.Resources_ResourceEntry{ | ||
{ | ||
Name: core.Resources_CPU, | ||
Value: "1", | ||
}, | ||
{ | ||
Name: core.Resources_MEMORY, | ||
Value: "2", | ||
}, | ||
}, | ||
Limits: []*core.Resources_ResourceEntry{ | ||
{ | ||
Name: core.Resources_CPU, | ||
Value: "2", | ||
}, | ||
{ | ||
Name: core.Resources_MEMORY, | ||
Value: "1", | ||
}, | ||
}, | ||
} | ||
resolveTaskLimitsAndPlatformRequestDefaults(ctx, identifier, resources) | ||
assert.True(t, proto.Equal(&core.Resources{ | ||
Requests: []*core.Resources_ResourceEntry{ | ||
{ | ||
Name: core.Resources_CPU, | ||
Value: "1", | ||
}, | ||
{ | ||
Name: core.Resources_MEMORY, | ||
Value: "1", | ||
}, | ||
}, | ||
Limits: []*core.Resources_ResourceEntry{ | ||
{ | ||
Name: core.Resources_CPU, | ||
Value: "2", | ||
}, | ||
{ | ||
Name: core.Resources_MEMORY, | ||
Value: "1", | ||
}, | ||
}, | ||
}, resources)) | ||
}) | ||
// CPU request equals its limit ("2") and MEMORY request "1" is below its limit "1.5", | ||
// so both Requests and Limits are expected to come back unchanged. | ||
t.Run("nothing_to_override", func(t *testing.T) { | ||
resources := &core.Resources{ | ||
Requests: []*core.Resources_ResourceEntry{ | ||
{ | ||
Name: core.Resources_CPU, | ||
Value: "2", | ||
}, | ||
{ | ||
Name: core.Resources_MEMORY, | ||
Value: "1", | ||
}, | ||
}, | ||
Limits: []*core.Resources_ResourceEntry{ | ||
{ | ||
Name: core.Resources_CPU, | ||
Value: "2", | ||
}, | ||
{ | ||
Name: core.Resources_MEMORY, | ||
Value: "1.5", | ||
}, | ||
}, | ||
} | ||
resolveTaskLimitsAndPlatformRequestDefaults(ctx, identifier, resources) | ||
assert.True(t, proto.Equal(&core.Resources{ | ||
Requests: []*core.Resources_ResourceEntry{ | ||
{ | ||
Name: core.Resources_CPU, | ||
Value: "2", | ||
}, | ||
{ | ||
Name: core.Resources_MEMORY, | ||
Value: "1", | ||
}, | ||
}, | ||
Limits: []*core.Resources_ResourceEntry{ | ||
{ | ||
Name: core.Resources_CPU, | ||
Value: "2", | ||
}, | ||
{ | ||
Name: core.Resources_MEMORY, | ||
Value: "1.5", | ||
}, | ||
}, | ||
}, resources)) | ||
}) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you add a test that tests the logic here? https://github.com/lyft/flyteadmin/pull/126/files#diff-fc047e54b9dd82ca7c89ac9b32cb07b3R282-R288 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. at the stage where the method is called all values will have already been substituted with the application defaults. (https://github.com/lyft/flyteadmin/pull/126/files#diff-fc047e54b9dd82ca7c89ac9b32cb07b3R342) i only added that check as a defensive safeguard There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. oh interesting. So the control flow should never enter this block? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. unless the user is specifying a resource we don't provide defaults for. but in that case we don't find ourselves in the original conundrum where the substituted request > limit since there will be no request default value which leads to a permissible situation for scheduling There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. From a unit-test point of view I think that it would still be nice to have a test for it since we don't know if in the future this function would be invoked through other code paths where the default limits were not pre-filled. But as you said it is not critical here. |
||
} | ||
|
||
func TestSetDefaults(t *testing.T) { | ||
task := &core.CompiledTask{ | ||
Template: &core.TaskTemplate{ | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see the code updating the limits (as the name suggests?) am I missing something?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nope, I just struggled with naming. Is
`considerTaskLimitsAndPlatformRequestDefaults`
or `reconcileTaskLimitsAndPlatformRequestDefaults`
any better? Other suggestions welcomed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the limit is not supposed to change though... the request is.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
okay updated