[MRG] Add --save-matches to fastmultigather #397

Merged: 23 commits, Aug 17, 2024
4 changes: 3 additions & 1 deletion doc/README.md
@@ -222,7 +222,7 @@ As of v0.9.5, `fastgather` outputs the same columns as `sourmash gather`, with o

`fastmultigather` takes a collection of query metagenomes and a collection of sketches as a database, and outputs many CSVs:
```
sourmash scripts fastmultigather queries.manifest.csv database.zip --cores 4
sourmash scripts fastmultigather queries.manifest.csv database.zip --cores 4 --save-matches
```
<!-- We suggest using a manifest CSV for the queries. CTB -->

@@ -234,6 +234,8 @@ On a database of sketches (but not on RocksDB indexes) `fastmultigather` will ou

The prefetch CSV will be named `{signame}.prefetch.csv`, and the gather CSV will be named `{signame}.gather.csv`. Here, `{signame}` is the name of your sourmash signature.

`--save-matches` is an optional flag that will save the matched hashes for each query in a separate sourmash signature `{signame}.matches.sig`. This can be useful for debugging or for further analysis.

When searching against a RocksDB index, `fastmultigather` will output a single file containing all gather results, specified with `-o/--output`. No prefetch results will be output.

`fastmultigather` gather CSVs provide the same columns as `fastgather`, above.
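As a usage illustration of the documentation above (not part of this PR's diff), the saved `{signame}.matches.sig` can be loaded back with the sourmash Python API. This is a minimal sketch; the filenames and `ksize=31` are assumptions borrowed from the test added at the bottom of this PR.

```python
import sourmash

# Load the matches signature written by `--save-matches`, plus the original
# query metagenome. Filenames and ksize are assumptions; adjust to your run.
match_sig = list(sourmash.load_file_as_signatures("SRR606249.matches.sig", ksize=31))[0]
query_sig = list(sourmash.load_file_as_signatures("SRR606249.sig.gz", ksize=31))[0]

match_mh = match_sig.minhash
print(f"{len(match_mh)} matched hashes saved")

# Every saved hash was taken from the query, so the matches signature is
# fully contained by the query (containment == 1.0), but not vice versa.
print("matches contained by query:", match_mh.contained_by(query_sig.minhash))
print("query contained by matches:", query_sig.minhash.contained_by(match_mh))
```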
51 changes: 50 additions & 1 deletion src/fastmultigather.rs
@@ -2,7 +2,8 @@
use anyhow::Result;
use rayon::prelude::*;

use sourmash::selection::Selection;
use sourmash::prelude::ToWriter;
use sourmash::{selection::Selection, signature::SigsTrait};

use std::sync::atomic;
use std::sync::atomic::AtomicUsize;
@@ -11,6 +12,13 @@ use std::collections::BinaryHeap;

use camino::Utf8Path as PathBuf;

use std::collections::HashSet;
use std::fs::File;

use sourmash::signature::Signature;
use sourmash::sketch::minhash::KmerMinHash;
use sourmash::sketch::Sketch;

use crate::utils::{
consume_query_by_gather, load_collection, load_sketches, write_prefetch, PrefetchResult,
ReportType,
@@ -23,6 +31,7 @@ pub fn fastmultigather(
scaled: usize,
selection: &Selection,
allow_failed_sigpaths: bool,
save_matches: bool,
) -> Result<()> {
// load query collection
let query_collection = load_collection(
@@ -70,12 +79,23 @@
let prefix = name.split(' ').next().unwrap_or_default().to_string();
let location = PathBuf::new(&prefix).file_name().unwrap();
if let Some(query_mh) = query_sig.minhash() {
let mut matching_hashes = if save_matches { Some(Vec::new()) } else { None };
let matchlist: BinaryHeap<PrefetchResult> = against
.iter()
.filter_map(|against| {
let mut mm: Option<PrefetchResult> = None;
if let Ok(overlap) = against.minhash.count_common(query_mh, false) {
if overlap >= threshold_hashes {
if save_matches {
if let Ok(intersection) =
against.minhash.intersection(query_mh)
{
matching_hashes
.as_mut()
.unwrap()
.extend(intersection.0);
}
}
let result = PrefetchResult {
name: against.name.clone(),
md5sum: against.md5sum.clone(),
@@ -105,6 +125,35 @@
Some(gather_output),
)
.ok();

// Save matching hashes to .sig file if save_matches is true
if save_matches {
if let Some(hashes) = matching_hashes {
let sig_filename = format!("{}.matches.sig", name);
if let Ok(mut file) = File::create(&sig_filename) {
let unique_hashes: HashSet<u64> = hashes.into_iter().collect();
let mut new_mh = KmerMinHash::new(
query_mh.scaled().try_into().unwrap(),
query_mh.ksize().try_into().unwrap(),
query_mh.hash_function().clone(),
query_mh.seed(),
false,
query_mh.num(),
);
new_mh
.add_many(&unique_hashes.into_iter().collect::<Vec<_>>())
.ok();
let mut signature = Signature::default();
signature.push(Sketch::MinHash(new_mh));
signature.set_filename(&name);
if let Err(e) = signature.to_writer(&mut file) {
eprintln!("Error writing signature file: {}", e);
}
} else {
eprintln!("Error creating signature file: {}", sig_filename);
}
}
}
} else {
println!("No matches to '{}'", location);
}
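For orientation, here is a rough Python analogue (not the plugin's actual code) of the save-matches step above: collect the overlapping hashes, deduplicate them, put them into a fresh flat MinHash built with the query's parameters, and write `{name}.matches.sig`. It assumes a scaled (rather than `num`) MinHash and uses the public sourmash Python API; `query_mh`, `matching_hashes`, and `name` stand in for the Rust variables of the same names.

```python
import sourmash

def save_matches_sketch(query_mh, matching_hashes, name):
    """Approximate Python analogue of the Rust save-matches step above."""
    # Deduplicate the hashes collected from all prefetch intersections
    # (the Rust code uses a HashSet<u64> for the same purpose).
    unique_hashes = set(matching_hashes)

    # A fresh flat MinHash with the query's parameters; abundances are off,
    # mirroring the `false` track-abundance flag in the Rust constructor.
    new_mh = sourmash.MinHash(n=0, ksize=query_mh.ksize, scaled=query_mh.scaled)
    new_mh.add_many(unique_hashes)

    sig = sourmash.SourmashSignature(new_mh, name=name)
    with open(f"{name}.matches.sig", "wt") as fp:
        sourmash.save_signatures([sig], fp)
```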
6 changes: 4 additions & 2 deletions src/lib.rs
@@ -106,7 +106,7 @@ fn do_fastgather(
}

#[pyfunction]
#[pyo3(signature = (query_filenames, siglist_path, threshold_bp, ksize, scaled, moltype, output_path=None))]
#[pyo3(signature = (query_filenames, siglist_path, threshold_bp, ksize, scaled, moltype, output_path=None, save_matches=false))]
fn do_fastmultigather(
query_filenames: String,
siglist_path: String,
@@ -115,6 +115,7 @@ fn do_fastmultigather(
scaled: usize,
moltype: String,
output_path: Option<String>,
save_matches: bool,
) -> anyhow::Result<u8> {
let againstfile_path: camino::Utf8PathBuf = siglist_path.clone().into();
let selection = build_selection(ksize, scaled, &moltype);
Expand Down Expand Up @@ -147,6 +148,7 @@ fn do_fastmultigather(
scaled,
&selection,
allow_failed_sigpaths,
save_matches,
) {
Ok(_) => Ok(0),
Err(e) => {
@@ -215,8 +217,8 @@ fn do_check(index: String, quick: bool) -> anyhow::Result<u8> {
}

#[pyfunction]
#[allow(clippy::too_many_arguments)]
#[pyo3(signature = (querylist_path, siglist_path, threshold, ksize, scaled, moltype, estimate_ani, output_path=None))]
#[allow(clippy::too_many_arguments)]
fn do_multisearch(
querylist_path: String,
siglist_path: String,
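The practical effect of the `#[pyo3(signature = ...)]` change is that `save_matches` defaults to `False`, so existing Python callers keep working unchanged. A hypothetical direct call to the compiled binding (the import path is an assumption; normally the CLI wrapper in `__init__.py` below does this for you) might look like:

```python
# Hypothetical import of the compiled Rust extension; the exact module path
# is an assumption, not something this PR defines.
from sourmash_plugin_branchwater import sourmash_plugin_branchwater as rust_api

status = rust_api.do_fastmultigather(
    "query.txt",    # query_filenames
    "against.txt",  # siglist_path
    0,              # threshold_bp
    31,             # ksize
    100_000,        # scaled
    "dna",          # moltype (lowercased by the CLI wrapper)
    None,           # output_path: None -> per-query CSVs for non-RocksDB databases
    True,           # save_matches: new in this PR, defaults to False
)
```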
7 changes: 5 additions & 2 deletions src/python/sourmash_plugin_branchwater/__init__.py
@@ -159,11 +159,12 @@ def __init__(self, p):
p.add_argument('-c', '--cores', default=0, type=int,
help='number of cores to use (default is all available)')
p.add_argument('-o', '--output', help='CSV output file for matches')
p.add_argument('--save-matches', action='store_true', default=False, help='save matched hashes for every input to a signature')


def main(self, args):
print_version()
notify(f"ksize: {args.ksize} / scaled: {args.scaled} / moltype: {args.moltype} / threshold bp: {args.threshold_bp}")
notify(f"ksize: {args.ksize} / scaled: {args.scaled} / moltype: {args.moltype} / threshold bp: {args.threshold_bp} / save matches: {args.save_matches}")
args.moltype = args.moltype.lower()

num_threads = set_thread_pool(args.cores)
@@ -176,7 +177,9 @@ def main(self, args):
args.ksize,
args.scaled,
args.moltype,
args.output)
args.output,
args.save_matches
)
if status == 0:
notify(f"...fastmultigather is done!")
return status
57 changes: 57 additions & 0 deletions src/python/tests/test_multigather.py
@@ -1212,3 +1212,60 @@ def test_rocksdb_no_internal_storage_gather_fails(runtmp, capfd):
assert "Error gathering matches:" in captured.err
assert "ERROR: 1 failed gathers. See error messages above." in captured.err
assert "Unresolvable errors found; results cannot be trusted. Quitting." in captured.err



def test_save_matches(runtmp):
# test basic execution!
query = get_test_data('SRR606249.sig.gz')
sig2 = get_test_data('2.fa.sig.gz')
sig47 = get_test_data('47.fa.sig.gz')
sig63 = get_test_data('63.fa.sig.gz')

query_list = runtmp.output('query.txt')
against_list = runtmp.output('against.txt')

make_file_list(query_list, [query])
make_file_list(against_list, [sig2, sig47, sig63])


runtmp.sourmash('scripts', 'fastmultigather', query_list, against_list,
'-s', '100000', '-t', '0', '--save-matches',
in_directory=runtmp.output(''))

print(os.listdir(runtmp.output('')))

g_output = runtmp.output('SRR606249.gather.csv')
p_output = runtmp.output('SRR606249.prefetch.csv')
m_output = runtmp.output('SRR606249.matches.sig')

assert os.path.exists(g_output)
assert os.path.exists(p_output)
assert os.path.exists(m_output)

# check prefetch output (only non-indexed gather)
df = pandas.read_csv(p_output)

assert len(df) == 3
keys = set(df.keys())
assert keys == {'query_filename', 'query_name', 'query_md5', 'match_name', 'match_md5', 'intersect_bp'}

assert os.path.exists(g_output)
df = pandas.read_csv(g_output)

assert len(df) == 3
keys = set(df.keys())
assert {'query_filename', 'query_name', 'query_md5', 'match_name', 'match_md5', 'intersect_bp', 'gather_result_rank'}.issubset(keys)

# can't test against prefetch because matched k-mers can overlap
match_ss = list(sourmash.load_file_as_signatures(m_output, ksize=31))[0]
match_mh = match_ss.minhash
matches_sig_len = len(match_mh)

# right size?
assert sum(df['intersect_bp']) >= matches_sig_len * 100_000

# containment?
mg_ss = list(sourmash.load_file_as_signatures(query, ksize=31))[0]
assert match_mh.contained_by(mg_ss.minhash) == 1.0
assert mg_ss.minhash.contained_by(match_mh) < 1
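
The final two assertions rely on containment being |A ∩ B| / |A|: every saved hash comes from the query, so the matches signature is fully contained by the query, while the metagenome query keeps plenty of hashes with no reference match, so the reverse containment is below 1. A set-based illustration (hypothetical hash values, not real MinHash contents):

```python
# Containment as used in the last two assertions, in plain set terms.
query_hashes = {1, 2, 3, 4, 5, 6}     # hypothetical query MinHash contents
matched_hashes = {2, 4, 6}            # every saved hash came from the query

def containment(a, b):
    """|a & b| / |a|: the fraction of a's hashes also present in b."""
    return len(a & b) / len(a)

assert containment(matched_hashes, query_hashes) == 1.0  # matches are a subset of the query
assert containment(query_hashes, matched_hashes) < 1     # query has unmatched hashes
```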