Also consider memory usage in adaptive parallelism #1759

Merged · 8 commits · Oct 17, 2024
Changes from 3 commits
88 changes: 74 additions & 14 deletions yb-voyager/src/adaptiveparallelism/adaptive_parallelism.go
@@ -33,10 +33,12 @@ const (

 var MAX_CPU_THRESHOLD int
 var ADAPTIVE_PARALLELISM_FREQUENCY_SECONDS int
+var MIN_AVAILABLE_MEMORY_THRESHOLD int

 func init() {
 	MAX_CPU_THRESHOLD = utils.GetEnvAsInt("MAX_CPU_THRESHOLD", 70)
 	ADAPTIVE_PARALLELISM_FREQUENCY_SECONDS = utils.GetEnvAsInt("ADAPTIVE_PARALLELISM_FREQUENCY_SECONDS", 10)
+	MIN_AVAILABLE_MEMORY_THRESHOLD = utils.GetEnvAsInt("MIN_AVAILABLE_MEMORY_THRESHOLD", 10)
 }
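For context, the three knobs above are read once at package init. A minimal sketch of the assumed semantics of utils.GetEnvAsInt (the real helper lives in yb-voyager/src/utils and may differ):

	import (
		"os"
		"strconv"
	)

	// getEnvAsInt sketches the assumed behavior of utils.GetEnvAsInt: return
	// the env var parsed as an int, or def when it is unset or unparsable.
	// (Assumption for illustration; not the repo's actual helper.)
	func getEnvAsInt(key string, def int) int {
		if v, ok := os.LookupEnv(key); ok {
			if n, err := strconv.Atoi(v); err == nil {
				return n
			}
		}
		return def
	}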

type TargetYugabyteDBWithConnectionPool interface {
@@ -69,29 +71,25 @@ func fetchClusterMetricsAndUpdateParallelism(yb TargetYugabyteDBWithConnectionPool) error {
 		return fmt.Errorf("getting cluster metrics: %w", err)
 	}

-	// get max CPU
-	// Note that right now, voyager ingests data into the target in parallel,
-	// but one table at a time. Therefore, in cases where there is a single tablet for a table,
-	// either due to pre-split or colocated table, it is possible that the load on the cluster
-	// will be uneven. Nevertheless, we still want to ensure that the cluster is not overloaded,
-	// therefore we use the max CPU usage across all nodes in the cluster.
-	maxCpuUsage, err := getMaxCpuUsageInCluster(clusterMetrics)
+	cpuLoadHigh, err := isCpuLoadHigh(clusterMetrics)
 	if err != nil {
-		return fmt.Errorf("getting max cpu usage in cluster: %w", err)
+		return fmt.Errorf("checking if cpu load is high: %w", err)
 	}
-	if maxCpuUsage < 0 {
-		log.Warnf("adaptive: Ignoring update as max cpu usage in cluster is negative: %d", maxCpuUsage)
+	memLoadHigh, err := isMemoryLoadHigh(clusterMetrics)
+	if err != nil {
+		return fmt.Errorf("checking if memory load is high: %w", err)
 	}
-	log.Infof("adaptive: max cpu usage in cluster = %d", maxCpuUsage)

-	if maxCpuUsage > MAX_CPU_THRESHOLD {
-		log.Infof("adaptive: found CPU usage = %d > %d, reducing parallelism to %d", maxCpuUsage, MAX_CPU_THRESHOLD, yb.GetNumConnectionsInPool()-1)
+	if cpuLoadHigh || memLoadHigh {
+		log.Infof("adaptive: cpuLoadHigh=%t, memLoadHigh=%t, reducing parallelism to %d",
+			cpuLoadHigh, memLoadHigh, yb.GetNumConnectionsInPool()-1)
 		err = yb.UpdateNumConnectionsInPool(-1)
 		if err != nil {
 			return fmt.Errorf("updating parallelism with -1: %w", err)
 		}
 	} else {
-		log.Infof("adaptive: found CPU usage = %d <= %d, increasing parallelism to %d", maxCpuUsage, MAX_CPU_THRESHOLD, yb.GetNumConnectionsInPool()+1)
+		log.Infof("adaptive: cpuLoadHigh=%t, memLoadHigh=%t, increasing parallelism to %d",
+			cpuLoadHigh, memLoadHigh, yb.GetNumConnectionsInPool()+1)
 		err := yb.UpdateNumConnectionsInPool(1)
 		if err != nil {
 			return fmt.Errorf("updating parallelism with +1 : %w", err)
@@ -100,6 +98,21 @@ func fetchClusterMetricsAndUpdateParallelism(yb TargetYugabyteDBWithConnectionPool) error {
 	return nil
 }

+func isCpuLoadHigh(clusterMetrics map[string]tgtdb.NodeMetrics) (bool, error) {
+	// get max CPU
+	// Note that right now, voyager ingests data into the target in parallel,
+	// but one table at a time. Therefore, in cases where there is a single tablet for a table,
+	// either due to a pre-split or colocated table, it is possible that the load on the cluster
+	// will be uneven. Nevertheless, we still want to ensure that the cluster is not overloaded,
+	// so we use the max CPU usage across all nodes in the cluster.
+	maxCpuUsagePct, err := getMaxCpuUsageInCluster(clusterMetrics)
+	if err != nil {
+		return false, fmt.Errorf("getting max cpu usage in cluster: %w", err)
+	}
+	log.Infof("adaptive: max cpu usage in cluster = %d, max cpu threshold = %d", maxCpuUsagePct, MAX_CPU_THRESHOLD)
+	return maxCpuUsagePct > MAX_CPU_THRESHOLD, nil
+}

 func getMaxCpuUsageInCluster(clusterMetrics map[string]tgtdb.NodeMetrics) (int, error) {
 	maxCpuPct := -1
 	for _, nodeMetrics := range clusterMetrics {
@@ -120,3 +133,50 @@ func getMaxCpuUsageInCluster(clusterMetrics map[string]tgtdb.NodeMetrics) (int, error) {
 	}
 	return maxCpuPct, nil
 }
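The loop body of getMaxCpuUsageInCluster is collapsed in this diff. Judging from the test fixtures below, where cpu_usage_user = 0.5 and cpu_usage_system = 0.1 behave like 60% CPU against the 70% threshold, the per-node computation is presumably along these lines. nodeCpuUsagePct is a hypothetical helper sketched for illustration, not the verbatim body:

	// Hypothetical reconstruction of the collapsed per-node computation:
	// treat cpu_usage_user and cpu_usage_system as fractions, sum them,
	// and scale to a percentage. (Inferred from the test fixtures.)
	func nodeCpuUsagePct(metrics map[string]string) (int, error) {
		user, err := strconv.ParseFloat(metrics["cpu_usage_user"], 64)
		if err != nil {
			return -1, fmt.Errorf("parsing cpu usage user as float: %w", err)
		}
		system, err := strconv.ParseFloat(metrics["cpu_usage_system"], 64)
		if err != nil {
			return -1, fmt.Errorf("parsing cpu usage system as float: %w", err)
		}
		return int((user + system) * 100), nil
	}

The caller would then keep the maximum of these percentages across all nodes, consistent with the maxCpuPct accumulator above.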

+/*
+Memory load is considered to be high in the following scenarios:
+  - Available memory of any node is less than 10% (MIN_AVAILABLE_MEMORY_THRESHOLD) of its total memory.
+  - tserver root memory consumption of any node has breached its soft limit.
+*/
+func isMemoryLoadHigh(clusterMetrics map[string]tgtdb.NodeMetrics) (bool, error) {
+	minMemoryAvailablePct := 100
+	isTserverRootMemorySoftLimitBreached := false
+	for _, nodeMetrics := range clusterMetrics {
+		if nodeMetrics.Status != "OK" {
+			continue
+		}
+		// check if tserver root memory soft limit is breached
+		tserverRootMemoryConsumption, err := strconv.ParseInt(nodeMetrics.Metrics["tserver_root_memory_consumption"], 10, 64)
+		if err != nil {
+			return false, fmt.Errorf("parsing tserver root memory consumption as int: %w", err)
+		}
+		tserverRootMemorySoftLimit, err := strconv.ParseInt(nodeMetrics.Metrics["tserver_root_memory_soft_limit"], 10, 64)
+		if err != nil {
+			return false, fmt.Errorf("parsing tserver root memory soft limit as int: %w", err)
+		}
+		if tserverRootMemoryConsumption > tserverRootMemorySoftLimit {
+			isTserverRootMemorySoftLimitBreached = true
+			break
+		}
+
+		// check if available memory is low
+		memoryAvailable, err := strconv.ParseInt(nodeMetrics.Metrics["memory_available"], 10, 64)
+		if err != nil {
+			return false, fmt.Errorf("parsing memory available as int: %w", err)
+		}
+		memoryTotal, err := strconv.ParseInt(nodeMetrics.Metrics["memory_total"], 10, 64)
+		if err != nil {
+			return false, fmt.Errorf("parsing memory total as int: %w", err)
+		}
+		if memoryAvailable == 0 || memoryTotal == 0 {
+			// invalid values:
+			// on macOS, memory_available is not reported
+			continue
+		}
+		memoryAvailablePct := int((memoryAvailable * 100) / memoryTotal)
+		minMemoryAvailablePct = min(minMemoryAvailablePct, memoryAvailablePct)
+	}
+
+	return minMemoryAvailablePct < MIN_AVAILABLE_MEMORY_THRESHOLD || isTserverRootMemorySoftLimitBreached, nil
+}
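Neither hunk shows the caller, but presumably something on the import path invokes fetchClusterMetricsAndUpdateParallelism every ADAPTIVE_PARALLELISM_FREQUENCY_SECONDS. A hedged sketch of such a driver, assuming the package's existing log import; adaptParallelismLoop is a hypothetical name, not a function in this PR:

	// Hypothetical driver (not part of this diff): poll cluster metrics on a
	// fixed interval and nudge the connection pool size up or down by one.
	func adaptParallelismLoop(yb TargetYugabyteDBWithConnectionPool) {
		ticker := time.NewTicker(time.Duration(ADAPTIVE_PARALLELISM_FREQUENCY_SECONDS) * time.Second)
		defer ticker.Stop()
		for range ticker.C {
			if err := fetchClusterMetricsAndUpdateParallelism(yb); err != nil {
				log.Warnf("adaptive: failed to update parallelism: %v", err)
			}
		}
	}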
91 changes: 91 additions & 0 deletions yb-voyager/src/adaptiveparallelism/adaptive_parallelism_test.go
@@ -30,17 +30,44 @@ type dummyTargetYugabyteDB struct {
 	cpuUsageUser1 float64
 	cpuUsageSys1  float64
 	cpuUsageUser2 float64
 	cpuUsageSys2  float64
+
+	memAvailable1              int
+	memTotal1                  int
+	tserverRootMemConsumption1 int
+	tserverRootMemSoftLimit1   int
+
+	memAvailable2              int
+	memTotal2                  int
+	tserverRootMemConsumption2 int
+	tserverRootMemSoftLimit2   int
[Review thread on lines +34 to +42]
Reviewer: Q. Is all this info also available in the dbside function you have implemented?
Author: Yes, yugabyte/yugabyte-db@872b59e. Also added a comment in code with this sample output.

 }

 func (d *dummyTargetYugabyteDB) IsAdaptiveParallelismSupported() bool {
 	return true
 }

+/*
+Sample per-node metrics output (see yugabyte/yugabyte-db@872b59e):
+
+	{
+		"memory_free": "2934779904",
+		"memory_total": "8054566912",
+		"cpu_usage_user": "0.010204",
+		"cpu_usage_system": "0.010204",
+		"memory_available": "7280869376",
+		"tserver_root_memory_limit": "3866192117",
+		"tserver_root_memory_soft_limit": "3286263299",
+		"tserver_root_memory_consumption": "40091648"
+	}
+*/
 func (d *dummyTargetYugabyteDB) GetClusterMetrics() (map[string]tgtdb.NodeMetrics, error) {
 	result := make(map[string]tgtdb.NodeMetrics)
 	metrics1 := make(map[string]string)
 	metrics1["cpu_usage_user"] = strconv.FormatFloat(d.cpuUsageUser1, 'f', -1, 64)
 	metrics1["cpu_usage_system"] = strconv.FormatFloat(d.cpuUsageSys1, 'f', -1, 64)
+	metrics1["memory_available"] = strconv.Itoa(d.memAvailable1)
+	metrics1["memory_total"] = strconv.Itoa(d.memTotal1)
+	metrics1["tserver_root_memory_consumption"] = strconv.Itoa(d.tserverRootMemConsumption1)
+	metrics1["tserver_root_memory_soft_limit"] = strconv.Itoa(d.tserverRootMemSoftLimit1)

 	result["node1"] = tgtdb.NodeMetrics{
 		Metrics: metrics1,
@@ -52,6 +79,10 @@ func (d *dummyTargetYugabyteDB) GetClusterMetrics() (map[string]tgtdb.NodeMetrics, error) {
 	metrics2 := make(map[string]string)
 	metrics2["cpu_usage_user"] = strconv.FormatFloat(d.cpuUsageUser2, 'f', -1, 64)
 	metrics2["cpu_usage_system"] = strconv.FormatFloat(d.cpuUsageSys2, 'f', -1, 64)
+	metrics2["memory_available"] = strconv.Itoa(d.memAvailable2)
+	metrics2["memory_total"] = strconv.Itoa(d.memTotal2)
+	metrics2["tserver_root_memory_consumption"] = strconv.Itoa(d.tserverRootMemConsumption2)
+	metrics2["tserver_root_memory_soft_limit"] = strconv.Itoa(d.tserverRootMemSoftLimit2)

 	result["node2"] = tgtdb.NodeMetrics{
 		Metrics: metrics2,
@@ -99,6 +130,16 @@ func TestIncreaseParallelism(t *testing.T) {
 		cpuUsageUser1: 0.5,
 		cpuUsageSys1:  0.1,
 		cpuUsageUser2: 0.5,
 		cpuUsageSys2:  0.1,
+
+		memAvailable1:              7280869376,
+		memTotal1:                  8054566912,
+		tserverRootMemConsumption1: 40091648,
+		tserverRootMemSoftLimit1:   3286263299,
+
+		memAvailable2:              7280869376,
+		memTotal2:                  8054566912,
+		tserverRootMemConsumption2: 40091648,
+		tserverRootMemSoftLimit2:   3286263299,
 	}

 	err := fetchClusterMetricsAndUpdateParallelism(yb)
@@ -120,3 +161,53 @@ func TestDecreaseParallelismBasedOnCpu(t *testing.T) {
 	assert.NoErrorf(t, err, "failed to fetch cluster metrics and update parallelism")
 	assert.Equal(t, 2, yb.GetNumConnectionsInPool())
 }

+func TestDecreaseInParallelismBecauseOfLowAvailableMemory(t *testing.T) {
+	yb := &dummyTargetYugabyteDB{
+		size:          3,
+		maxSize:       6,
+		cpuUsageUser1: 0.5,
+		cpuUsageSys1:  0.1,
+		cpuUsageUser2: 0.5,
+		cpuUsageSys2:  0.1,
+
+		memAvailable1:              705456691, // less than 10% of memTotal1
+		memTotal1:                  8054566912,
+		tserverRootMemConsumption1: 40091648,
+		tserverRootMemSoftLimit1:   3286263299,
+
+		memAvailable2:              7280869376,
+		memTotal2:                  8054566912,
+		tserverRootMemConsumption2: 40091648,
+		tserverRootMemSoftLimit2:   3286263299,
+	}
+
+	err := fetchClusterMetricsAndUpdateParallelism(yb)
+	assert.NoErrorf(t, err, "failed to fetch cluster metrics and update parallelism")
+	assert.Equal(t, 2, yb.GetNumConnectionsInPool())
+}
+
+func TestDecreaseInParallelismBecauseofTserverRootMemoryConsumptionSoftLimitBreached(t *testing.T) {
+	yb := &dummyTargetYugabyteDB{
+		size:          3,
+		maxSize:       6,
+		cpuUsageUser1: 0.5,
+		cpuUsageSys1:  0.1,
+		cpuUsageUser2: 0.5,
+		cpuUsageSys2:  0.1,
+
+		memAvailable1:              7280869376,
+		memTotal1:                  8054566912,
+		tserverRootMemConsumption1: 40091648,
+		tserverRootMemSoftLimit1:   3286263299,
+
+		memAvailable2:              7280869376,
+		memTotal2:                  8054566912,
+		tserverRootMemConsumption2: 3286263300, // breaches tserverRootMemSoftLimit2
+		tserverRootMemSoftLimit2:   3286263299,
+	}
+
+	err := fetchClusterMetricsAndUpdateParallelism(yb)
+	assert.NoErrorf(t, err, "failed to fetch cluster metrics and update parallelism")
+	assert.Equal(t, 2, yb.GetNumConnectionsInPool())
+}
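Sanity check on the low-memory fixture: memAvailable1 * 100 / memTotal1 = 705456691 * 100 / 8054566912, which truncates to 8 percent, below the default MIN_AVAILABLE_MEMORY_THRESHOLD of 10. So isMemoryLoadHigh returns true and the pool shrinks from 3 to 2, matching the assertion above.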
Binary file modified: yb-voyager/src/srcdb/data/gather-assessment-metadata.tar.gz (binary diff not shown)

[Review thread]
Reviewer: None of the metadata scripts were modified, so the tar should remain unchanged.
Author: Got committed because of the commit hook :) I've merged main and re-generated the tar, so it should be fine.