Sizing Calculator: added unit tests and minor restructuring #1551

Merged

16 commits, merged Jun 13, 2024

Changes from 15 commits
1 change: 1 addition & 0 deletions yb-voyager/go.mod
@@ -40,6 +40,7 @@ require (
)

require (
+github.com/DATA-DOG/go-sqlmock v1.5.2 // indirect
github.com/jackc/puddle v1.3.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/pganalyze/pg_query_go/v5 v5.1.0 // indirect
3 changes: 3 additions & 0 deletions yb-voyager/go.sum
@@ -476,6 +476,8 @@ github.com/AzureAD/microsoft-authentication-library-for-go v0.8.1 h1:oPdPEZFSbl7
github.com/AzureAD/microsoft-authentication-library-for-go v0.8.1/go.mod h1:4qFor3D/HDsvBME35Xy9rwW9DecL+M2sNw1ybjPtwA0=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
+github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU=
+github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU=
github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ=
github.com/GoogleCloudPlatform/cloudsql-proxy v1.33.2/go.mod h1:uqoR4sJc63p7ugW8a/vsEspOsNuehbi7ptS2CHCyOnY=
github.com/HdrHistogram/hdrhistogram-go v1.1.0/go.mod h1:yDgFjdqOqDEKOvasDdhWNXYg9BVp4O+o5f6V/ehm6Oo=
@@ -1390,6 +1392,7 @@ github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvW
github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
+github.com/kisielk/sqlstruct v0.0.0-20201105191214-5f3e10d3ab46/go.mod h1:yyMNCyc/Ib3bDTKd379tNMpB/7/H5TjM2Y9QJ5THLbE=
github.com/klauspost/compress v1.10.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/compress v1.11.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/compress v1.11.13/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
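The only new direct dependency is DATA-DOG/go-sqlmock, which fakes a database/sql driver so SQL-shaped code can run in unit tests without a SQLite file on disk. A minimal round trip, as a standalone sketch rather than code from this PR, showing the expectation-first style go-sqlmock uses (the package name matches the repo; everything else is illustrative):

package migassessment

import (
	"testing"

	sqlmock "github.com/DATA-DOG/go-sqlmock"
)

// Standalone sketch: every statement run against the mocked *sql.DB must be
// declared as an expectation first, and unmet expectations fail the test.
func TestSqlmockRoundTrip(t *testing.T) {
	db, mock, err := sqlmock.New()
	if err != nil {
		t.Fatalf("sqlmock.New: %v", err)
	}
	defer db.Close()

	mock.ExpectQuery("SELECT 1").
		WillReturnRows(sqlmock.NewRows([]string{"one"}).AddRow(1))

	var one int
	if err := db.QueryRow("SELECT 1").Scan(&one); err != nil {
		t.Fatalf("query: %v", err)
	}
	if err := mock.ExpectationsWereMet(); err != nil {
		t.Errorf("unmet expectations: %v", err)
	}
}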
57 changes: 32 additions & 25 deletions yb-voyager/src/migassessment/sizing.go
@@ -105,8 +105,6 @@ const (
MAX_TABLETS_PER_TABLE = 256
)

-var ExperimentDB *sql.DB
-
func getExperimentDBPath() string {
return filepath.Join(AssessmentDir, DBS_DIR, EXPERIMENT_DATA_FILENAME)
}
@@ -117,31 +115,31 @@ var experimentData20240 []byte
func SizingAssessment() error {

log.Infof("loading metadata files for sharding assessment")
-sourceTableMetadata, sourceIndexMetadata, _, err := loadSourceMetadata()
+sourceTableMetadata, sourceIndexMetadata, _, err := loadSourceMetadata(GetSourceMetadataDBFilePath())
if err != nil {
SizingReport.FailureReasoning = fmt.Sprintf("failed to load source metadata: %v", err)
return fmt.Errorf("failed to load source metadata: %w", err)
}

-err = createConnectionToExperimentData()
+experimentDB, err := createConnectionToExperimentData()
if err != nil {
SizingReport.FailureReasoning = fmt.Sprintf("failed to connect to experiment data: %v", err)
return fmt.Errorf("failed to connect to experiment data: %w", err)
}

-colocatedLimits, err := loadColocatedLimit()
+colocatedLimits, err := loadColocatedLimit(experimentDB)
if err != nil {
SizingReport.FailureReasoning = fmt.Sprintf("error fetching the colocated limits: %v", err)
return fmt.Errorf("error fetching the colocated limits: %w", err)
}

-shardedLimits, err := loadShardedTableLimits()
+shardedLimits, err := loadShardedTableLimits(experimentDB)
if err != nil {
SizingReport.FailureReasoning = fmt.Sprintf("error fetching the sharded limits: %v", err)
return fmt.Errorf("error fetching the colocated limits: %w", err)
}

-shardedThroughput, err := loadShardedThroughput()
+shardedThroughput, err := loadShardedThroughput(experimentDB)
if err != nil {
SizingReport.FailureReasoning = fmt.Sprintf("error fetching the sharded throughput: %v", err)
return fmt.Errorf("error fetching the sharded throughput: %w", err)
@@ -174,7 +172,7 @@ func SizingAssessment() error {
// calculate time taken for colocated import
importTimeForColocatedObjects, parallelVoyagerJobsColocated, err :=
calculateTimeTakenAndParallelJobsForImport(COLOCATED_LOAD_TIME_TABLE, colocatedObjects,
-finalSizingRecommendation.VCPUsPerInstance, finalSizingRecommendation.MemoryPerCore)
+finalSizingRecommendation.VCPUsPerInstance, finalSizingRecommendation.MemoryPerCore, experimentDB)
if err != nil {
SizingReport.FailureReasoning = fmt.Sprintf("calculate time taken for colocated data import: %v", err)
return fmt.Errorf("calculate time taken for colocated data import: %w", err)
@@ -183,7 +181,7 @@
// calculate time taken for sharded import
importTimeForShardedObjects, parallelVoyagerJobsSharded, err :=
calculateTimeTakenAndParallelJobsForImport(SHARDED_LOAD_TIME_TABLE, shardedObjects,
-finalSizingRecommendation.VCPUsPerInstance, finalSizingRecommendation.MemoryPerCore)
+finalSizingRecommendation.VCPUsPerInstance, finalSizingRecommendation.MemoryPerCore, experimentDB)
if err != nil {
SizingReport.FailureReasoning = fmt.Sprintf("calculate time taken for sharded data import: %v", err)
return fmt.Errorf("calculate time taken for sharded data import: %w", err)
@@ -318,6 +316,17 @@ func findNumNodesNeededBasedOnThroughputRequirement(sourceIndexMetadata []Source
return recommendation
}

+/*
+findNumNodesNeededBasedOnTabletsRequired calculates the number of nodes needed based on tablets required by each
+table and its indexes, and updates the recommendation accordingly.
+Parameters:
+- sourceIndexMetadata: A slice of SourceDBMetadata structs representing source indexes.
+- shardedLimits: A slice of ExpDataShardedLimit structs representing sharded table limits.
+- recommendation: A map where the key is the number of vCPUs per instance and the value is an IntermediateRecommendation struct.
+
+Returns:
+- An updated map of recommendations with the number of nodes needed.
+*/
func findNumNodesNeededBasedOnTabletsRequired(sourceIndexMetadata []SourceDBMetadata,
shardedLimits []ExpDataShardedLimit,
recommendation map[int]IntermediateRecommendation) map[int]IntermediateRecommendation {
@@ -631,7 +640,7 @@ Returns:
- A slice of ExpDataColocatedLimit structs containing the fetched colocated limits.
- An error if there was any issue during the data retrieval process.
*/
-func loadColocatedLimit() ([]ExpDataColocatedLimit, error) {
+func loadColocatedLimit(experimentDB *sql.DB) ([]ExpDataColocatedLimit, error) {
var colocatedLimits []ExpDataColocatedLimit
query := fmt.Sprintf(`
SELECT max_colocated_db_size_gb,
@@ -644,7 +653,7 @@ func loadColocatedLimit() ([]ExpDataColocatedLimit, error) {
FROM %v
ORDER BY num_cores DESC
`, COLOCATED_LIMITS_TABLE)
-rows, err := ExperimentDB.Query(query)
+rows, err := experimentDB.Query(query)
if err != nil {
return nil, fmt.Errorf("cannot fetch data from experiment data table with query [%s]: %w", query, err)
}
@@ -674,15 +683,15 @@ Returns:
- A slice of ExpDataShardedLimit structs containing the fetched sharded table limits.
- An error if there was any issue during the data retrieval process.
*/
-func loadShardedTableLimits() ([]ExpDataShardedLimit, error) {
+func loadShardedTableLimits(experimentDB *sql.DB) ([]ExpDataShardedLimit, error) {
// added num_cores >= VCPUPerInstance from colo recommendation as that is the starting point
selectQuery := fmt.Sprintf(`
SELECT num_cores, memory_per_core, num_tables
FROM %s
WHERE dimension LIKE '%%TableLimits-3nodeRF=3%%'
ORDER BY num_cores
`, SHARDED_SIZING_TABLE)
-rows, err := ExperimentDB.Query(selectQuery)
+rows, err := experimentDB.Query(selectQuery)

if err != nil {
return nil, fmt.Errorf("error while fetching cores info with query [%s]: %w", selectQuery, err)
@@ -713,7 +722,7 @@ Returns:
- A slice of ExpDataShardedThroughput structs containing the fetched sharded throughput information.
- An error if there was any issue during the data retrieval process.
*/
-func loadShardedThroughput() ([]ExpDataShardedThroughput, error) {
+func loadShardedThroughput(experimentDB *sql.DB) ([]ExpDataShardedThroughput, error) {
selectQuery := fmt.Sprintf(`
SELECT inserts_per_core,
selects_per_core,
@@ -725,7 +734,7 @@ func loadShardedThroughput() ([]ExpDataShardedThroughput, error) {
WHERE dimension = 'MaxThroughput'
ORDER BY num_cores DESC;
`, SHARDED_SIZING_TABLE)
-rows, err := ExperimentDB.Query(selectQuery)
+rows, err := experimentDB.Query(selectQuery)
if err != nil {
return nil, fmt.Errorf("error while fetching throughput info with query [%s]: %w", selectQuery, err)
}
@@ -759,8 +768,7 @@ Returns:
[]SourceDBMetadata: all index objects from source db
float64: total size of source db
*/
-func loadSourceMetadata() ([]SourceDBMetadata, []SourceDBMetadata, float64, error) {
-filePath := GetSourceMetadataDBFilePath()
+func loadSourceMetadata(filePath string) ([]SourceDBMetadata, []SourceDBMetadata, float64, error) {
SourceMetaDB, err := utils.ConnectToSqliteDatabase(filePath)
if err != nil {
return nil, nil, 0.0, fmt.Errorf("cannot connect to source metadata database: %w", err)
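Accepting filePath instead of resolving GetSourceMetadataDBFilePath inside the function gives tests the same kind of seam on the source-metadata side. A negative-path sketch that needs no knowledge of the metadata schema; it assumes only that the loader returns an error when the database or its expected tables are absent, and it adds "path/filepath" to the test imports:

func TestLoadSourceMetadata_MissingDatabase(t *testing.T) {
	// A path inside a fresh temp dir: no SQLite file or schema exists there.
	path := filepath.Join(t.TempDir(), "source_metadata.db")

	_, _, _, err := loadSourceMetadata(path)
	if err == nil {
		t.Fatal("expected an error when the source metadata tables do not exist")
	}
}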
@@ -809,7 +817,7 @@ Returns:
int64: Total parallel jobs used for import.
*/
func calculateTimeTakenAndParallelJobsForImport(tableName string, dbObjects []SourceDBMetadata,
-vCPUPerInstance int, memPerCore int) (float64, int64, error) {
+vCPUPerInstance int, memPerCore int, experimentDB *sql.DB) (float64, int64, error) {
// the total size of objects
var size float64 = 0
var timeTakenOfFetchedRow float64
@@ -839,7 +847,7 @@ func calculateTimeTakenAndParallelJobsForImport(tableName string, dbObjects []So
AND num_cores = ?
LIMIT 1;
`, tableName, tableName, tableName)
-row := ExperimentDB.QueryRow(selectQuery, vCPUPerInstance, memPerCore, size, vCPUPerInstance)
+row := experimentDB.QueryRow(selectQuery, vCPUPerInstance, memPerCore, size, vCPUPerInstance)

if err := row.Scan(&maxSizeOfFetchedRow, &timeTakenOfFetchedRow, &parallelJobs); err != nil {
if err == sql.ErrNoRows {
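QueryRow goes through the same sqlmock expectation machinery as Query. In the sketch below the column names are placeholders, since the SELECT list is collapsed in this view; only their order has to line up with the row.Scan destinations (max size, time taken, parallel jobs). The test also assumes an empty object list still reaches the query with a size of 0, and it reuses newStubExperimentDB from earlier:

func TestCalculateTimeTakenAndParallelJobsForImport_SingleRow(t *testing.T) {
	db, mock := newStubExperimentDB(t)

	// Args per the QueryRow call above: vCPUs, mem/core, size, vCPUs.
	mock.ExpectQuery("SELECT").
		WithArgs(8, 4, sqlmock.AnyArg(), 8).
		WillReturnRows(sqlmock.NewRows([]string{"max_size", "time_taken", "parallel_jobs"}).
			AddRow(100.0, 3600.0, 4))

	timeTaken, jobs, err := calculateTimeTakenAndParallelJobsForImport(
		COLOCATED_LOAD_TIME_TABLE, nil, 8, 4, db)
	if err != nil {
		t.Fatalf("calculateTimeTakenAndParallelJobsForImport: %v", err)
	}
	// How the collapsed code scales the fetched row into the returned values
	// is not visible in this diff, so record rather than assert exact numbers.
	t.Logf("timeTaken=%v jobs=%v", timeTaken, jobs)
}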
@@ -984,7 +992,7 @@ func getReasoning(recommendation IntermediateRecommendation, shardedObjects []So
sizeUnitSharded, shardedReads, shardedWrites)
// If colocated objects exist, add sharded objects information as rest of the objects need to be migrated as sharded
if len(colocatedObjects) > 0 {
reasoning += " Rest " + shardedReasoning + " need to be migrated as range partitioned tables"
reasoning += " Rest " + shardedReasoning + "need to be migrated as range partitioned tables"
} else {
reasoning += shardedReasoning + "as sharded."
}
@@ -1052,17 +1060,16 @@ func getListOfIndexesAlongWithObjects(tableList []SourceDBMetadata,
return indexesAndObject, cumulativeIndexCount
}

-func createConnectionToExperimentData() error {
+func createConnectionToExperimentData() (*sql.DB, error) {
filePath, err := getExperimentFile()
if err != nil {
return fmt.Errorf("failed to get experiment file: %w", err)
return nil, fmt.Errorf("failed to get experiment file: %w", err)
}
DbConnection, err := utils.ConnectToSqliteDatabase(filePath)
if err != nil {
return fmt.Errorf("failed to connect to experiment data database: %w", err)
return nil, fmt.Errorf("failed to connect to experiment data database: %w", err)
}
-ExperimentDB = DbConnection
-return nil
+return DbConnection, nil
}

func getExperimentFile() (string, error) {
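With createConnectionToExperimentData returning the handle instead of assigning the old ExperimentDB global, the caller now owns the connection's lifetime; whether SizingAssessment closes it is not visible in this diff. A sketch of caller-managed cleanup under that assumption, using a hypothetical wrapper in the same package with "fmt" imported:

func runSizingQueries() error {
	experimentDB, err := createConnectionToExperimentData()
	if err != nil {
		return fmt.Errorf("failed to connect to experiment data: %w", err)
	}
	// The handle is no longer package-global, so close it where it was opened.
	defer experimentDB.Close()

	if _, err := loadColocatedLimit(experimentDB); err != nil {
		return fmt.Errorf("error fetching the colocated limits: %w", err)
	}
	return nil
}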