Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[YUNIKORN-2925] Remove internal object from application REST info #999

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 30 additions & 18 deletions pkg/common/resources/tracked_resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
"strings"
"time"

"golang.org/x/exp/maps"

"github.com/apache/yunikorn-core/pkg/locking"
)

Expand All @@ -44,10 +46,6 @@
// NewTrackedResourceFromMap creates NewTrackedResource from the given map.
// Using for Testing purpose only.
func NewTrackedResourceFromMap(m map[string]map[string]Quantity) *TrackedResource {
if m == nil {
return NewTrackedResource()
}

trackedMap := make(map[string]*Resource)
for inst, inner := range m {
trackedMap[inst] = NewResourceFromMap(inner)
Expand All @@ -56,6 +54,9 @@
}

func (tr *TrackedResource) String() string {
if tr == nil {
return "TrackedResource{}"
}
tr.RLock()
defer tr.RUnlock()

Expand Down Expand Up @@ -85,8 +86,7 @@

// AggregateTrackedResource aggregates resource usage to TrackedResourceMap[instType].
// The time the given resource used is the delta between the resource createTime and currentTime.
func (tr *TrackedResource) AggregateTrackedResource(instType string,
resource *Resource, bindTime time.Time) {
func (tr *TrackedResource) AggregateTrackedResource(instType string, resource *Resource, bindTime time.Time) {
if resource == nil {
return
}
Expand All @@ -105,25 +105,37 @@
tr.TrackedResourceMap[instType] = aggregatedResourceTime
}

func EqualsTracked(left, right *TrackedResource) bool {
if left == right {
return true
// EqualsDAO compares the TrackedResource against the DAO map that was created of the resource.
// Test use only
func (tr *TrackedResource) EqualsDAO(right map[string]map[string]int64) bool {
if tr == nil {
return len(right) == 0
}

if left == nil || right == nil {
tr.RLock()
defer tr.RUnlock()
if len(tr.TrackedResourceMap) != len(right) {
return false
}

for k, v := range left.TrackedResourceMap {
inner, ok := right.TrackedResourceMap[k]
if !ok {
for k, v := range tr.TrackedResourceMap {
if inner, ok := right[k]; !ok {
return false
}

if !Equals(v, inner) {
} else if !maps.Equal(v.DAOMap(), inner) {
return false
}
}

return true
}

// DAOMap converts the TrackedResource into a map structure for use in the REST API.
func (tr *TrackedResource) DAOMap() map[string]map[string]int64 {
daoMAP := make(map[string]map[string]int64)
if tr != nil {
tr.RLock()
defer tr.RUnlock()
for k, res := range tr.TrackedResourceMap {
daoMAP[k] = res.DAOMap()
}

Check warning on line 138 in pkg/common/resources/tracked_resources.go

View check run for this annotation

Codecov / codecov/patch

pkg/common/resources/tracked_resources.go#L131-L138

Added lines #L131 - L138 were not covered by tests
}
return daoMAP

Check warning on line 140 in pkg/common/resources/tracked_resources.go

View check run for this annotation

Codecov / codecov/patch

pkg/common/resources/tracked_resources.go#L140

Added line #L140 was not covered by tests
}
121 changes: 60 additions & 61 deletions pkg/common/resources/tracked_resources_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,73 +29,55 @@ import (
"gotest.tools/v3/assert"
)

func CheckLenOfTrackedResource(res *TrackedResource, expected int) (bool, string) {
if got := len(res.TrackedResourceMap); expected == 0 && (res == nil || got != expected) {
return false, fmt.Sprintf("input with empty and nil should be a empty tracked resource: Expected %d, got %d", expected, got)
func CheckTrackedResource(res *TrackedResource, trMap map[string]map[string]Quantity) error {
if len(res.TrackedResourceMap) != len(trMap) {
return fmt.Errorf("input with empty and nil should be a empty tracked resource: Expected %d, got %d", len(trMap), len(res.TrackedResourceMap))
}
if got := len(res.TrackedResourceMap); got != expected {
return false, fmt.Sprintf("Length of tracked resources is wrong: Expected %d, got %d", expected, got)
}
return true, ""
}

func CheckResourceValueOfTrackedResource(res *TrackedResource, expected map[string]map[string]Quantity) (bool, string) {
for instanceType, expected := range expected {
for instanceType, expect := range trMap {
trackedRes := res.TrackedResourceMap[instanceType]
expectedRes := NewResourceFromMap(expected)
expectedRes := NewResourceFromMap(expect)
if !Equals(trackedRes, expectedRes) {
return false, fmt.Sprintf("instance type %s, expected %s, got %s", instanceType, trackedRes, expectedRes)
return fmt.Errorf("instance type %s, expected %s, got %s", instanceType, trackedRes, expectedRes)
}
}
return true, ""
return nil
}

func TestNewTrackedResourceFromMap(t *testing.T) {
type outputs struct {
length int
trackedResources map[string]map[string]Quantity
}
var tests = []struct {
caseName string
input map[string]map[string]Quantity
expected outputs
trMap map[string]map[string]Quantity
}{
{
"nil",
nil,
outputs{0, map[string]map[string]Quantity{}},
map[string]map[string]Quantity{},
},
{
"empty",
map[string]map[string]Quantity{},
outputs{0, map[string]map[string]Quantity{}},
},
map[string]map[string]Quantity{}},
{
"tracked resources of one instance type",
map[string]map[string]Quantity{"instanceType1": {"first": 1}},
outputs{1, map[string]map[string]Quantity{"instanceType1": {"first": 1}}},
map[string]map[string]Quantity{"instanceType1": {"first": 1}},
},
{
"tracked resources of two instance type",
map[string]map[string]Quantity{"instanceType1": {"first": 0}, "instanceType2": {"second": -1}},
outputs{2, map[string]map[string]Quantity{"instanceType1": {"first": 0}, "instanceType2": {"second": -1}}},
map[string]map[string]Quantity{"instanceType1": {"first": 0}, "instanceType2": {"second": -1}},
},
{
"Multiple tracked resources for one instance type",
map[string]map[string]Quantity{"instanceType1": {"first": 1, "second": -2, "third": 3}},
outputs{1, map[string]map[string]Quantity{"instanceType1": {"first": 1, "second": -2, "third": 3}}},
map[string]map[string]Quantity{"instanceType1": {"first": 1, "second": -2, "third": 3}},
},
}
for _, tt := range tests {
t.Run(tt.caseName, func(t *testing.T) {
res := NewTrackedResourceFromMap(tt.input)
if ok, err := CheckLenOfTrackedResource(res, tt.expected.length); !ok {
t.Error(err)
} else {
if ok, err := CheckResourceValueOfTrackedResource(res, tt.expected.trackedResources); !ok {
t.Error(err)
}
}
assert.NilError(t, CheckTrackedResource(res, tt.trMap))
})
}
}
Expand Down Expand Up @@ -132,6 +114,11 @@ func TestTrackedResourceClone(t *testing.T) {
}

// case: tracked resource is nil
defer func() {
if r := recover(); r != nil {
t.Fatal("Clone panic on nil TrackedResource")
}
}()
tr := (*TrackedResource)(nil)
cloned := tr.Clone()
assert.Assert(t, cloned == nil)
Expand Down Expand Up @@ -220,58 +207,61 @@ func TestTrackedResourceAggregateTrackedResource(t *testing.T) {
}

// case: resource is nil
defer func() {
if r := recover(); r != nil {
t.Fatal("Panic on nil map for new TrackedResource")
}
}()
tr := NewTrackedResourceFromMap(nil)
tr.AggregateTrackedResource("instanceType1", nil, time.Now().Add(-time.Minute))
assert.Assert(t, tr.TrackedResourceMap != nil && len(tr.TrackedResourceMap) == 0)
}

func TestEqualsTracked(t *testing.T) {
type inputs struct {
base map[string]map[string]Quantity
compare map[string]map[string]Quantity
}
var tests = []struct {
caseName string
input inputs
base map[string]map[string]Quantity
compare map[string]map[string]int64
expected bool
}{
{"simple cases (nil checks)", inputs{nil, nil}, true},
{"simple cases (nil checks)", inputs{map[string]map[string]Quantity{}, nil}, false},
{"same first and second level keys and different resource value",
inputs{map[string]map[string]Quantity{"first": {"val": 10}}, map[string]map[string]Quantity{"first": {"val": 0}}},
{"nil inputs", nil, nil, true},
{"both empty", map[string]map[string]Quantity{}, map[string]map[string]int64{}, true},
{"empty tracked nil dao", map[string]map[string]Quantity{}, nil, true},
{"empty tracked",
map[string]map[string]Quantity{},
map[string]map[string]int64{"first": {"val": 0}},
false,
},
{"different first-level key, same second-level key, same resource value",
inputs{map[string]map[string]Quantity{"first": {"val": 10}}, map[string]map[string]Quantity{"second": {"val": 10}}},
false},
{"same first-level key, different second-level key, same resource value",
inputs{map[string]map[string]Quantity{"first": {"val": 10}}, map[string]map[string]Quantity{"first": {"value": 10}}},
{"same keys different values",
map[string]map[string]Quantity{"first": {"val": 10}},
map[string]map[string]int64{"first": {"val": 0}},
false,
},
{"different instance type",
map[string]map[string]Quantity{"first": {"val": 10}},
map[string]map[string]int64{"second": {"val": 10}},
false},
{"same first-level key, second has larger sub-level map",
inputs{map[string]map[string]Quantity{"first": {"val": 10}}, map[string]map[string]Quantity{"first": {"val": 10, "sum": 7}}},
{"different resource type",
map[string]map[string]Quantity{"first": {"val": 10}},
map[string]map[string]int64{"first": {"value": 10}},
false},
{"same first-level key, first has larger sub-level map",
inputs{map[string]map[string]Quantity{"first": {"val": 10, "sum": 7}}, map[string]map[string]Quantity{"first": {"val": 10}}},
{"different resource count",
map[string]map[string]Quantity{"first": {"val": 10}},
map[string]map[string]int64{"first": {"val": 10, "sum": 7}},
false},
{"same keys and values",
inputs{map[string]map[string]Quantity{"x": {"val": 10, "sum": 7}}, map[string]map[string]Quantity{"x": {"val": 10, "sum": 7}}},
map[string]map[string]Quantity{"x": {"val": 10, "sum": 7}},
map[string]map[string]int64{"x": {"val": 10, "sum": 7}},
true},
}
for _, tt := range tests {
t.Run(tt.caseName, func(t *testing.T) {
var base, compare *TrackedResource
if tt.input.base != nil {
base = NewTrackedResourceFromMap(tt.input.base)
if tt.base != nil {
base = NewTrackedResourceFromMap(tt.base)
}
if tt.input.compare != nil {
compare = NewTrackedResourceFromMap(tt.input.compare)
}

result := EqualsTracked(base, compare)
assert.Assert(t, result == tt.expected, "Equal result should be %v instead of %v, left %v, right %v", tt.expected, result, base, compare)

result = EqualsTracked(compare, base)
assert.Assert(t, result == tt.expected, "Equal result should be %v instead of %v, left %v, right %v", tt.expected, result, compare, base)
assert.Equal(t, base.EqualsDAO(tt.compare), tt.expected, "Equal result should be %v instead of %v, left %v, right %v", tt.expected, base, compare)
})
}
}
Expand Down Expand Up @@ -311,4 +301,13 @@ func TestTrackedResourceString(t *testing.T) {
})
expected = "TrackedResource{instanceType1:cpu=10,instanceType1:memory=20,instanceType2:memory=15}"
assert.Equal(t, sortTrackedResourceString(expected), sortTrackedResourceString(tr3.String()))

defer func() {
if r := recover(); r != nil {
t.Fatal("String panic on nil TrackedResource")
}
}()
tr := (*TrackedResource)(nil)
str := tr.String()
assert.Equal(t, str, "TrackedResource{}")
}
78 changes: 46 additions & 32 deletions pkg/scheduler/objects/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,29 +124,6 @@
locking.RWMutex
}

func (sa *Application) GetApplicationSummary(rmID string) *ApplicationSummary {
sa.RLock()
defer sa.RUnlock()
state := sa.stateMachine.Current()
resourceUsage := sa.usedResource.Clone()
preemptedUsage := sa.preemptedResource.Clone()
placeHolderUsage := sa.placeholderResource.Clone()
appSummary := &ApplicationSummary{
ApplicationID: sa.ApplicationID,
SubmissionTime: sa.SubmissionTime,
StartTime: sa.startTime,
FinishTime: sa.finishedTime,
User: sa.user.User,
Queue: sa.queuePath,
State: state,
RmID: rmID,
ResourceUsage: resourceUsage,
PreemptedResource: preemptedUsage,
PlaceholderResource: placeHolderUsage,
}
return appSummary
}

func NewApplication(siApp *si.AddApplicationRequest, ugi security.UserGroup, eventHandler handler.EventHandler, rmID string) *Application {
app := &Application{
ApplicationID: siApp.ApplicationID,
Expand Down Expand Up @@ -2131,21 +2108,58 @@
sa.preemptedResource = nil
}

func (sa *Application) CleanupTrackedResource() {
// GetApplicationSummary locked version to get the application summary
// Exposed for test only
func (sa *Application) GetApplicationSummary(rmID string) *ApplicationSummary {
sa.RLock()
defer sa.RUnlock()
return sa.getApplicationSummary(rmID)
}

func (sa *Application) getApplicationSummary(rmID string) *ApplicationSummary {
return &ApplicationSummary{
ApplicationID: sa.ApplicationID,
SubmissionTime: sa.SubmissionTime,
StartTime: sa.startTime,
FinishTime: sa.finishedTime,
User: sa.user.User,
Queue: sa.queuePath,
State: sa.stateMachine.Current(),
RmID: rmID,
ResourceUsage: sa.usedResource.Clone(),
PreemptedResource: sa.preemptedResource.Clone(),
PlaceholderResource: sa.placeholderResource.Clone(),
}
}

// LogAppSummary log the summary details for the application if it has run at any point in time.
// The application summary only contains correct data when the application is in the Completed state.
// Logging the data in any other state will show incomplete or inconsistent data.
// After the data is logged the objects are cleaned up to lower overhead of Completed application tracking.
func (sa *Application) LogAppSummary(rmID string) {

Check warning on line 2139 in pkg/scheduler/objects/application.go

View check run for this annotation

Codecov / codecov/patch

pkg/scheduler/objects/application.go#L2135-L2139

Added lines #L2135 - L2139 were not covered by tests
sa.Lock()
defer sa.Unlock()
if !sa.startTime.IsZero() {

Check warning on line 2142 in pkg/scheduler/objects/application.go

View check run for this annotation

Codecov / codecov/patch

pkg/scheduler/objects/application.go#L2142

Added line #L2142 was not covered by tests
appSummary := sa.getApplicationSummary(rmID)
appSummary.DoLogging()
}
sa.cleanupTrackedResource()
}

func (sa *Application) LogAppSummary(rmID string) {
if sa.startTime.IsZero() {
return
// GetTrackedDAOMap returns the tracked resources type specified in which as a DAO similar to the normal resources.
func (sa *Application) GetTrackedDAOMap(which string) map[string]map[string]int64 {
sa.RLock()
defer sa.RUnlock()
switch which {
case "usedResource":
return sa.usedResource.DAOMap()
case "preemptedResource":
return sa.preemptedResource.DAOMap()

Check warning on line 2157 in pkg/scheduler/objects/application.go

View check run for this annotation

Codecov / codecov/patch

pkg/scheduler/objects/application.go#L2149-L2157

Added lines #L2149 - L2157 were not covered by tests
case "placeholderResource":
return sa.placeholderResource.DAOMap()
default:
return map[string]map[string]int64{}
}
appSummary := sa.GetApplicationSummary(rmID)
appSummary.DoLogging()
appSummary.ResourceUsage = nil
appSummary.PreemptedResource = nil
appSummary.PlaceholderResource = nil
}

func (sa *Application) HasPlaceholderAllocation() bool {
Expand Down
Loading