Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remote File Support for input files #761

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ dynamic = ["version"]
dependencies = [
"dnaio >= 1.2.0",
"xopen >= 1.6.0",
"smart_open >= 6.4.0",
]

[project.urls]
Expand Down
19 changes: 14 additions & 5 deletions src/cutadapt/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,8 @@ def get_argument_parser() -> ArgumentParser:
# Compression level for gzipped output files. Not exposed since we have -Z
group.add_argument("--compression-level", type=int, default=5,
help=SUPPRESS)
# transport_params passed to smart_open, see smart_open documentation. can be a json file or a json string
group.add_argument("--transport-params", type=str, default="",help=SUPPRESS)
# Disable adapter index creation
group.add_argument("--no-index", dest="index", default=True, action="store_false", help=SUPPRESS)

Expand Down Expand Up @@ -567,7 +569,7 @@ def determine_paired(args) -> bool:


def make_input_paths(
inputs: Sequence[str], paired: bool, interleaved: bool
inputs: Sequence[str], paired: bool, interleaved: bool, transport_params: str
) -> InputPaths:
"""
Do some other error checking of the input file names and return InputPaths.
Expand Down Expand Up @@ -605,10 +607,15 @@ def make_input_paths(

if input_paired_filename:
return InputPaths(
input_filename, input_paired_filename, interleaved=interleaved
input_filename,
input_paired_filename,
interleaved=interleaved,
transport_params=transport_params,
)
else:
return InputPaths(input_filename, interleaved=interleaved)
return InputPaths(
input_filename, interleaved=interleaved, transport_params=transport_params
)


def check_arguments(args, paired: bool) -> None:
Expand Down Expand Up @@ -1208,16 +1215,18 @@ def main(cmdlineargs, default_outfile=sys.stdout.buffer) -> Statistics:
file_opener = FileOpener(
compression_level=args.compression_level,
threads=estimate_compression_threads(cores),
transport_params=args.transport_params,
)
if sys.stderr.isatty() and not args.quiet and not args.debug:
progress = Progress()
else:
progress = DummyProgress()
paired = determine_paired(args)

try:
is_interleaved_input = args.interleaved and len(args.inputs) == 1
input_paths = make_input_paths(args.inputs, paired, is_interleaved_input)
input_paths = make_input_paths(
args.inputs, paired, is_interleaved_input, args.transport_params
)
check_arguments(args, paired)
adapters, adapters2 = adapters_from_args(args)
log_adapters(adapters, adapters2 if paired else None)
Expand Down
83 changes: 74 additions & 9 deletions src/cutadapt/files.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import errno
import io
import sys
import os
from abc import ABC, abstractmethod
from enum import Enum
from typing import BinaryIO, Optional, Dict, List, TextIO, Any

import dnaio
from xopen import xopen

import smart_open
import logging
import json
from cutadapt.utils import logger

try:
Expand All @@ -17,17 +19,42 @@
resource = None # type: ignore


def xopen_rb_raise_limit(path: str, transport_params: str = ""):
    """
    Open a (possibly compressed, possibly remote) file for reading in binary mode,
    trying to avoid the "Too many open files" problem using `open_raise_limit`.

    The file is opened in two layers: smart_open provides the raw byte stream
    (with its own decompression disabled), and xopen is stacked on top of that
    stream to handle compression.

    path -- name of the file to open, passed to smart_open.open
    transport_params -- JSON string, or path to a JSON file, that is decoded by
        `get_transport_params` and forwarded to smart_open.open. Empty (the
        default): no transport parameters.
    """
    mode = "rb"
    # Decode the JSON string / JSON file into the dict handed to smart_open
    transport_params = get_transport_params(path, transport_params)
    # Adjust the smart_open logger before smart_open is used, so that records
    # emitted while opening are filtered as well
    logging.getLogger("smart_open").setLevel(logging.WARNING)
    # smart_open opens the file; its automatic decompression is disabled
    # because xopen takes care of compression below
    raw = open_raise_limit(
        smart_open.open,
        path,
        mode,
        compression="disable",
        transport_params=transport_params,
    )
    try:
        # Pass the stream through xopen to handle compression.
        # NOTE(review): the previous implementation opened input files with
        # threads=0 -- confirm that threads=4 is intended for reading.
        f = open_raise_limit(xopen, raw, mode, threads=4)
    except BaseException:
        # Do not leak the underlying stream if the xopen layer cannot be set up
        raw.close()
        raise
    logger.debug("Opening '%s', mode '%s' with xopen resulted in %s", path, mode, f)
    return f


def get_transport_params(path, transport_params):
    """
    Convert a transport parameter specification into a dict.

    path -- name of the file that is going to be opened. Currently unused;
        kept so that existing callers do not need to change.
    transport_params -- one of
        - an empty value ("" or None): no parameters
        - the path of an existing file containing a JSON object
        - a string containing a JSON object
        - an already decoded dict, which is returned unchanged

    Return the decoded parameters as a dict ({} if nothing was specified).

    Raise json.JSONDecodeError (a ValueError subclass) if the text is not valid
    JSON and ValueError if the decoded JSON is not an object.
    """
    if not transport_params:
        return {}
    if isinstance(transport_params, dict):
        return transport_params
    try:
        if os.path.isfile(transport_params):
            # JSON files are expected to be UTF-8; do not depend on the locale
            with open(transport_params, encoding="utf-8") as f:
                params = json.load(f)
        else:
            params = json.loads(transport_params)
    except json.JSONDecodeError as e:
        # Re-raise with the same exception type, but say what was being parsed
        raise json.JSONDecodeError(
            "Transport parameters are neither the path of an existing file "
            f"nor valid JSON ({e.msg})",
            e.doc,
            e.pos,
        ) from e
    if not isinstance(params, dict):
        # Valid JSON such as a list or a number is not a usable parameter set
        raise ValueError(
            "Transport parameters must be a JSON object, "
            f"but got {type(params).__name__}"
        )
    return params


def open_raise_limit(func, *args, **kwargs):
"""
Run 'func' (which should be some kind of open() function) and return its result.
Expand All @@ -54,26 +81,59 @@ def raise_open_files_limit(n):


