From 0644a31aae735db9a05c7c7b72413c8aa74757f5 Mon Sep 17 00:00:00 2001 From: geertvandeweyer Date: Tue, 6 Feb 2024 13:53:28 +0100 Subject: [PATCH 1/3] add remote file support to cutadapt input --- pyproject.toml | 1 + src/cutadapt/files.py | 41 +++++++++++++++++++++++++++++++++++++++-- src/cutadapt/runners.py | 4 ++-- tests/test_files.py | 24 +++++++++++++++++++++++- 4 files changed, 65 insertions(+), 5 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 73511a88..96cc588c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,6 +24,7 @@ dynamic = ["version"] dependencies = [ "dnaio >= 1.2.0", "xopen >= 1.6.0", + "smart_open >= 6.4.0", ] [project.urls] diff --git a/src/cutadapt/files.py b/src/cutadapt/files.py index 5b902b2c..b96e7796 100644 --- a/src/cutadapt/files.py +++ b/src/cutadapt/files.py @@ -1,10 +1,11 @@ import errno import io import sys +import os from abc import ABC, abstractmethod from enum import Enum from typing import BinaryIO, Optional, Dict, List, TextIO, Any - +import lzma import dnaio from xopen import xopen @@ -16,6 +17,42 @@ # Windows resource = None # type: ignore +def open_rb(path: str): + """ + Open a (possibly compressed) file for reading in binary mode. + Determines if the file is local or remote and opens it accordingly. + """ + # stdin: + #if path == "-": + # return sys.stdin.buffer + # local file: open with xopen routines + if os.path.exists(path): + return xopen_rb_raise_limit(path) + # assume remote file + else: + return smart_open_rb(path) + +def smart_open_rb(path: str): + + # use smart_open library + try: + import smart_open + + except ImportError: + raise ImportError( + "The smart_open package is required to read from remote files" + ) + # for xz : load additional library + if path.endswith(".xz"): + def _handle_xz(file_obj,mode='rb'): + return lzma.LZMAFile(file_obj,mode,format=lzma.FORMAT_XZ) + smart_open.register_compressor(".xz", _handle_xz) + try: + return smart_open.open(path, "rb") + except Exception as e: + logger.error("Error opening '%s': %s", path, e) + raise + def xopen_rb_raise_limit(path: str): """ @@ -138,7 +175,7 @@ def __init__(self, *paths: str, interleaved: bool = False): self.interleaved = interleaved def open(self) -> InputFiles: - files = [xopen_rb_raise_limit(path) for path in self.paths] + files = [open_rb(path) for path in self.paths] return InputFiles(*files, interleaved=self.interleaved) diff --git a/src/cutadapt/runners.py b/src/cutadapt/runners.py index bd11b447..8980ca2a 100644 --- a/src/cutadapt/runners.py +++ b/src/cutadapt/runners.py @@ -15,7 +15,7 @@ InputFiles, OutputFiles, InputPaths, - xopen_rb_raise_limit, + open_rb, detect_file_format, FileFormat, ProxyWriter, @@ -91,7 +91,7 @@ def run(self): try: with ExitStack() as stack: files = [ - stack.enter_context(xopen_rb_raise_limit(path)) + stack.enter_context(open_rb(path)) for path in self._paths ] file_format = detect_file_format(files[0]) diff --git a/tests/test_files.py b/tests/test_files.py index 8320ceaa..4c654634 100644 --- a/tests/test_files.py +++ b/tests/test_files.py @@ -1,7 +1,7 @@ import os import pickle -from cutadapt.files import ProxyTextFile, ProxyRecordWriter, OutputFiles +from cutadapt.files import ProxyTextFile, ProxyRecordWriter, OutputFiles,open_rb from dnaio import SequenceRecord @@ -125,6 +125,28 @@ def test_interleaved_record_writer(self, tmp_path): o.close() assert path.read_text() == "@r\nAACC\n+\n####\n@r\nGGTT\n+\n####\n" + +def test_open_rb_local_file(tmp_path): + + # Create a local file + file_path = tmp_path / "test.txt" + file_path.write_text("Hello, World!") + # Test opening a local file + file = open_rb(str(file_path)) + assert file.read() == b"Hello, World!" + file.close() + +def test_open_rb_remote_file(): + # Test opening a remote file over https + file = open_rb("https://raw.githubusercontent.com/marcelm/cutadapt/main/tests/data/454.fa") + assert file.readline() == b">000163_1255_2627 length=52 uaccno=E0R4ISW01DCIQD\n" + file.close() + +def test_open_rb_s3_file(): + # Test opening a remote file on s3 + file = open_rb("s3://platinum-genomes/2017-1.0/md5sum.txt") + assert file.readline() == b"2e6aa26b42283bbbc4ca03686f427dc2 ./hg38/small_variants/ConfidentRegions.bed.gz\n" + file.close() # - test force fasta # - test qualities # - test proxied From 0f06f8f4d8d7fea579cf957201d61bfe82c9d7bc Mon Sep 17 00:00:00 2001 From: geertvandeweyer Date: Tue, 6 Feb 2024 15:54:10 +0100 Subject: [PATCH 2/3] formatting fix --- src/cutadapt/files.py | 49 +++++++++++++++++++++++-------------------- 1 file changed, 26 insertions(+), 23 deletions(-) diff --git a/src/cutadapt/files.py b/src/cutadapt/files.py index b96e7796..1b1feca0 100644 --- a/src/cutadapt/files.py +++ b/src/cutadapt/files.py @@ -17,41 +17,44 @@ # Windows resource = None # type: ignore + def open_rb(path: str): """ Open a (possibly compressed) file for reading in binary mode. Determines if the file is local or remote and opens it accordingly. """ - # stdin: - #if path == "-": - # return sys.stdin.buffer # local file: open with xopen routines if os.path.exists(path): return xopen_rb_raise_limit(path) - # assume remote file + # assume remote file else: return smart_open_rb(path) + def smart_open_rb(path: str): - - # use smart_open library - try: - import smart_open - - except ImportError: - raise ImportError( - "The smart_open package is required to read from remote files" - ) - # for xz : load additional library - if path.endswith(".xz"): - def _handle_xz(file_obj,mode='rb'): - return lzma.LZMAFile(file_obj,mode,format=lzma.FORMAT_XZ) - smart_open.register_compressor(".xz", _handle_xz) - try: - return smart_open.open(path, "rb") - except Exception as e: - logger.error("Error opening '%s': %s", path, e) - raise + """ + Open a (possibly compressed) remote file for reading in binary mode. + see smart_open documentation for details + """ + try: + import smart_open + + except ImportError: + raise ImportError( + "The smart_open package is required to read from remote files" + ) + # for xz : load additional library + if path.endswith(".xz"): + + def _handle_xz(file_obj, mode="rb"): + return lzma.LZMAFile(file_obj, mode, format=lzma.FORMAT_XZ) + + smart_open.register_compressor(".xz", _handle_xz) + try: + return smart_open.open(path, "rb") + except Exception as e: + logger.error("Error opening '%s': %s", path, e) + raise def xopen_rb_raise_limit(path: str): From da2aebc4db51afe87f0e490b1c9044a431993ae9 Mon Sep 17 00:00:00 2001 From: geertvandeweyer Date: Thu, 8 Feb 2024 16:38:23 +0100 Subject: [PATCH 3/3] passthrough of filehandles between smart_open en xopen to benefit from both --- src/cutadapt/cli.py | 19 +++++-- src/cutadapt/files.py | 121 ++++++++++++++++++++++++---------------- src/cutadapt/runners.py | 4 +- tests/test_files.py | 26 ++++++--- 4 files changed, 108 insertions(+), 62 deletions(-) diff --git a/src/cutadapt/cli.py b/src/cutadapt/cli.py index 81a02e6c..b97080b6 100644 --- a/src/cutadapt/cli.py +++ b/src/cutadapt/cli.py @@ -189,6 +189,8 @@ def get_argument_parser() -> ArgumentParser: # Compression level for gzipped output files. Not exposed since we have -Z group.add_argument("--compression-level", type=int, default=5, help=SUPPRESS) + # transport_params passed to smart_open, see smart_open documentation. can be a json file or a json string + group.add_argument("--transport-params", type=str, default="",help=SUPPRESS) # Disable adapter index creation group.add_argument("--no-index", dest="index", default=True, action="store_false", help=SUPPRESS) @@ -567,7 +569,7 @@ def determine_paired(args) -> bool: def make_input_paths( - inputs: Sequence[str], paired: bool, interleaved: bool + inputs: Sequence[str], paired: bool, interleaved: bool, transport_params: str ) -> InputPaths: """ Do some other error checking of the input file names and return InputPaths. @@ -605,10 +607,15 @@ def make_input_paths( if input_paired_filename: return InputPaths( - input_filename, input_paired_filename, interleaved=interleaved + input_filename, + input_paired_filename, + interleaved=interleaved, + transport_params=transport_params, ) else: - return InputPaths(input_filename, interleaved=interleaved) + return InputPaths( + input_filename, interleaved=interleaved, transport_params=transport_params + ) def check_arguments(args, paired: bool) -> None: @@ -1208,16 +1215,18 @@ def main(cmdlineargs, default_outfile=sys.stdout.buffer) -> Statistics: file_opener = FileOpener( compression_level=args.compression_level, threads=estimate_compression_threads(cores), + transport_params=args.transport_params, ) if sys.stderr.isatty() and not args.quiet and not args.debug: progress = Progress() else: progress = DummyProgress() paired = determine_paired(args) - try: is_interleaved_input = args.interleaved and len(args.inputs) == 1 - input_paths = make_input_paths(args.inputs, paired, is_interleaved_input) + input_paths = make_input_paths( + args.inputs, paired, is_interleaved_input, args.transport_params + ) check_arguments(args, paired) adapters, adapters2 = adapters_from_args(args) log_adapters(adapters, adapters2 if paired else None) diff --git a/src/cutadapt/files.py b/src/cutadapt/files.py index 1b1feca0..f7959cf2 100644 --- a/src/cutadapt/files.py +++ b/src/cutadapt/files.py @@ -5,10 +5,11 @@ from abc import ABC, abstractmethod from enum import Enum from typing import BinaryIO, Optional, Dict, List, TextIO, Any -import lzma import dnaio from xopen import xopen - +import smart_open +import logging +import json from cutadapt.utils import logger try: @@ -18,56 +19,42 @@ resource = None # type: ignore -def open_rb(path: str): - """ - Open a (possibly compressed) file for reading in binary mode. - Determines if the file is local or remote and opens it accordingly. - """ - # local file: open with xopen routines - if os.path.exists(path): - return xopen_rb_raise_limit(path) - # assume remote file - else: - return smart_open_rb(path) - - -def smart_open_rb(path: str): - """ - Open a (possibly compressed) remote file for reading in binary mode. - see smart_open documentation for details - """ - try: - import smart_open - - except ImportError: - raise ImportError( - "The smart_open package is required to read from remote files" - ) - # for xz : load additional library - if path.endswith(".xz"): - - def _handle_xz(file_obj, mode="rb"): - return lzma.LZMAFile(file_obj, mode, format=lzma.FORMAT_XZ) - - smart_open.register_compressor(".xz", _handle_xz) - try: - return smart_open.open(path, "rb") - except Exception as e: - logger.error("Error opening '%s': %s", path, e) - raise - - -def xopen_rb_raise_limit(path: str): +def xopen_rb_raise_limit(path: str, transport_params: str = ""): """ Open a (possibly compressed) file for reading in binary mode, trying to avoid the "Too many open files" problem using `open_raise_limit`. """ mode = "rb" - f = open_raise_limit(xopen, path, mode, threads=0) + # transfer options : string of key=value,key=value or a file path: convert to dictionary + transport_params = get_transport_params(path, transport_params) + # smart_open to automatically open remote files, disable auto-compression + f = open_raise_limit( + smart_open.open, + path, + mode, + compression="disable", + transport_params=transport_params, + ) + logging.getLogger("smart_open").setLevel(logging.WARNING) + # pass through to xopen to handle compression + f = open_raise_limit(xopen, f, mode, threads=4) logger.debug("Opening '%s', mode '%s' with xopen resulted in %s", path, mode, f) return f +def get_transport_params(path, transport_params): + if not transport_params: + return {} + # load from json file + if os.path.isfile(transport_params): + with open(transport_params) as f: + transport_params = json.load(f) + else: + transport_params = json.loads(transport_params) + + return transport_params + + def open_raise_limit(func, *args, **kwargs): """ Run 'func' (which should be some kind of open() function) and return its result. @@ -94,7 +81,12 @@ def raise_open_files_limit(n): class FileOpener: - def __init__(self, compression_level: int = 1, threads: Optional[int] = None): + def __init__( + self, + compression_level: int = 1, + threads: Optional[int] = None, + transport_params: str = "", + ): """ threads -- no. of external compression threads. 0: write in-process @@ -102,18 +94,46 @@ def __init__(self, compression_level: int = 1, threads: Optional[int] = None): """ self.compression_level = compression_level self.threads = threads + self.transport_params = transport_params + + def smart_open(self, path, mode): + # get transport params for smart_open + transport_params = get_transport_params(path, self.transport_params) + # smart_open to automatically open remote files, disable auto-compression + f = open_raise_limit( + smart_open.open, + path, + mode, + compression="disable", + transport_params=transport_params, + ) + logging.getLogger("smart_open").setLevel(logging.ERROR) + logger.debug( + "Opening output '%s', mode '%s' with smart_open resulted in %s", + path, + mode, + f, + ) + return f def xopen(self, path, mode): threads = self.threads if "w" in mode else 0 + # smart open to handle remote files + f = self.smart_open(path, mode) + # xopen to handle compression f = open_raise_limit( - xopen, path, mode, compresslevel=self.compression_level, threads=threads + xopen, f, mode, compresslevel=self.compression_level, threads=threads ) if "w" in mode: extra = f" (compression level {self.compression_level}, {threads} threads)" else: extra = "" logger.debug( - "Opening '%s', mode '%s'%s with xopen resulted in %s", path, mode, extra, f + "Opening output '%s', mode '%s'%s with xopen resulted in %s", + path, + mode, + extra, + f, ) return f @@ -173,12 +193,17 @@ def close(self) -> None: class InputPaths: - def __init__(self, *paths: str, interleaved: bool = False): + def __init__( + self, *paths: str, interleaved: bool = False, transport_params: str = "" + ): self.paths = paths self.interleaved = interleaved + self.transport_params = transport_params def open(self) -> InputFiles: - files = [open_rb(path) for path in self.paths] + files = [ + xopen_rb_raise_limit(path, self.transport_params) for path in self.paths + ] return InputFiles(*files, interleaved=self.interleaved) diff --git a/src/cutadapt/runners.py b/src/cutadapt/runners.py index bfb791c4..7fea055e 100644 --- a/src/cutadapt/runners.py +++ b/src/cutadapt/runners.py @@ -15,7 +15,7 @@ InputFiles, OutputFiles, InputPaths, - open_rb, + xopen_rb_raise_limit, detect_file_format, FileFormat, ProxyWriter, @@ -91,7 +91,7 @@ def run(self): try: with ExitStack() as stack: files = [ - stack.enter_context(open_rb(path)) + stack.enter_context(xopen_rb_raise_limit(path)) for path in self._paths ] file_format = detect_file_format(files[0]) diff --git a/tests/test_files.py b/tests/test_files.py index 4c654634..c94e0ee7 100644 --- a/tests/test_files.py +++ b/tests/test_files.py @@ -1,7 +1,12 @@ import os import pickle -from cutadapt.files import ProxyTextFile, ProxyRecordWriter, OutputFiles,open_rb +from cutadapt.files import ( + ProxyTextFile, + ProxyRecordWriter, + OutputFiles, + xopen_rb_raise_limit, +) from dnaio import SequenceRecord @@ -127,25 +132,32 @@ def test_interleaved_record_writer(self, tmp_path): def test_open_rb_local_file(tmp_path): - + # Create a local file file_path = tmp_path / "test.txt" file_path.write_text("Hello, World!") # Test opening a local file - file = open_rb(str(file_path)) + file = xopen_rb_raise_limit(str(file_path)) assert file.read() == b"Hello, World!" file.close() + def test_open_rb_remote_file(): # Test opening a remote file over https - file = open_rb("https://raw.githubusercontent.com/marcelm/cutadapt/main/tests/data/454.fa") + file = xopen_rb_raise_limit( + "https://raw.githubusercontent.com/marcelm/cutadapt/main/tests/data/454.fa" + ) assert file.readline() == b">000163_1255_2627 length=52 uaccno=E0R4ISW01DCIQD\n" file.close() + def test_open_rb_s3_file(): - # Test opening a remote file on s3 - file = open_rb("s3://platinum-genomes/2017-1.0/md5sum.txt") - assert file.readline() == b"2e6aa26b42283bbbc4ca03686f427dc2 ./hg38/small_variants/ConfidentRegions.bed.gz\n" + # Test opening a remote file on s3 + file = xopen_rb_raise_limit("s3://platinum-genomes/2017-1.0/md5sum.txt") + assert ( + file.readline() + == b"2e6aa26b42283bbbc4ca03686f427dc2 ./hg38/small_variants/ConfidentRegions.bed.gz\n" + ) file.close() # - test force fasta # - test qualities