diff --git a/clinica/iotools/bids_utils.py b/clinica/iotools/bids_utils.py index b166db1cb..51e4b001c 100644 --- a/clinica/iotools/bids_utils.py +++ b/clinica/iotools/bids_utils.py @@ -881,7 +881,7 @@ def run_dcm2niix( from clinica.utils.stream import cprint - cprint(f"Attempting to convert {output_fmt}.", lvl="info") + cprint(f"Attempting to convert {output_fmt}.", lvl="debug") command = _build_dcm2niix_command( input_dir, output_dir, output_fmt, compress, bids_sidecar ) diff --git a/clinica/iotools/converters/adni_to_bids/adni_modalities/adni_fmap.py b/clinica/iotools/converters/adni_to_bids/adni_modalities/adni_fmap.py index 7776a6f4f..b51e2d2b8 100644 --- a/clinica/iotools/converters/adni_to_bids/adni_modalities/adni_fmap.py +++ b/clinica/iotools/converters/adni_to_bids/adni_modalities/adni_fmap.py @@ -1,11 +1,17 @@ """Module for converting field maps of ADNI.""" +import json +import os +import re +import shutil +from enum import Enum from os import PathLike from pathlib import Path -from typing import List, Optional +from typing import Callable, Iterable, List, Optional import pandas as pd from clinica.iotools.converters.adni_to_bids.adni_utils import load_clinical_csv +from clinica.utils.stream import cprint def convert_adni_fmap( @@ -57,7 +63,8 @@ def convert_adni_fmap( subjects = list(adni_merge.PTID.unique()) cprint( - f"Calculating paths of fMRI field maps (FMAPs). Output will be stored in {conversion_dir}." + f"Calculating paths of fMRI field maps (FMAPs). Output will be stored in {conversion_dir}.", + lvl="debug", ) images = compute_fmap_path(source_dir, csv_dir, subjects, conversion_dir) @@ -65,7 +72,7 @@ def convert_adni_fmap( cprint("Paths of field maps found. Exporting images into BIDS ...") paths_to_bids(images, destination_dir, "fmap", mod_to_update=mod_to_update) - rename_fmaps(destination_dir) + reorganize_fmaps(Path(destination_dir)) cprint(msg="Field maps conversion done.", lvl="debug") @@ -162,23 +169,24 @@ def compute_fmap_path( if fmap_dfs_list: fmap_df = pd.concat(fmap_dfs_list, ignore_index=True) - # Exceptions - # ========== - conversion_errors = [ - ("006_S_4485", "m84"), - ("123_S_4127", "m96"), - # Eq_1 - ("094_S_4503", "m24"), - ("009_S_4388", "m72"), - ("036_S_6088", "bl"), - ("036_S_6134", "bl"), - ("016_S_6802", "bl"), - ("016_S_6816", "bl"), - ("126_S_4891", "m84"), - ("177_S_6448", "m24"), - # Multiple images - ("029_S_2395", "m72"), - ] + # Exceptions + # ========== + conversion_errors = [ + ("006_S_4485", "m84"), + ("123_S_4127", "m96"), + # Eq_1 + ("094_S_4503", "m24"), + ("009_S_4388", "m72"), + ("036_S_6088", "bl"), + ("036_S_6134", "bl"), + ("016_S_6802", "bl"), + ("016_S_6816", "bl"), + ("126_S_4891", "m84"), + ("177_S_6448", "m24"), + ("023_S_4115", "m126"), + # Multiple images + ("029_S_2395", "m72"), + ] # Removing known exceptions from images to convert if not fmap_df.empty: @@ -245,165 +253,216 @@ def fmap_image( return image_dict -def rename_fmaps(destination_dir: Path): - import json - import os - - dir_name = destination_dir - - for root, dirs, _ in os.walk(destination_dir): - for dir_name in dirs: - if dir_name.startswith("sub-ADNI"): - for subroot, subdir, filenames in os.walk(os.path.join(root, dir_name)): - for filename in filenames: - fmap_filename = os.path.join(subroot, filename) - if filename[-11:] == "fmap.nii.gz" and os.path.exists( - fmap_filename - ): - mag_filename = os.path.join( - subroot, filename[:-11] + "magnitude1.nii.gz" - ) - os.rename(fmap_filename, mag_filename) - if filename[-14:] == "fmap_e2.nii.gz" and os.path.exists( - fmap_filename - ): - mag_filename = os.path.join( - subroot, filename[:-14] + "magnitude2.nii.gz" - ) - os.rename(fmap_filename, mag_filename) - if filename[-9:] == "fmap.json" and os.path.exists( - fmap_filename - ): - os.remove(fmap_filename) - if filename[-12:] == "fmap_e2.json" and os.path.exists( - fmap_filename - ): - os.remove(fmap_filename) - if filename[-12:] == "fmap_ph.json" and os.path.exists( - fmap_filename - ): - check_json(fmap_filename, filename, subroot) - if filename[-15:] == "fmap_e2_ph.json" and os.path.exists( - fmap_filename - ): - check_json(fmap_filename, filename, subroot) - if filename[-14:] == "fmap_ph.nii.gz": - if os.path.exists(fmap_filename[:-14] + "phase1.json"): - os.rename( - fmap_filename, fmap_filename[:-14] + "phase1.nii.gz" - ) - elif os.path.exists(fmap_filename[:-14] + "fmap_ph.json"): - json_filepath = fmap_filename[:-14] + "fmap_ph.json" - with open(json_filepath, "r") as file: - fmap_data = json.load(file) - if ( - "EchoTime1" in fmap_data - and "EchoTime2" in fmap_data - ): - os.rename( - fmap_filename, - fmap_filename[:-14] + "phasediff.nii.gz", - ) - os.rename( - json_filepath, - json_filepath[:-12] + "phasediff.json", - ) - else: - os.rename( - fmap_filename, - fmap_filename[:-14] + "phase1.nii.gz", - ) - os.rename( - json_filepath, - json_filepath[:-12] + "phase1.json", - ) - elif os.path.exists(fmap_filename[:-17] + "phasediff.json"): - json_filepath = fmap_filename[:-17] + "phasediff.json" - with open(json_filepath, "r") as file: - fmap_data = json.load(file) - if ( - "EchoTime1" in fmap_data - and "EchoTime2" in fmap_data - ): - os.rename( - fmap_filename, - fmap_filename[:-17] + "phasediff.nii.gz", - ) - if filename[-17:] == "fmap_e2_ph.nii.gz": - if os.path.exists(fmap_filename[:-17] + "phase2.json"): - os.rename( - fmap_filename, fmap_filename[:-17] + "phase2.nii.gz" - ) - elif os.path.exists( - fmap_filename[:-17] + "fmap_e2_ph.json" - ): - json_filepath = fmap_filename[:-17] + "fmap_e2_ph.json" - with open(json_filepath, "r") as file: - fmap_data = json.load(file) - if ( - "EchoTime1" in fmap_data - and "EchoTime2" in fmap_data - ): - os.rename( - fmap_filename, - fmap_filename[:-17] + "phasediff.nii.gz", - ) - os.rename( - json_filepath, - json_filepath[:-15] + "phasediff.json", - ) - else: - os.rename( - fmap_filename, - fmap_filename[:-17] + "phase2.nii.gz", - ) - os.rename( - json_filepath, - json_filepath[:-15] + "phase2.json", - ) - elif os.path.exists(fmap_filename[:-17] + "phasediff.json"): - json_filepath = fmap_filename[:-17] + "phasediff.json" - with open(json_filepath, "r") as file: - fmap_data = json.load(file) - if ( - "EchoTime1" in fmap_data - and "EchoTime2" in fmap_data - ): - os.rename( - fmap_filename, - fmap_filename[:-17] + "phasediff.nii.gz", - ) - - -def check_json(fmap_filename, filename, subroot): - import json - import os - - nifti_file = fmap_filename[:-5] + "nii.gz" - with open(fmap_filename, "r") as file: - fmap_data = json.load(file) - if "EchoTime1" in fmap_data and "EchoTime2" in fmap_data: - phase_json = os.path.join(subroot, filename[:-15] + "phasediff.json") - os.rename(fmap_filename, phase_json) - if os.path.exists(nifti_file): - phase_nifti = os.path.join( - subroot, filename[:-15] + "_phasediff.nii.gz" - ) - os.rename(nifti_file, phase_nifti) +class BIDSFMAPCase(str, Enum): + EMPTY_FOLDER = "empty_folder" + ONE_PHASE_TWO_MAGNITUDES = "one_phase_two_magnitudes" + TWO_PHASES_TWO_MAGNITUDES = "two_phases_two_magnitudes" + DIRECT_FIELDMAPS = "direct_fieldmaps" + NOT_SUPPORTED = "not_supported" + + +def phase_magnitude_renamer(old_extension: str, case: BIDSFMAPCase) -> str: + # Assuming case 1 with 1 or 2 magnitudes have the same outputs from dcm2nix + if ( + case != BIDSFMAPCase.ONE_PHASE_TWO_MAGNITUDES + and case != BIDSFMAPCase.TWO_PHASES_TWO_MAGNITUDES + ): + raise NotImplementedError( + f"No renaming should be performed for case {case.value}" + ) + + magnitude_match = re.match(pattern="^e([1-2])$", string=old_extension) + if magnitude_match: + return f"magnitude{magnitude_match.group(1)}" + + phase_match = re.match(pattern="^e([1-2])_ph$", string=old_extension) + if phase_match: + if case == BIDSFMAPCase.ONE_PHASE_TWO_MAGNITUDES: + return "phasediff" + elif case == BIDSFMAPCase.TWO_PHASES_TWO_MAGNITUDES: + return f"phase{phase_match.group(1)}" + + raise ValueError(f"Extension {old_extension} not taken in charge.") + + +def rename_files(fmap_path: Path, case: BIDSFMAPCase): + filenames = [f.name for f in fmap_path.iterdir() if not f.name.startswith(".")] + pattern = r"(.*_)fmap_(.*?)(\..*)$" + for previous_filename in filenames: + rgx = re.search(pattern, previous_filename) + if rgx: + cut, extension, type = rgx.group(1), rgx.group(2), rgx.group(3) + new_name = f"{cut}{phase_magnitude_renamer(extension, case)}{type}" + os.rename(fmap_path / previous_filename, fmap_path / new_name) else: - if filename[:-12] == "fmap_ph.json": - phase_json = os.path.join(subroot, filename[:-12] + "phase1.json") - os.rename(fmap_filename, phase_json) - if os.path.exists(nifti_file): - phase_nifti = os.path.join( - subroot, filename[:-12] + "phase1.nii.gz" - ) - os.rename(nifti_file, phase_nifti) - if filename[:-15] == "fmap_e2_ph.json": - phase_json = os.path.join(subroot, filename[:-15] + "phase2.json") - os.rename(fmap_filename, phase_json) - if os.path.exists(nifti_file): - phase_nifti = os.path.join( - subroot, filename[:-15] + "phase2.nii.gz" - ) - os.rename(nifti_file, phase_nifti) + raise ValueError(f"Invalid file {previous_filename} was found.") + + +def fmap_case_handler_factory(case: BIDSFMAPCase) -> Callable: + if case == BIDSFMAPCase.EMPTY_FOLDER: + return empty_folder_handler + if case == BIDSFMAPCase.ONE_PHASE_TWO_MAGNITUDES: + return one_phase_two_magnitudes_handler + if case == BIDSFMAPCase.TWO_PHASES_TWO_MAGNITUDES: + return two_phases_two_magnitudes_handler + if case == BIDSFMAPCase.DIRECT_FIELDMAPS: + return direct_fieldmaps_handler + if case == BIDSFMAPCase.NOT_SUPPORTED: + return not_supported_handler + + +def get_json_file_matching_pattern(fmap_path: Path, pattern: str) -> Path: + json_file = [ + f + for f in fmap_path.iterdir() + if not f.name.startswith(".") and f.name.endswith(pattern) + ] + if len(json_file) != 1: + msg = f"Expected only a single JSON file ending in '{pattern}' in {fmap_path}, but got:" + msg += "\n".join([f.name for f in json_file]) + raise ValueError(msg) + return json_file[0] + + +def check_json_contains_keys(json_file: Path, keys: Iterable[str]) -> bool: + with open(json_file, "r") as file: + json_data = json.load(file) + missing_keys = [key for key in keys if key not in json_data] + if missing_keys: + cprint( + f"Invalid file {json_file}, missing the following keys: {missing_keys}.", + lvl="warning", + ) + return False + return True + + +def empty_folder_handler(fmap_path: Path): + cprint( + f"Folder for subject {fmap_path.parent.parent.name}," + f"session {fmap_path.parent.name} is empty", + lvl="warning", + ) + + +def one_phase_two_magnitudes_handler(fmap_path: Path): + """Performs the checks and renaming associated to BIDS specifications, case 1 for fieldmaps""" + cprint( + f"BIDS Case 1 : expecting 1 phasediff and at least 1 magnitude files for subject" + f"{fmap_path.parent.parent.name}, session {fmap_path.parent.name}.", + lvl="info", + ) + json_file = get_json_file_matching_pattern(fmap_path, pattern="ph.json") + if check_json_contains_keys(json_file, ("EchoTime1", "EchoTime2")): + rename_files(fmap_path, BIDSFMAPCase.ONE_PHASE_TWO_MAGNITUDES) + else: + not_supported_handler(fmap_path) + + +def two_phases_two_magnitudes_handler(fmap_path: Path): + """Performs the checks and renaming for BIDS spec case 2 for fieldmaps""" + cprint( + f"BIDS Case 2 : expecting 2 phase and 2 magnitude files for subject {fmap_path.parent.parent.name}," + f"session {fmap_path.parent.name}.", + lvl="info", + ) + json_files = [ + f + for f in fmap_path.iterdir() + if not f.name.startswith(".") and f.name.endswith("ph.json") + ] + if all([check_json_contains_keys(f, ("EchoTime",)) for f in json_files]): + rename_files(fmap_path, BIDSFMAPCase.TWO_PHASES_TWO_MAGNITUDES) + else: + not_supported_handler(fmap_path) + + +def direct_fieldmaps_handler(fmap_path: Path): + cprint( + f"Assuming BIDS Case 3 for subject {fmap_path.parent.parent.name}," + f"session {fmap_path.parent.name}. Not supported yet.", + lvl="info", + ) + # Assuming filename ends with "fmap" for the fieldmap + json_file = get_json_file_matching_pattern(fmap_path, pattern="fmap.json") + check_json_contains_keys(json_file, ("Units",)) + not_supported_handler(fmap_path) + + +def not_supported_handler(fmap_path: Path): + to_delete = os.listdir(fmap_path) + cprint( + f"No currently supported BIDS case was recognized for subject {fmap_path.parent.parent.name}," + f"session {fmap_path.parent.name}." + f"The following files will be deleted : {to_delete}.", + lvl="warning", + ) + for file in to_delete: + os.remove(fmap_path / file) + + +def infer_case_fmap(fmap_path: Path) -> BIDSFMAPCase: + files = [f.name for f in fmap_path.iterdir() if not f.name.startswith(".")] + nb_files = len(files) + + extensions = set( + re.search(r"fmap(.*).json", f).group(1) + for f in files + if re.search(r"fmap(.*).json", f) + ) + + if nb_files == 0: + return BIDSFMAPCase.EMPTY_FOLDER + elif nb_files == 4: + if extensions == {""}: + return BIDSFMAPCase.DIRECT_FIELDMAPS + else: + return BIDSFMAPCase.ONE_PHASE_TWO_MAGNITUDES + elif nb_files == 6 and extensions == {"_e1", "_e2", "_e2_ph"}: + return BIDSFMAPCase.ONE_PHASE_TWO_MAGNITUDES + elif nb_files == 8 and extensions == {"_e1", "_e2", "_e1_ph", "_e2_ph"}: + return BIDSFMAPCase.TWO_PHASES_TWO_MAGNITUDES + else: + return BIDSFMAPCase.NOT_SUPPORTED + + +def reorganize_fmaps(bids_path: Path): + for subject in ( + folder for folder in bids_path.iterdir() if folder.name.startswith("sub-ADNI") + ): + for session in ( + folder + for folder in subject.iterdir() + if folder.name.startswith("ses-") and folder.is_dir() + ): + fmap_path = bids_path / subject / session / "fmap" + if fmap_path.exists(): + fmap_case_handler_factory(infer_case_fmap(fmap_path))(fmap_path) + + +def bids_guess(bids_path: Path): + # todo : WIP - add where necessary to check + + bids_path = Path(bids_path) + + for file_path in bids_path.rglob(pattern=r"*.json"): + str_file_path = str(file_path) + if "fmap" in file_path.name: + with open(file_path, "r") as f: + file_json = json.load(f) + if "BidsGuess" in file_json: + bids_guess = file_json["BidsGuess"][-1].split("_")[-1] + + cut = re.search(r"\S*_fmap", str_file_path).group(0) + os.rename(src=str_file_path, dst=cut + "_" + bids_guess + ".json") + os.rename( + src=str_file_path.removesuffix(".json") + ".nii.gz", + dst=cut + "_" + bids_guess + ".nii.gz", + ) + else: + cprint( + msg=f"The FMAP file {file_path.name} could not be renamed " + f"since dcm2nix did not find any BIDS correspondence", + lvl="warning", + ) + # todo : delete files diff --git a/clinica/iotools/converters/adni_to_bids/adni_utils.py b/clinica/iotools/converters/adni_to_bids/adni_utils.py index 6151ddb2e..48f0867d2 100644 --- a/clinica/iotools/converters/adni_to_bids/adni_utils.py +++ b/clinica/iotools/converters/adni_to_bids/adni_utils.py @@ -1491,28 +1491,13 @@ def create_file( output_image = file_without_extension.with_suffix(".nii.gz") if image.Is_Dicom: - if modality == "fmap": - success = all( - [ - run_dcm2niix( - input_dir=fmap_image_path, - output_dir=output_path, - output_fmt=output_filename, - compress=True if zip_image == "y" else False, - bids_sidecar=True if generate_json == "y" else False, - ) - for fmap_image_path in Path(image_path).parent.iterdir() - if not fmap_image_path.name.startswith(".") - ] - ) - else: - success = run_dcm2niix( - input_dir=image_path, - output_dir=output_path, - output_fmt=output_filename, - compress=True if zip_image == "y" else False, - bids_sidecar=True if generate_json == "y" else False, - ) + success = run_dcm2niix( + input_dir=image_path if modality != "fmap" else Path(image_path).parent, + output_dir=output_path, + output_fmt=output_filename, + compress=True if zip_image == "y" else False, + bids_sidecar=True if generate_json == "y" else False, + ) if not success: cprint( f"Error converting image {image_path} for subject {subject} and session {session}", diff --git a/test/unittests/iotools/converters/adni_to_bids/adni_modalities/test_adni_fmap.py b/test/unittests/iotools/converters/adni_to_bids/adni_modalities/test_adni_fmap.py new file mode 100644 index 000000000..83ef4a8b8 --- /dev/null +++ b/test/unittests/iotools/converters/adni_to_bids/adni_modalities/test_adni_fmap.py @@ -0,0 +1,302 @@ +import json +from pathlib import Path +from typing import Set + +import pytest + +from clinica.iotools.converters.adni_to_bids.adni_modalities.adni_fmap import ( + BIDSFMAPCase, +) + + +@pytest.mark.parametrize( + "case, input_value, expected", + [ + (BIDSFMAPCase.ONE_PHASE_TWO_MAGNITUDES, "e2", "magnitude2"), + (BIDSFMAPCase.ONE_PHASE_TWO_MAGNITUDES, "e2_ph", "phasediff"), + (BIDSFMAPCase.TWO_PHASES_TWO_MAGNITUDES, "e1", "magnitude1"), + (BIDSFMAPCase.TWO_PHASES_TWO_MAGNITUDES, "e2_ph", "phase2"), + ], +) +def test_phase_magnitude_renamer_success(case, input_value, expected): + from clinica.iotools.converters.adni_to_bids.adni_modalities.adni_fmap import ( + phase_magnitude_renamer, + ) + + assert phase_magnitude_renamer(input_value, case) == expected + + +@pytest.mark.parametrize( + "case, input_value", + [ + (BIDSFMAPCase.ONE_PHASE_TWO_MAGNITUDES, "_e2"), + (BIDSFMAPCase.ONE_PHASE_TWO_MAGNITUDES, "e7"), + (BIDSFMAPCase.ONE_PHASE_TWO_MAGNITUDES, "ee1"), + (BIDSFMAPCase.TWO_PHASES_TWO_MAGNITUDES, "e2ph"), + (BIDSFMAPCase.TWO_PHASES_TWO_MAGNITUDES, " e2"), + (BIDSFMAPCase.TWO_PHASES_TWO_MAGNITUDES, "ea"), + (BIDSFMAPCase.TWO_PHASES_TWO_MAGNITUDES, "21"), + ], +) +def test_phase_magnitude_renamer_value_error(case, input_value): + from clinica.iotools.converters.adni_to_bids.adni_modalities.adni_fmap import ( + phase_magnitude_renamer, + ) + + with pytest.raises( + ValueError, match=f"Extension {input_value} not taken in charge." + ): + phase_magnitude_renamer(input_value, case) + + +@pytest.mark.parametrize( + "case, input_value", + [ + (BIDSFMAPCase.DIRECT_FIELDMAPS, "e2"), + (BIDSFMAPCase.NOT_SUPPORTED, "e1"), + (BIDSFMAPCase.EMPTY_FOLDER, "foo"), + ], +) +def test_phase_magnitude_renamer_implemented_error(case, input_value): + from clinica.iotools.converters.adni_to_bids.adni_modalities.adni_fmap import ( + phase_magnitude_renamer, + ) + + with pytest.raises( + NotImplementedError, + match=f"No renaming should be performed for case {case.value}", + ): + phase_magnitude_renamer(input_value, case) + + +@pytest.fixture +def fmap_case_builder(tmp_path: Path, case: BIDSFMAPCase): + fmap_path = tmp_path / "fmap" + fmap_path.mkdir() + filenames = [ + "sub-01_sesM0_fmap_e1.nii.gz", + "sub-01_sesM0_fmap_e1.json", + "sub-01_sesM0_fmap_e2.nii.gz", + "sub-01_sesM0_fmap_e2.json", + "sub-01_sesM0_fmap_e2_ph.nii.gz", + "sub-01_sesM0_fmap_e2_ph.json", + ".foo.txt", + ] + if case == BIDSFMAPCase.TWO_PHASES_TWO_MAGNITUDES: + filenames += [ + "sub-01_sesM0_fmap_e1_ph.nii.gz", + "sub-01_sesM0_fmap_e1_ph.json", + ] + for f in filenames: + (fmap_path / f).touch() + + +@pytest.fixture +def expected(tmp_path: Path, case: BIDSFMAPCase) -> Set[Path]: + fmap_path = tmp_path / "fmap" + + expected_result = { + fmap_path / "sub-01_sesM0_magnitude1.nii.gz", + fmap_path / "sub-01_sesM0_magnitude1.json", + fmap_path / "sub-01_sesM0_magnitude2.nii.gz", + fmap_path / "sub-01_sesM0_magnitude2.json", + fmap_path / ".foo.txt", + } + if case == BIDSFMAPCase.ONE_PHASE_TWO_MAGNITUDES: + expected_result = expected_result.union( + { + fmap_path / "sub-01_sesM0_phasediff.nii.gz", + fmap_path / "sub-01_sesM0_phasediff.json", + } + ) + if case == BIDSFMAPCase.TWO_PHASES_TWO_MAGNITUDES: + expected_result = expected_result.union( + { + fmap_path / "sub-01_sesM0_phase2.nii.gz", + fmap_path / "sub-01_sesM0_phase2.json", + fmap_path / "sub-01_sesM0_phase1.nii.gz", + fmap_path / "sub-01_sesM0_phase1.json", + } + ) + return expected_result + + +@pytest.mark.parametrize( + "case", + [BIDSFMAPCase.ONE_PHASE_TWO_MAGNITUDES, BIDSFMAPCase.TWO_PHASES_TWO_MAGNITUDES], +) +def test_rename_files_success(tmp_path, case, fmap_case_builder, expected): + from clinica.iotools.converters.adni_to_bids.adni_modalities.adni_fmap import ( + rename_files, + ) + + rename_files(tmp_path / "fmap", case) + + assert set((tmp_path / "fmap").iterdir()) == expected + + +@pytest.mark.parametrize( + "invalid_filename", + [ + "sub-01_sesM0_fmape1.nii.gz", + "sub-01_sesM0_magnitude.nii.gz", + "sub-01_sesM0fmap_e1.nii.gz", + "foo.txt", + ], +) +def test_rename_files_error(tmp_path, invalid_filename): + from clinica.iotools.converters.adni_to_bids.adni_modalities.adni_fmap import ( + rename_files, + ) + + fmap = tmp_path / "fmap" + fmap.mkdir() + (fmap / invalid_filename).touch() + + with pytest.raises(ValueError, match=f"Invalid file {invalid_filename} was found."): + rename_files(fmap, BIDSFMAPCase.TWO_PHASES_TWO_MAGNITUDES) + + +def test_get_json_file_matching_pattern_success(tmp_path): + from clinica.iotools.converters.adni_to_bids.adni_modalities.adni_fmap import ( + get_json_file_matching_pattern, + ) + + fmap = tmp_path / "fmap" + fmap.mkdir() + for f in [ + "sub-01_sesM0_fmap_e2_ph.json", + "sub-01_sesM0_fmap_e2_ph.nii.gz", + "sub-01_sesM0_fmap_e1.json", + ".foo.txt", + ]: + (fmap / f).touch() + + assert ( + get_json_file_matching_pattern(fmap, pattern="ph.json") + == fmap / "sub-01_sesM0_fmap_e2_ph.json" + ) + + +@pytest.mark.parametrize( + "filenames, pattern", + [ + (("sub-01_sesM0_fmap_e2.json",), "ph.json"), + (("sub-01_sesM0_fmap_e2_ph.json", "sub-01_sesM0_fmap_e1_ph.json"), "ph.json"), + (("sub-01_sesM0_fmap_e2_ph.json"), "fmap.json"), + ], +) +def test_get_json_file_matching_pattern_error(tmp_path, filenames, pattern): + from clinica.iotools.converters.adni_to_bids.adni_modalities.adni_fmap import ( + get_json_file_matching_pattern, + ) + + fmap = tmp_path / "fmap" + fmap.mkdir() + for f in filenames: + (fmap / f).touch() + + with pytest.raises(ValueError): + get_json_file_matching_pattern(fmap, pattern) + + +@pytest.mark.parametrize( + "keys", + [ + ("Echo1",), + ("Echo1", "Echo2"), + ], +) +def test_check_json_contains_keys(tmp_path, keys): + from clinica.iotools.converters.adni_to_bids.adni_modalities.adni_fmap import ( + check_json_contains_keys, + ) + + fmap = tmp_path / "fmap" + fmap.mkdir() + json_file_success = fmap / "success.json" + with open(json_file_success, "w") as f: + json.dump({key: [] for key in keys}, f) + + json_file_error = fmap / "error.json" + with open(json_file_error, "w") as f: + json.dump({"Error": []}, f) + + assert check_json_contains_keys(json_file_success, keys) + assert not check_json_contains_keys(json_file_error, keys) + + +@pytest.mark.parametrize( + "filenames, expected", + [ + ((".foo.txt",), BIDSFMAPCase.EMPTY_FOLDER), + ( + ( + "sub-01_sesM0_fmap_e2_ph.json", + "sub-01_sesM0_fmap_e2_ph.nii.gz", + "sub-01_sesM0_fmap_e1.json", + "sub-01_sesM0_fmap_e1.nii.gz", + ), + BIDSFMAPCase.ONE_PHASE_TWO_MAGNITUDES, + ), + ( + ( + "sub-01_sesM0_fmap.json", + "sub-01_sesM0_fmap.nii.gz", + "sub-01_sesM0_magnitude.json", + "sub-01_sesM0_magnitude.nii.gz", + ), + BIDSFMAPCase.DIRECT_FIELDMAPS, + ), + ( + ( + "sub-01_sesM0_fmap_e2_ph.json", + "sub-01_sesM0_fmap_e2_ph.nii.gz", + "sub-01_sesM0_fmap_e1.json", + "sub-01_sesM0_fmap_e1.nii.gz", + "sub-01_sesM0_fmap_e2.json", + "sub-01_sesM0_fmap_e2.nii.gz", + ), + BIDSFMAPCase.ONE_PHASE_TWO_MAGNITUDES, + ), + ( + ( + "sub-01_sesM0_fmap_e2_ph.json", + "sub-01_sesM0_fmap_e2_ph.nii.gz", + "sub-01_sesM0_fmap_e1.json", + "sub-01_sesM0_fmap_e1.nii.gz", + "sub-01_sesM0_fmap_e2.json", + "sub-01_sesM0_fmap_e2.nii.gz", + "sub-01_sesM0_fmap_e1_ph.json", + "sub-01_sesM0_fmap_e1_ph.nii.gz", + ), + BIDSFMAPCase.TWO_PHASES_TWO_MAGNITUDES, + ), + ( + ( + "sub-01_sesM0_fmap_e2_ph.json", + "sub-01_sesM0_fmap_e2_ph.nii.gz", + ), + BIDSFMAPCase.NOT_SUPPORTED, + ), + ( + ( + "sub-01_sesM0_fmap_e2_ph.json", + "sub-01_sesM0_fmap_e2_ph.nii.gz", + "sub-01_sesM0_fmap_e1.json", + ), + BIDSFMAPCase.NOT_SUPPORTED, + ), + ], +) +def test_infer_case_fmap(tmp_path, filenames, expected): + from clinica.iotools.converters.adni_to_bids.adni_modalities.adni_fmap import ( + infer_case_fmap, + ) + + fmap = tmp_path / "fmap" + fmap.mkdir() + for f in filenames: + (fmap / f).touch() + + assert infer_case_fmap(fmap) == expected