class FileOpener:
    """
    Open files through smart_open and xopen using shared settings
    (compression level, compression threads and smart_open transport parameters).
    """

    def __init__(
        self,
        compression_level: int = 1,
        threads: Optional[int] = None,
        transport_params: str = "",
    ):
        """
        compression_level -- compression level passed to xopen
        threads -- no. of external compression threads.
            0: write in-process
            None: min(cpu_count(), 4)
        transport_params -- JSON string, or path to a JSON file, with parameters
            for smart_open. It is stored as given and decoded by
            `get_transport_params` whenever a file is opened.
        """
        self.compression_level = compression_level
        self.threads = threads
        self.transport_params = transport_params

    def smart_open(self, path, mode):
        """
        Open `path` with smart_open.open in the given mode and return the stream.

        smart_open's own compression handling is disabled; the caller is
        expected to add a compression layer where needed.
        """
        # Decode the transport parameters for smart_open
        transport_params = get_transport_params(path, self.transport_params)
        # Adjust the smart_open logger before smart_open is used, so that
        # records emitted while opening are filtered as well
        logging.getLogger("smart_open").setLevel(logging.ERROR)
        f = open_raise_limit(
            smart_open.open,
            path,
            mode,
            compression="disable",
            transport_params=transport_params,
        )
        logger.debug(
            "Opening output '%s', mode '%s' with smart_open resulted in %s",
            path,
            mode,
            f,
        )
        return f

    def xopen(self, path, mode):
        """
        Open `path` via smart_open and wrap the resulting stream with xopen.

        The configured number of threads is only used when writing ("w" in
        mode); otherwise 0 is passed to xopen.
        """
        threads = self.threads if "w" in mode else 0
        # smart_open provides the underlying stream
        raw = self.smart_open(path, mode)
        try:
            # xopen handles compression on top of that stream.
            # NOTE(review): xopen receives a stream rather than a path here --
            # confirm it still selects the compression format expected for
            # `path`.
            f = open_raise_limit(
                xopen, raw, mode, compresslevel=self.compression_level, threads=threads
            )
        except BaseException:
            # Do not leak the smart_open stream if the xopen layer fails
            raw.close()
            raise
        if "w" in mode:
            extra = f" (compression level {self.compression_level}, {threads} threads)"
        else:
            extra = ""
        logger.debug(
            "Opening output '%s', mode '%s'%s with xopen resulted in %s",
            path,
            mode,
            extra,
            f,
        )
        return f

