From f4aad128a3f5d1a5ff29dd36f8f5d36b832efd14 Mon Sep 17 00:00:00 2001 From: Filipe Regadas Date: Fri, 6 Aug 2021 07:39:26 +0100 Subject: [PATCH] Fix memory divisors (#76) --- controllers/flinkcluster_converter.go | 4 ++-- controllers/flinkcluster_converter_test.go | 10 +++++----- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/controllers/flinkcluster_converter.go b/controllers/flinkcluster_converter.go index d721bd2f..bf18129d 100644 --- a/controllers/flinkcluster_converter.go +++ b/controllers/flinkcluster_converter.go @@ -1025,7 +1025,7 @@ func calHeapSize(memSize int64, offHeapMin int64, offHeapRatio int64) int64 { } heapSizeCalculated := memSize - offHeapSize if heapSizeCalculated > 0 { - divisor := resource.MustParse("1M") + divisor := resource.MustParse("1Mi") heapSizeQuantity := resource.NewQuantity(heapSizeCalculated, resource.DecimalSI) heapSizeMB = convertResourceMemoryToInt64(*heapSizeQuantity, divisor) } @@ -1034,7 +1034,7 @@ func calHeapSize(memSize int64, offHeapMin int64, offHeapRatio int64) int64 { func calProcessMemorySize(memSize, ratio int64) int64 { size := int64(math.Ceil(float64((memSize * ratio)) / 100)) - divisor := resource.MustParse("1M") + divisor := resource.MustParse("1Mi") quantity := resource.NewQuantity(size, resource.DecimalSI) return convertResourceMemoryToInt64(*quantity, divisor) } diff --git a/controllers/flinkcluster_converter_test.go b/controllers/flinkcluster_converter_test.go index 861bdfbb..9bfae12d 100644 --- a/controllers/flinkcluster_converter_test.go +++ b/controllers/flinkcluster_converter_test.go @@ -950,7 +950,7 @@ jobmanager.rpc.address: flinkjobcluster-sample-jobmanager jobmanager.rpc.port: 6123 query.server.port: 6125 rest.port: 8081 -taskmanager.heap.size: 474m +taskmanager.heap.size: 452m taskmanager.numberOfTaskSlots: 1 taskmanager.rpc.port: 6122 ` @@ -1118,8 +1118,8 @@ func TestCalFlinkHeapSize(t *testing.T) { flinkHeapSize := calFlinkHeapSize(cluster) expectedFlinkHeapSize := map[string]string{ - "jobmanager.heap.size": "474m", // get values calculated with limit - memoryOffHeapMin - "taskmanager.heap.size": "3222m", // get values calculated with limit - limit * memoryOffHeapRatio / 100 + "jobmanager.heap.size": "452m", // get values calculated with limit - memoryOffHeapMin + "taskmanager.heap.size": "3072m", // get values calculated with limit - limit * memoryOffHeapRatio / 100 } assert.Assert(t, len(flinkHeapSize) == 2) assert.DeepEqual( @@ -1185,8 +1185,8 @@ func TestCalFlinkMemoryProcessSize(t *testing.T) { flinkHeapSize := calFlinkMemoryProcessSize(cluster) expectedFlinkHeapSize := map[string]string{ - "jobmanager.memory.process.size": "859m", - "taskmanager.memory.process.size": "3436m", + "jobmanager.memory.process.size": "820m", + "taskmanager.memory.process.size": "3277m", } assert.Assert(t, len(flinkHeapSize) == 2) assert.DeepEqual(