From c33a793da8b4b9bb688e6bc678100eb7fc814264 Mon Sep 17 00:00:00 2001 From: TrellixVulnTeam Date: Thu, 20 Oct 2022 00:54:13 +0000 Subject: [PATCH] Adding tarfile member sanitization to extractall() --- src/extra/python/isca/experiment.py | 21 ++++++++++++++++++++- src/extra/python/isca/util.py | 21 ++++++++++++++++++++- 2 files changed, 40 insertions(+), 2 deletions(-) diff --git a/src/extra/python/isca/experiment.py b/src/extra/python/isca/experiment.py index d176033d5..a9f24e49e 100755 --- a/src/extra/python/isca/experiment.py +++ b/src/extra/python/isca/experiment.py @@ -361,7 +361,26 @@ def make_restart_archive(self, archive_file, restart_directory): def extract_restart_archive(self, archive_file, input_directory): with tarfile.open(archive_file, 'r:gz') as tar: - tar.extractall(path=input_directory) + def is_within_directory(directory, target): + + abs_directory = os.path.abspath(directory) + abs_target = os.path.abspath(target) + + prefix = os.path.commonprefix([abs_directory, abs_target]) + + return prefix == abs_directory + + def safe_extract(tar, path=".", members=None, *, numeric_owner=False): + + for member in tar.getmembers(): + member_path = os.path.join(path, member.name) + if not is_within_directory(path, member_path): + raise Exception("Attempted Path Traversal in Tar File") + + tar.extractall(path, members, numeric_owner=numeric_owner) + + + safe_extract(tar, path=input_directory) self.log.info("Restart %s extracted to %s" % (archive_file, input_directory)) def derive(self, new_experiment_name): diff --git a/src/extra/python/isca/util.py b/src/extra/python/isca/util.py index 68696da2a..c13eb58f6 100644 --- a/src/extra/python/isca/util.py +++ b/src/extra/python/isca/util.py @@ -178,7 +178,26 @@ def interpolate_output(infile, outfile, all_fields=True, var_names=[], p_levs = @contextmanager def edit_restart_archive(restart_archive, outfile='./res_edit.tar.gz', tmp_dir='./restart_edit'): with tarfile.open(restart_archive, 'r:gz') as tar: - tar.extractall(path=tmp_dir) + def is_within_directory(directory, target): + + abs_directory = os.path.abspath(directory) + abs_target = os.path.abspath(target) + + prefix = os.path.commonprefix([abs_directory, abs_target]) + + return prefix == abs_directory + + def safe_extract(tar, path=".", members=None, *, numeric_owner=False): + + for member in tar.getmembers(): + member_path = os.path.join(path, member.name) + if not is_within_directory(path, member_path): + raise Exception("Attempted Path Traversal in Tar File") + + tar.extractall(path, members, numeric_owner=numeric_owner) + + + safe_extract(tar, path=tmp_dir) restart_files = [os.path.join(tmp_dir, x.split('/')[-1]) for x in tar.getnames() if x != '.'] try: yield {os.path.basename(f): f for f in restart_files}