Skip to content

Commit

Permalink
test(preprocessor): make test_absent_and_unspecified_to_ref independe…
Browse files Browse the repository at this point in the history
…nt of artificially generated test files

improve helpers.compare_vcf_files such that it compares vcfs line by line and report the first line of different for debugging purposes
  • Loading branch information
BinglanLi committed Nov 12, 2024
1 parent 5331c2b commit 5b581df
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 15 deletions.
2 changes: 1 addition & 1 deletion preprocessor/preprocessor/utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -988,7 +988,7 @@ def extract_pgx_variants(pharmcat_positions: Path, reference_fasta: Path, vcf_fi

# update info
ref_info = list(set([x[7] for x in ref_pos_static[input_chr_pos].values()]))
if fields[7] != '.':
if fields[7] != '.' and fields[7] not in ref_info:
ref_info.append(fields[7])
updated_info = ';'.join(ref_info)

Expand Down
27 changes: 25 additions & 2 deletions preprocessor/tests/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,29 @@ def compare_vcf_files(expected: Path, tmp_dir: Path, basename: str, sample: str
assert orig_actual_vcf in results, ('%s not in results (%s)' % (actual, results))
if copy_to_test_dir:
shutil.copyfile(actual, actual.parent / actual.name)
print(read_vcf(actual))

assert read_vcf(expected) == read_vcf(actual), '%s mismatch' % key
# compare vcfs line by line
expected_lines = read_vcf(expected).split('\n')
actual_lines = read_vcf(actual).split('\n')

if len(expected_lines) != len(actual_lines):
assert False, f'mismatching number of lines between {expected} and {actual}'

for expected_line, actual_line in zip(expected_lines, actual_lines):
columns = ['CHROM', 'POS', 'ID', 'REF', 'ALT', 'QUAL', 'FILTER', 'INFO', 'FORMAT', 'Samples']
expected_fields = expected_line.split('\t')
actual_fields = actual_line.split('\t')

if len(expected_fields) != len(actual_fields):
assert False, f'mismatching number of samples between {expected} and {actual}'

for i, col in enumerate(columns):
# compare the ALT alleles
if i == 4 and set(actual_fields[3]) != set(expected_fields[3]):
assert False, f'mismatching {col}:\nexpected: {expected_line}\nactual: {actual_line}'
# compare genotypes
if i == 9 and actual_fields[9:] != expected_fields[9:]:
assert False, f'mismatching {col}:\nexpected: {expected_line}\nactual: {actual_line}'
# compare the rest
if actual_line[i] != expected_line[i]:
assert False, f'mismatching {col}:\nexpected: {expected_line}\nactual: {actual_line}'
53 changes: 41 additions & 12 deletions preprocessor/tests/test_utilities.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
import shutil
import tempfile
import random
from pathlib import Path
from timeit import default_timer as timer
from typing import List
Expand Down Expand Up @@ -546,24 +547,52 @@ def test_export_single_sample_concurrent():
def test_absent_and_unspecified_to_ref():
reference_fasta: Path = helpers.get_reference_fasta(helpers.pharmcat_positions_file)

vcf_file = helpers.test_dir / 'raw_unspecified_genotypes.vcf.bgz'
expected_file = helpers.test_dir / 'test.unspecified_to_ref.absent_to_ref.vcf'
expected_file = helpers.pharmcat_positions_file
with tempfile.TemporaryDirectory() as td:
# copy the input test file to the temporary folder
# copy the expected file to the temporary folder
tmp_dir = Path(td)
tmp_vcf = tmp_dir / vcf_file.name
shutil.copyfile(vcf_file, tmp_vcf)
# copy the expected output files to the temporary folder
expected_vcf = tmp_dir / expected_file.name
shutil.copyfile(expected_file, expected_vcf)
expected_bgz = tmp_dir / 'pharmcat_positions.vcf.gz'
shutil.copyfile(expected_file, expected_bgz)
raw_lines = helpers.read_vcf(expected_bgz, bgzipped=True, skip_comments=False).split('\n')

# convert "0/0" to "0|0" to avoid irrelevant mismatching error
expected_vcf = tmp_dir / 'random_pharmcat_positions.vcf'
expected_lines = []
with open(expected_vcf, 'w') as f:
for line in raw_lines:
if not line.startswith('#'):
fields = line.split('\t')
fields[9] = '0|0'
line = '\t'.join(fields)
f.write(line + '\n')
expected_lines.append(line)

# create the test file by randomly knocking down positions or genotypes
test_file = tmp_dir / 'random_absent_and_unspecified_genotypes.vcf'
random.seed(37)
with open(test_file, 'w') as f:
for line in expected_lines:
if line.startswith('#'): # metadata and the header line
f.write(line + '\n')
else:
fields = line.split('\t')
rnd_int = random.randrange(10)
if rnd_int == 1: # make 10% of the positions absent
continue
if rnd_int == 2: # convert 10% of the positions to unspecified genotypes
fields[9] = './.'
line = '\t'.join(fields)
f.write(line + '\n')
test_vcf = utils.bgzip_file(test_file, True)

# copy the necessary helper files to the temporary folder
shutil.copyfile(preprocessor.CHR_RENAME_FILE, tmp_dir / preprocessor.CHR_RENAME_MAP_FILENAME)

# run the utility function to convert absent positions to reference
basename = 'test1'
samples = ['Sample_1', 'Sample_2']
input_basename = tmp_vcf.name
preprocessor.preprocess(helpers.pharmcat_positions_file, reference_fasta, [tmp_vcf], samples,
basename = 'test_absent_and_unspecified_to_ref'
samples = ['PharmCAT']
input_basename = test_vcf.name
preprocessor.preprocess(helpers.pharmcat_positions_file, reference_fasta, [test_vcf], samples,
input_basename, tmp_dir, basename, absent_to_ref=True, unspecified_to_ref=True)

helpers.compare_vcf_files(expected_vcf, tmp_dir, basename)
Expand Down

0 comments on commit 5b581df

Please sign in to comment.