Skip to content

Commit

Permalink
Sizing Calculator: added unit tests and minor restructuring (#1551)
Browse files Browse the repository at this point in the history
* sizing calc: added unit tests and minor restructuring
  • Loading branch information
shaharuk-yb committed Jun 13, 2024
1 parent 0701461 commit 3609c48
Show file tree
Hide file tree
Showing 4 changed files with 943 additions and 25 deletions.
1 change: 1 addition & 0 deletions yb-voyager/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ require (
)

require (
github.com/DATA-DOG/go-sqlmock v1.5.2 // indirect
github.com/jackc/puddle v1.3.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/pganalyze/pg_query_go/v5 v5.1.0 // indirect
Expand Down
3 changes: 3 additions & 0 deletions yb-voyager/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,8 @@ github.com/AzureAD/microsoft-authentication-library-for-go v0.8.1 h1:oPdPEZFSbl7
github.com/AzureAD/microsoft-authentication-library-for-go v0.8.1/go.mod h1:4qFor3D/HDsvBME35Xy9rwW9DecL+M2sNw1ybjPtwA0=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU=
github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU=
github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ=
github.com/GoogleCloudPlatform/cloudsql-proxy v1.33.2/go.mod h1:uqoR4sJc63p7ugW8a/vsEspOsNuehbi7ptS2CHCyOnY=
github.com/HdrHistogram/hdrhistogram-go v1.1.0/go.mod h1:yDgFjdqOqDEKOvasDdhWNXYg9BVp4O+o5f6V/ehm6Oo=
Expand Down Expand Up @@ -1390,6 +1392,7 @@ github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvW
github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/kisielk/sqlstruct v0.0.0-20201105191214-5f3e10d3ab46/go.mod h1:yyMNCyc/Ib3bDTKd379tNMpB/7/H5TjM2Y9QJ5THLbE=
github.com/klauspost/compress v1.10.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/compress v1.11.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/compress v1.11.13/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
Expand Down
57 changes: 32 additions & 25 deletions yb-voyager/src/migassessment/sizing.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,6 @@ const (
MAX_TABLETS_PER_TABLE = 256
)

var ExperimentDB *sql.DB

// getExperimentDBPath returns the on-disk location of the bundled
// experiment-data SQLite file, rooted at the assessment directory.
func getExperimentDBPath() string {
	experimentDataPath := filepath.Join(AssessmentDir, DBS_DIR, EXPERIMENT_DATA_FILENAME)
	return experimentDataPath
}
Expand All @@ -117,31 +115,31 @@ var experimentData20240 []byte
func SizingAssessment() error {

log.Infof("loading metadata files for sharding assessment")
sourceTableMetadata, sourceIndexMetadata, _, err := loadSourceMetadata()
sourceTableMetadata, sourceIndexMetadata, _, err := loadSourceMetadata(GetSourceMetadataDBFilePath())
if err != nil {
SizingReport.FailureReasoning = fmt.Sprintf("failed to load source metadata: %v", err)
return fmt.Errorf("failed to load source metadata: %w", err)
}

err = createConnectionToExperimentData()
experimentDB, err := createConnectionToExperimentData()
if err != nil {
SizingReport.FailureReasoning = fmt.Sprintf("failed to connect to experiment data: %v", err)
return fmt.Errorf("failed to connect to experiment data: %w", err)
}

colocatedLimits, err := loadColocatedLimit()
colocatedLimits, err := loadColocatedLimit(experimentDB)
if err != nil {
SizingReport.FailureReasoning = fmt.Sprintf("error fetching the colocated limits: %v", err)
return fmt.Errorf("error fetching the colocated limits: %w", err)
}

shardedLimits, err := loadShardedTableLimits()
shardedLimits, err := loadShardedTableLimits(experimentDB)
if err != nil {
SizingReport.FailureReasoning = fmt.Sprintf("error fetching the sharded limits: %v", err)
return fmt.Errorf("error fetching the colocated limits: %w", err)
}

shardedThroughput, err := loadShardedThroughput()
shardedThroughput, err := loadShardedThroughput(experimentDB)
if err != nil {
SizingReport.FailureReasoning = fmt.Sprintf("error fetching the sharded throughput: %v", err)
return fmt.Errorf("error fetching the sharded throughput: %w", err)
Expand Down Expand Up @@ -174,7 +172,7 @@ func SizingAssessment() error {
// calculate time taken for colocated import
importTimeForColocatedObjects, parallelVoyagerJobsColocated, err :=
calculateTimeTakenAndParallelJobsForImport(COLOCATED_LOAD_TIME_TABLE, colocatedObjects,
finalSizingRecommendation.VCPUsPerInstance, finalSizingRecommendation.MemoryPerCore)
finalSizingRecommendation.VCPUsPerInstance, finalSizingRecommendation.MemoryPerCore, experimentDB)
if err != nil {
SizingReport.FailureReasoning = fmt.Sprintf("calculate time taken for colocated data import: %v", err)
return fmt.Errorf("calculate time taken for colocated data import: %w", err)
Expand All @@ -183,7 +181,7 @@ func SizingAssessment() error {
// calculate time taken for sharded import
importTimeForShardedObjects, parallelVoyagerJobsSharded, err :=
calculateTimeTakenAndParallelJobsForImport(SHARDED_LOAD_TIME_TABLE, shardedObjects,
finalSizingRecommendation.VCPUsPerInstance, finalSizingRecommendation.MemoryPerCore)
finalSizingRecommendation.VCPUsPerInstance, finalSizingRecommendation.MemoryPerCore, experimentDB)
if err != nil {
SizingReport.FailureReasoning = fmt.Sprintf("calculate time taken for sharded data import: %v", err)
return fmt.Errorf("calculate time taken for sharded data import: %w", err)
Expand Down Expand Up @@ -320,6 +318,17 @@ func findNumNodesNeededBasedOnThroughputRequirement(sourceIndexMetadata []Source
return recommendation
}

/*
findNumNodesNeededBasedOnTabletsRequired calculates the number of nodes needed based on tablets required by each
table and its indexes and updates the recommendation accordingly.
Parameters:
- sourceIndexMetadata: A slice of SourceDBMetadata structs representing source indexes.
- shardedLimits: A slice of ExpDataShardedLimit structs representing sharded table limits.
- recommendation: A map where the key is the number of vCPUs per instance and the value is an IntermediateRecommendation struct.
Returns:
- An updated map of recommendations with the number of nodes needed.
*/
func findNumNodesNeededBasedOnTabletsRequired(sourceIndexMetadata []SourceDBMetadata,
shardedLimits []ExpDataShardedLimit,
recommendation map[int]IntermediateRecommendation) map[int]IntermediateRecommendation {
Expand Down Expand Up @@ -633,7 +642,7 @@ Returns:
- A slice of ExpDataColocatedLimit structs containing the fetched colocated limits.
- An error if there was any issue during the data retrieval process.
*/
func loadColocatedLimit() ([]ExpDataColocatedLimit, error) {
func loadColocatedLimit(experimentDB *sql.DB) ([]ExpDataColocatedLimit, error) {
var colocatedLimits []ExpDataColocatedLimit
query := fmt.Sprintf(`
SELECT max_colocated_db_size_gb,
Expand All @@ -646,7 +655,7 @@ func loadColocatedLimit() ([]ExpDataColocatedLimit, error) {
FROM %v
ORDER BY num_cores DESC
`, COLOCATED_LIMITS_TABLE)
rows, err := ExperimentDB.Query(query)
rows, err := experimentDB.Query(query)
if err != nil {
return nil, fmt.Errorf("cannot fetch data from experiment data table with query [%s]: %w", query, err)
}
Expand Down Expand Up @@ -676,15 +685,15 @@ Returns:
- A slice of ExpDataShardedLimit structs containing the fetched sharded table limits.
- An error if there was any issue during the data retrieval process.
*/
func loadShardedTableLimits() ([]ExpDataShardedLimit, error) {
func loadShardedTableLimits(experimentDB *sql.DB) ([]ExpDataShardedLimit, error) {
// added num_cores >= VCPUPerInstance from colo recommendation as that is the starting point
selectQuery := fmt.Sprintf(`
SELECT num_cores, memory_per_core, num_tables
FROM %s
WHERE dimension LIKE '%%TableLimits-3nodeRF=3%%'
ORDER BY num_cores
`, SHARDED_SIZING_TABLE)
rows, err := ExperimentDB.Query(selectQuery)
rows, err := experimentDB.Query(selectQuery)

if err != nil {
return nil, fmt.Errorf("error while fetching cores info with query [%s]: %w", selectQuery, err)
Expand Down Expand Up @@ -715,7 +724,7 @@ Returns:
- A slice of ExpDataShardedThroughput structs containing the fetched sharded throughput information.
- An error if there was any issue during the data retrieval process.
*/
func loadShardedThroughput() ([]ExpDataShardedThroughput, error) {
func loadShardedThroughput(experimentDB *sql.DB) ([]ExpDataShardedThroughput, error) {
selectQuery := fmt.Sprintf(`
SELECT inserts_per_core,
selects_per_core,
Expand All @@ -727,7 +736,7 @@ func loadShardedThroughput() ([]ExpDataShardedThroughput, error) {
WHERE dimension = 'MaxThroughput'
ORDER BY num_cores DESC;
`, SHARDED_SIZING_TABLE)
rows, err := ExperimentDB.Query(selectQuery)
rows, err := experimentDB.Query(selectQuery)
if err != nil {
return nil, fmt.Errorf("error while fetching throughput info with query [%s]: %w", selectQuery, err)
}
Expand Down Expand Up @@ -761,8 +770,7 @@ Returns:
[]SourceDBMetadata: all index objects from source db
float64: total size of source db
*/
func loadSourceMetadata() ([]SourceDBMetadata, []SourceDBMetadata, float64, error) {
filePath := GetSourceMetadataDBFilePath()
func loadSourceMetadata(filePath string) ([]SourceDBMetadata, []SourceDBMetadata, float64, error) {
SourceMetaDB, err := utils.ConnectToSqliteDatabase(filePath)
if err != nil {
return nil, nil, 0.0, fmt.Errorf("cannot connect to source metadata database: %w", err)
Expand Down Expand Up @@ -811,7 +819,7 @@ Returns:
int64: Total parallel jobs used for import.
*/
func calculateTimeTakenAndParallelJobsForImport(tableName string, dbObjects []SourceDBMetadata,
vCPUPerInstance int, memPerCore int) (float64, int64, error) {
vCPUPerInstance int, memPerCore int, experimentDB *sql.DB) (float64, int64, error) {
// the total size of objects
var size float64 = 0
var timeTakenOfFetchedRow float64
Expand Down Expand Up @@ -841,7 +849,7 @@ func calculateTimeTakenAndParallelJobsForImport(tableName string, dbObjects []So
AND num_cores = ?
LIMIT 1;
`, tableName, tableName, tableName)
row := ExperimentDB.QueryRow(selectQuery, vCPUPerInstance, memPerCore, size, vCPUPerInstance)
row := experimentDB.QueryRow(selectQuery, vCPUPerInstance, memPerCore, size, vCPUPerInstance)

if err := row.Scan(&maxSizeOfFetchedRow, &timeTakenOfFetchedRow, &parallelJobs); err != nil {
if err == sql.ErrNoRows {
Expand Down Expand Up @@ -986,7 +994,7 @@ func getReasoning(recommendation IntermediateRecommendation, shardedObjects []So
sizeUnitSharded, shardedReads, shardedWrites)
// If colocated objects exist, add sharded objects information as rest of the objects need to be migrated as sharded
if len(colocatedObjects) > 0 {
reasoning += " Rest " + shardedReasoning + " need to be migrated as range partitioned tables"
reasoning += " Rest " + shardedReasoning + "need to be migrated as range partitioned tables"
} else {
reasoning += shardedReasoning + "as sharded."
}
Expand Down Expand Up @@ -1054,17 +1062,16 @@ func getListOfIndexesAlongWithObjects(tableList []SourceDBMetadata,
return indexesAndObject, cumulativeIndexCount
}

func createConnectionToExperimentData() error {
func createConnectionToExperimentData() (*sql.DB, error) {
filePath, err := getExperimentFile()
if err != nil {
return fmt.Errorf("failed to get experiment file: %w", err)
return nil, fmt.Errorf("failed to get experiment file: %w", err)
}
DbConnection, err := utils.ConnectToSqliteDatabase(filePath)
if err != nil {
return fmt.Errorf("failed to connect to experiment data database: %w", err)
return nil, fmt.Errorf("failed to connect to experiment data database: %w", err)
}
ExperimentDB = DbConnection
return nil
return DbConnection, nil
}

func getExperimentFile() (string, error) {
Expand Down
Loading

0 comments on commit 3609c48

Please sign in to comment.