Skip to content

Commit

Permalink
Changing index creation workflow from "post-snapshot-import" phase of…
Browse files Browse the repository at this point in the history
… import-schema to default phase. (#1537)

Changing index creation workflow from "post-snapshot-import" phase of import-schema to default phase. 

**Reason:** 
Based on perf experiments, it was shown that pre-creating indexes and then loading data was faster as compared to loading data and backfilling indexes. 
The theory here is:
- Index creation is serial(parallel indexes cannot be created on YB as of now). Furthermore, as a result, there will be as many full table scans as there are no. of indexes 
- On the other hand, writing to indexes at the time of INSERT/UPDATE is done in parallel.
  • Loading branch information
makalaaneesh authored and shaharuk-yb committed Jun 13, 2024
1 parent 5506673 commit 0701461
Show file tree
Hide file tree
Showing 19 changed files with 237 additions and 153 deletions.
6 changes: 6 additions & 0 deletions migtests/scripts/live-migration-fallb-run-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,12 @@ main() {
import_schema
run_ysql ${TARGET_DB_NAME} "\dt"

step "Run Schema validations."
if [ -x "${TEST_DIR}/validate-schema" ]
then
"${TEST_DIR}/validate-schema"
fi

step "Export data."
# false if exit code of export_data is non-zero
export_data --export-type "snapshot-and-changes" || {
Expand Down
6 changes: 6 additions & 0 deletions migtests/scripts/live-migration-fallf-run-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,12 @@ main() {
import_schema
run_ysql ${TARGET_DB_NAME} "\dt"

step "Run Schema validations."
if [ -x "${TEST_DIR}/validate-schema" ]
then
"${TEST_DIR}/validate-schema"
fi

step "Export data."
# false if exit code of export_data is non-zero
export_data --export-type "snapshot-and-changes" || {
Expand Down
6 changes: 6 additions & 0 deletions migtests/scripts/live-migration-run-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,12 @@ main() {
import_schema
run_ysql ${TARGET_DB_NAME} "\dt"

step "Run Schema validations."
if [ -x "${TEST_DIR}/validate-schema" ]
then
"${TEST_DIR}/validate-schema"
fi

step "Export data."
# false if exit code of export_data is non-zero
export_data --export-type "snapshot-and-changes" || {
Expand Down
6 changes: 6 additions & 0 deletions migtests/scripts/run-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,12 @@ main() {
import_schema
run_ysql ${TARGET_DB_NAME} "\dt"

step "Run Schema validations."
if [ -x "${TEST_DIR}/validate-schema" ]
then
"${TEST_DIR}/validate-schema"
fi

step "Import data."
import_data

Expand Down
17 changes: 0 additions & 17 deletions migtests/tests/mysql/indexes/validate
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,13 @@ EXPECTED_ROW_COUNT = {
'single_index_test': 4
}

EXPECTED_INDEX_COUNT = {
'desc_index_test': 2,
'inunique_index_test': 3,
'mult_index_test': 2,
'outunique_index_test': 2,
'primary_index_test': 1,
'single_index_test': 2
}

def migration_completed_checks(tgt):
table_list = tgt.get_table_names("test_mysql_indexes")
print("table_list:", table_list)
assert len(table_list) == 6

got_row_count = tgt.row_count_of_all_tables("test_mysql_indexes")
for table_name, row_count in EXPECTED_ROW_COUNT.items():
print(f"table_name: {table_name}, row_count: {got_row_count[table_name]}")
assert row_count == got_row_count[table_name]

get_index_cnt = tgt.get_count_index_on_table("test_mysql_indexes")
for table_name, index_count in EXPECTED_INDEX_COUNT.items():
print(f"table_name: {table_name}, index_count: {get_index_cnt[table_name]}")
assert index_count == get_index_cnt[table_name]


if __name__ == "__main__":
main()
32 changes: 32 additions & 0 deletions migtests/tests/mysql/indexes/validate-schema
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#!/usr/bin/env python3

import yb

def main():
yb.run_checks(migration_completed_checks)


#=============================================================================

EXPECTED_INDEX_COUNT = {
'desc_index_test': 2,
'inunique_index_test': 3,
'mult_index_test': 2,
'outunique_index_test': 2,
'primary_index_test': 1,
'single_index_test': 2
}

def migration_completed_checks(tgt):
table_list = tgt.get_table_names("test_mysql_indexes")
print("table_list:", table_list)
assert len(table_list) == 6

get_index_cnt = tgt.get_count_index_on_table("test_mysql_indexes")
for table_name, index_count in EXPECTED_INDEX_COUNT.items():
print(f"table_name: {table_name}, index_count: {get_index_cnt[table_name]}")
assert index_count == get_index_cnt[table_name]


if __name__ == "__main__":
main()
12 changes: 0 additions & 12 deletions migtests/tests/oracle/indexes/validate
Original file line number Diff line number Diff line change
Expand Up @@ -12,25 +12,13 @@ EXPECTED_ROW_COUNT = {
'members': 1000
}

EXPECTED_INDEX_COUNT = {
'members': 4
}


def migration_completed_checks(tgt):
table_list = tgt.get_table_names("public")
print("table_list:", table_list)
assert len(table_list) == 1

got_row_count = tgt.row_count_of_all_tables("public")
for table_name, row_count in EXPECTED_ROW_COUNT.items():
print(f"table_name: {table_name}, row_count: {got_row_count[table_name]}")
assert row_count == got_row_count[table_name]

get_index_cnt = tgt.get_count_index_on_table("public")
for table_name, index_count in EXPECTED_INDEX_COUNT.items():
print(f"table_name: {table_name}, index_count: {get_index_cnt[table_name]}")
assert index_count == get_index_cnt[table_name]

INSERT_MEMBERS_QUERY = "insert into public.members (first_name, last_name, gender, email, dob) VALUES ('Pepi', 'Elce', 'F', '[email protected]', '1984-03-04');"
violate_unique_index_check_error = tgt.run_query_and_chk_error(INSERT_MEMBERS_QUERY, "23505")
Expand Down
29 changes: 29 additions & 0 deletions migtests/tests/oracle/indexes/validate-schema
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#!/usr/bin/env python3

import yb

def main():
yb.run_checks(migration_completed_checks)


# =============================================================================


EXPECTED_INDEX_COUNT = {
'members': 4
}


def migration_completed_checks(tgt):
table_list = tgt.get_table_names("public")
print("table_list:", table_list)
assert len(table_list) == 1

get_index_cnt = tgt.get_count_index_on_table("public")
for table_name, index_count in EXPECTED_INDEX_COUNT.items():
print(f"table_name: {table_name}, index_count: {get_index_cnt[table_name]}")
assert index_count == get_index_cnt[table_name]


if __name__ == "__main__":
main()
8 changes: 0 additions & 8 deletions migtests/tests/pg/indexes/validate
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,11 @@ def YB_specific_checks(tgt):
yb.verify_colocation(tgt)

def migration_completed_checks(tgt):
table_list = tgt.get_table_names("public")
print("table_list:", table_list)
assert len(table_list) == 9

got_row_count = tgt.row_count_of_all_tables("public")
for table_name, row_count in EXPECTED_ROW_COUNT.items():
print(f"table_name: {table_name}, row_count: {got_row_count[table_name]}")
assert row_count == got_row_count[table_name]

get_index_cnt = tgt.get_count_index_on_table("public")
for table_name, index_count in EXPECTED_INDEX_COUNT.items():
print(f"table_name: {table_name}, index_count: {get_index_cnt[table_name]}")
assert index_count == get_index_cnt[table_name]


if __name__ == "__main__":
Expand Down
36 changes: 36 additions & 0 deletions migtests/tests/pg/indexes/validate-schema
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#!/usr/bin/env python3

import yb

def main():
yb.run_checks(migration_completed_checks)


#=============================================================================

EXPECTED_INDEX_COUNT = {
'single_index_test': 2,
'mult_index_test': 1,
'outunique_index_test': 2,
'desc_index_test': 2,
'partial_index_test': 2,
'exp_index_test': 2,
'hash_index_test': 2,
'covering_index_test': 1,
'gin_index_test': 1,
}

def migration_completed_checks(tgt):
table_list = tgt.get_table_names("public")
print("table_list:", table_list)
assert len(table_list) == 9


get_index_cnt = tgt.get_count_index_on_table("public")
for table_name, index_count in EXPECTED_INDEX_COUNT.items():
print(f"table_name: {table_name}, index_count: {get_index_cnt[table_name]}")
assert index_count == get_index_cnt[table_name]


if __name__ == "__main__":
main()
6 changes: 0 additions & 6 deletions migtests/tests/pg/misc-objects-2/validate
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ EXPECTED_TEXT_LENGTHS = {
34: 24,
}

EXPECTED_TRIGGER_LIST = ["audit_trigger",]

def YB_specific_checks(tgt):
yb.verify_colocation(tgt)
Expand All @@ -63,16 +62,11 @@ def migration_completed_checks(tgt):
print("table_list:", table_list)
assert len(table_list) == 5


got_row_count = tgt.row_count_of_all_tables("public")
for table_name, row_count in EXPECTED_ROW_COUNT.items():
print(f"table_name: {table_name}, row_count: {got_row_count[table_name]}")
assert row_count == got_row_count[table_name]

fetched_triggers = tgt.fetch_all_triggers("public")
print(f"fetched triggers list: {fetched_triggers}\n Expected triggers: {EXPECTED_TRIGGER_LIST}")
assert fetched_triggers == EXPECTED_TRIGGER_LIST

fetched_procedures_functions = tgt.fetch_all_procedures("public")
print(f"count of fecthed procedures/functions - {len(fetched_procedures_functions)}")
assert len(fetched_procedures_functions) == 40
Expand Down
27 changes: 27 additions & 0 deletions migtests/tests/pg/misc-objects-2/validate-schema
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#!/usr/bin/env python3

import yb

def main():
yb.run_checks(migration_completed_checks)


#=============================================================================

EXPECTED_TRIGGER_LIST = ["audit_trigger",]

def migration_completed_checks(tgt):
table_list = tgt.get_table_names("public")
print("table_list:", table_list)
assert len(table_list) == 5

fetched_triggers = tgt.fetch_all_triggers("public")
print(f"fetched triggers list: {fetched_triggers}\n Expected triggers: {EXPECTED_TRIGGER_LIST}")
assert fetched_triggers == EXPECTED_TRIGGER_LIST

fetched_procedures_functions = tgt.fetch_all_procedures("public")
print(f"count of fecthed procedures/functions - {len(fetched_procedures_functions)}")
assert len(fetched_procedures_functions) == 40

if __name__ == "__main__":
main()
28 changes: 27 additions & 1 deletion yb-voyager/cmd/importData.go
Original file line number Diff line number Diff line change
Expand Up @@ -1080,7 +1080,7 @@ func beforeIndexCreation(sqlInfo sqlInfo, conn **pgx.Conn, objType string) error
return fmt.Errorf("extract qualified index name from DDL [%v]: %w", sqlInfo.stmt, err)
}
if invalidTargetIndexesCache == nil {
invalidTargetIndexesCache, err = tdb.InvalidIndexes()
invalidTargetIndexesCache, err = getInvalidIndexes(conn)
if err != nil {
return fmt.Errorf("failed to fetch invalid indexes: %w", err)
}
Expand All @@ -1100,6 +1100,32 @@ func beforeIndexCreation(sqlInfo sqlInfo, conn **pgx.Conn, objType string) error
return nil
}

func getInvalidIndexes(conn **pgx.Conn) (map[string]bool, error) {
var result = make(map[string]bool)
// NOTE: this shouldn't fetch any predefined indexes of pg_catalog schema (assuming they can't be invalid) or indexes of other successful migrations
query := "SELECT indexrelid::regclass FROM pg_index WHERE indisvalid = false"

rows, err := (*conn).Query(context.Background(), query)
if err != nil {
return nil, fmt.Errorf("querying invalid indexes: %w", err)
}
defer rows.Close()

for rows.Next() {
var fullyQualifiedIndexName string
err := rows.Scan(&fullyQualifiedIndexName)
if err != nil {
return nil, fmt.Errorf("scanning row for invalid index name: %w", err)
}
// if schema is not provided by catalog table, then it is public schema
if !strings.Contains(fullyQualifiedIndexName, ".") {
fullyQualifiedIndexName = fmt.Sprintf("public.%s", fullyQualifiedIndexName)
}
result[fullyQualifiedIndexName] = true
}
return result, nil
}

// TODO: This function is a duplicate of the one in tgtdb/yb.go. Consolidate the two.
func getTargetSchemaName(tableName string) string {
parts := strings.Split(tableName, ".")
Expand Down
Loading

0 comments on commit 0701461

Please sign in to comment.