Expand Down Expand Up @@ -133,12 +193,17 @@ def close(self) -> None:


class InputPaths:
    """
    Names of the input files together with the settings needed to open them.
    """

    def __init__(
        self, *paths: str, interleaved: bool = False, transport_params: str = ""
    ):
        """
        paths -- names of the input files
        interleaved -- stored and passed on to InputFiles by open()
        transport_params -- JSON string, or path to a JSON file, handed to
            `xopen_rb_raise_limit` for every path
        """
        self.paths = paths
        self.interleaved = interleaved
        self.transport_params = transport_params

    def open(self) -> InputFiles:
        """
        Open all paths for reading and return them wrapped in an InputFiles object.

        If one of the paths cannot be opened, the files opened so far are
        closed again before the exception is propagated.
        """
        files = []
        try:
            for path in self.paths:
                files.append(xopen_rb_raise_limit(path, self.transport_params))
        except BaseException:
            # Avoid leaking the handles that were already opened successfully
            for f in files:
                f.close()
            raise
        return InputFiles(*files, interleaved=self.interleaved)


Expand Down
36 changes: 35 additions & 1 deletion tests/test_files.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
import os
import pickle

from cutadapt.files import ProxyTextFile, ProxyRecordWriter, OutputFiles
from cutadapt.files import (
ProxyTextFile,
ProxyRecordWriter,
OutputFiles,
xopen_rb_raise_limit,
)
from dnaio import SequenceRecord


Expand Down Expand Up @@ -125,6 +130,35 @@ def test_interleaved_record_writer(self, tmp_path):
o.close()
assert path.read_text() == "@r\nAACC\n+\n####\n@r\nGGTT\n+\n####\n"


def test_open_rb_local_file(tmp_path):
    """A local file can be read back in binary mode via xopen_rb_raise_limit."""
    # Create a local file
    file_path = tmp_path / "test.txt"
    file_path.write_text("Hello, World!")
    # Test opening a local file
    file = xopen_rb_raise_limit(str(file_path))
    try:
        assert file.read() == b"Hello, World!"
    finally:
        # Close the handle even when the assertion fails
        file.close()


def test_open_rb_remote_file():
    """
    A file served over https can be read via xopen_rb_raise_limit.

    NOTE(review): this test needs network access and depends on the content of
    the referenced file staying unchanged.
    """
    file = xopen_rb_raise_limit(
        "https://raw.githubusercontent.com/marcelm/cutadapt/main/tests/data/454.fa"
    )
    try:
        assert (
            file.readline() == b">000163_1255_2627 length=52 uaccno=E0R4ISW01DCIQD\n"
        )
    finally:
        # Close the handle even when the assertion fails
        file.close()


def test_open_rb_s3_file():
    """
    A file stored on S3 can be read via xopen_rb_raise_limit.

    NOTE(review): this test needs network access to the referenced bucket and
    depends on the content of the referenced file staying unchanged.
    """
    file = xopen_rb_raise_limit("s3://platinum-genomes/2017-1.0/md5sum.txt")
    try:
        assert (
            file.readline()
            == b"2e6aa26b42283bbbc4ca03686f427dc2 ./hg38/small_variants/ConfidentRegions.bed.gz\n"
        )
    finally:
        # Close the handle even when the assertion fails
        file.close()
# - test force fasta
# - test qualities
# - test proxied
Expand Down
Loading