-
Notifications
You must be signed in to change notification settings - Fork 12
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added the test scripts for resumption #2117
Changes from 1 commit
045ad30
78209bf
0485f4c
1b2ee52
8fbb84f
0e5bcd0
986a51e
d5b7aaa
e8bd070
18a38ab
61c9b1d
d72c841
b38a7f5
a14fa6b
20c2715
9a6cf4c
b42f922
2d9749d
6751621
c19acf9
b5f7c9e
ec1f782
5e00018
fd8c456
288ab0f
3b9e19a
5d609e0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -29,7 +29,7 @@ def load_config(config_file): | |
|
||
def prepare_import_data_file_command(config): | ||
""" | ||
Prepares the yb-voyager command based on the given configuration. | ||
Prepares the yb-voyager import data file command based on the given configuration. | ||
""" | ||
file_table_map = config['file_table_map'] | ||
additional_flags = config.get('additional_flags', {}) | ||
|
@@ -59,13 +59,44 @@ def prepare_import_data_file_command(config): | |
return args | ||
|
||
|
||
def prepare_import_data_command(config): | ||
""" | ||
Prepares the yb-voyager import data command based on the given configuration. | ||
""" | ||
|
||
additional_flags = config.get('additional_flags', {}) | ||
|
||
args = [ | ||
'yb-voyager', 'import', 'data', | ||
'--export-dir', os.getenv('EXPORT_DIR', ''), | ||
'--target-db-host', os.getenv('TARGET_DB_HOST', ''), | ||
'--target-db-port', os.getenv('TARGET_DB_PORT', ''), | ||
'--target-db-user', os.getenv('TARGET_DB_USER', ''), | ||
'--target-db-password', os.getenv('TARGET_DB_PASSWORD', ''), | ||
'--target-db-name', os.getenv('TARGET_DB_NAME', ''), | ||
'--disable-pb', 'true', | ||
'--send-diagnostics', 'false', | ||
] | ||
|
||
if os.getenv('SOURCE_DB_TYPE') != 'postgresql': | ||
args.extend(['--target-db-schema', os.getenv('TARGET_DB_SCHEMA', '')]) | ||
|
||
if os.getenv('RUN_WITHOUT_ADAPTIVE_PARALLELISM') == 'true': | ||
args.extend(['--enable-adaptive-parallelism', 'false']) | ||
|
||
for flag, value in additional_flags.items(): | ||
args.append(flag) | ||
args.append(value) | ||
|
||
return args | ||
|
||
|
||
def run_and_resume_voyager(command, resumption): | ||
""" | ||
Runs the yb-voyager command with support for resumption testing. | ||
Includes final import retry logic. | ||
""" | ||
for attempt in range(1, resumption['max_retries'] + 1): | ||
print(f"\n--- Attempt {attempt} of {resumption['max_retries']} ---") | ||
for attempt in range(1, resumption['max_restarts'] + 1): | ||
print(f"\n--- Attempt {attempt} of {resumption['max_restarts']} ---") | ||
try: | ||
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) | ||
print("Running command:", ' '.join(command), flush=True) | ||
|
@@ -153,43 +184,68 @@ def run_and_resume_voyager(command, resumption): | |
print("Final import failed after 2 attempts.") | ||
sys.exit(1) | ||
|
||
|
||
def validate_row_counts(row_count, schema, export_dir): | ||
def validate_row_counts(row_count, export_dir): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. note for future: you can create a common python file that has such helper |
||
""" | ||
Validates the row counts of the target tables after import. | ||
If the row count validation fails, it prints a message with the log path. | ||
If the row count validation fails, it logs details and exits. | ||
""" | ||
for table_name, expected_row_count in row_count.items(): | ||
print(f"\nValidating row count for table '{table_name}'...") | ||
failed_validations = [] | ||
|
||
for table_identifier, expected_row_count in row_count.items(): | ||
print(f"\nValidating row count for table '{table_identifier}'...") | ||
|
||
# Parse schema and table, always quote both | ||
if '.' in table_identifier: | ||
schema, table_name = table_identifier.split('.', 1) | ||
else: | ||
schema = "public" | ||
table_name = table_identifier | ||
|
||
tgt = None | ||
try: | ||
tgt = yb.new_target_db() | ||
tgt.connect() | ||
print("Connected to target database.") | ||
|
||
print(f"Connected to target database. Using schema: {schema}") | ||
actual_row_count = tgt.get_row_count(table_name, schema) | ||
|
||
if actual_row_count == expected_row_count: | ||
print(f"\u2714 Validation successful: {table_name} - Expected: {expected_row_count}, Actual: {actual_row_count}") | ||
print(f"\u2714 Validation successful: {table_identifier} - Expected: {expected_row_count}, Actual: {actual_row_count}") | ||
else: | ||
print(f"\u274C Validation failed: {table_name} - Expected: {expected_row_count}, Actual: {actual_row_count}") | ||
print(f"Row count validation failed. For more details check {export_dir}/logs") | ||
sys.exit(1) | ||
print(f"\u274C Validation failed: {table_identifier} - Expected: {expected_row_count}, Actual: {actual_row_count}") | ||
failed_validations.append((table_identifier, expected_row_count, actual_row_count)) | ||
except Exception as e: | ||
print(f"Error during validation: {e}") | ||
sys.exit(1) | ||
print(f"Error during validation for table '{table_identifier}': {e}") | ||
failed_validations.append((table_identifier, expected_row_count, "Error")) | ||
finally: | ||
if 'tgt' in locals() and tgt: | ||
if tgt: | ||
tgt.close() | ||
print("Disconnected from target database.") | ||
|
||
if failed_validations: | ||
print("\nValidation failed for the following tables:") | ||
for table, expected, actual in failed_validations: | ||
print(f" Table: {table}, Expected: {expected}, Actual: {actual}") | ||
print(f"\nFor more details, check {export_dir}/logs") | ||
sys.exit(1) | ||
else: | ||
print("\nAll table row counts validated successfully.") | ||
|
||
|
||
|
||
def run_import_with_resumption(config): | ||
""" | ||
Runs the yb-voyager import data file command with resumption testing and validation. | ||
""" | ||
|
||
import_type = config.get('import_type', 'file') # Default to 'file' if not specified | ||
|
||
if import_type == 'file': | ||
command = prepare_import_data_file_command(config) | ||
elif import_type == 'offline': | ||
command = prepare_import_data_command(config) | ||
else: | ||
raise ValueError(f"Unsupported import_type: {import_type}") | ||
|
||
command = prepare_import_data_file_command(config) | ||
run_and_resume_voyager(command, config['resumption']) | ||
validate_row_counts(config['row_count'], os.getenv('TARGET_DB_SCHEMA', 'public'), os.getenv('EXPORT_DIR', '')) | ||
|
||
validate_row_counts(config['row_count'], os.getenv('EXPORT_DIR', '')) | ||
|
||
|
||
if __name__ == "__main__": | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,133 @@ | ||
#!/bin/bash | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Assuming that the ONLY change here is that you're specifying ROW_COUNT and essentially making generate_series dynamic. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes correct |
||
|
||
set -e | ||
set -x | ||
|
||
source ${SCRIPTS}/functions.sh | ||
|
||
# Set default row count (can be overridden by user input) | ||
ROW_COUNT=${1:-1000} # Default to 1000 if no argument is provided | ||
|
||
REGIONS=('London' 'Boston' 'Sydney') | ||
AMOUNTS=(1000 2000 5000) | ||
|
||
# Insert into sales_region table | ||
sql_sales_region=" | ||
WITH region_list AS ( | ||
SELECT ARRAY['${REGIONS[0]}', '${REGIONS[1]}', '${REGIONS[2]}']::TEXT[] region | ||
), amount_list AS ( | ||
SELECT ARRAY[${AMOUNTS[0]}, ${AMOUNTS[1]}, ${AMOUNTS[2]}]::INT[] amount | ||
) | ||
INSERT INTO sales_region | ||
(id, amount, branch, region) | ||
SELECT | ||
n, | ||
amount[1 + mod(n, array_length(amount, 1))], | ||
'Branch ' || n as branch, | ||
region[1 + mod(n, array_length(region, 1))] | ||
FROM amount_list, region_list, generate_series(1, $ROW_COUNT) as n; | ||
" | ||
run_psql "${SOURCE_DB_NAME}" "$sql_sales_region" | ||
|
||
# Insert into test_partitions_sequences table | ||
sql_test_partitions_sequences=" | ||
WITH region_list AS ( | ||
SELECT ARRAY['${REGIONS[0]}', '${REGIONS[1]}', '${REGIONS[2]}']::TEXT[] region | ||
), amount_list AS ( | ||
SELECT ARRAY[${AMOUNTS[0]}, ${AMOUNTS[1]}, ${AMOUNTS[2]}]::INT[] amount | ||
) | ||
INSERT INTO test_partitions_sequences | ||
(amount, branch, region) | ||
SELECT | ||
amount[1 + mod(n, array_length(amount, 1))], | ||
'Branch ' || n as branch, | ||
region[1 + mod(n, array_length(region, 1))] | ||
FROM amount_list, region_list, generate_series(1, $ROW_COUNT) as n; | ||
" | ||
run_psql "${SOURCE_DB_NAME}" "$sql_test_partitions_sequences" | ||
|
||
# Insert into p1.sales_region table | ||
sql_p1_sales_region=" | ||
WITH region_list AS ( | ||
SELECT ARRAY['${REGIONS[0]}', '${REGIONS[1]}', '${REGIONS[2]}']::TEXT[] region | ||
), amount_list AS ( | ||
SELECT ARRAY[${AMOUNTS[0]}, ${AMOUNTS[1]}, ${AMOUNTS[2]}]::INT[] amount | ||
) | ||
INSERT INTO p1.sales_region | ||
(id, amount, branch, region) | ||
SELECT | ||
n, | ||
amount[1 + mod(n, array_length(amount, 1))], | ||
'Branch ' || n as branch, | ||
region[1 + mod(n, array_length(region, 1))] | ||
FROM amount_list, region_list, generate_series(1, $ROW_COUNT) as n; | ||
" | ||
run_psql "${SOURCE_DB_NAME}" "$sql_p1_sales_region" | ||
|
||
# Insert into sales table | ||
sql_sales=" | ||
WITH amount_list AS ( | ||
SELECT ARRAY[${AMOUNTS[0]}, ${AMOUNTS[1]}, ${AMOUNTS[2]}]::INT[] amount | ||
), date_list AS ( | ||
SELECT ARRAY['2019-11-01'::TIMESTAMP, '2020-02-01'::TIMESTAMP, '2020-05-01'::TIMESTAMP] sale_date | ||
) | ||
INSERT INTO sales | ||
(id, p_name, amount, sale_date) | ||
SELECT | ||
n, | ||
'Person ' || n as p_name, | ||
amount[1 + mod(n, array_length(amount, 1))], | ||
sale_date[1 + mod(n, array_length(amount, 1))] | ||
FROM | ||
amount_list, | ||
date_list, | ||
generate_series(1, $ROW_COUNT) as n; | ||
" | ||
run_psql "${SOURCE_DB_NAME}" "$sql_sales" | ||
|
||
# Insert into range_columns_partition_test table | ||
sql_range_columns_partition_test=" | ||
INSERT INTO range_columns_partition_test | ||
VALUES | ||
(5, 5), | ||
(3, 4), | ||
(5, 11), | ||
(5, 12), | ||
(4, 3), | ||
(3, 1); | ||
" | ||
run_psql "${SOURCE_DB_NAME}" "$sql_range_columns_partition_test" | ||
|
||
sql_select_range_columns_partition_test=" | ||
SELECT | ||
tableoid :: regclass, | ||
* | ||
FROM | ||
range_columns_partition_test; | ||
" | ||
run_psql "${SOURCE_DB_NAME}" "$sql_select_range_columns_partition_test" | ||
|
||
# Insert into emp table | ||
sql_emp=" | ||
INSERT INTO emp | ||
SELECT num, 'user_' || num , (RANDOM()*50)::INTEGER | ||
FROM generate_series(1, $ROW_COUNT) AS num; | ||
" | ||
run_psql "${SOURCE_DB_NAME}" "$sql_emp" | ||
|
||
# Insert into customers table | ||
sql_customers=" | ||
WITH status_list AS ( | ||
SELECT '{"ACTIVE", "RECURRING", "REACTIVATED", "EXPIRED"}'::TEXT[] statuses | ||
), arr_list AS ( | ||
SELECT '{100, 200, 50, 250}'::INT[] arr | ||
) | ||
INSERT INTO customers | ||
(id, statuses, arr) | ||
SELECT n, | ||
statuses[1 + mod(n, array_length(statuses, 1))], | ||
arr[1 + mod(n, array_length(arr, 1))] | ||
FROM arr_list, generate_series(1,$ROW_COUNT) AS n, status_list; | ||
" | ||
run_psql "${SOURCE_DB_NAME}" "$sql_customers" | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's get/define all the configs in the beginning. It will make it easier to understand what all configuration options are involved.