Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix data loss issue in combine_echodata #824

Merged
merged 100 commits into from
Oct 7, 2022
Merged
Show file tree
Hide file tree
Changes from 97 commits
Commits
Show all changes
100 commits
Select commit Hold shift + click to select a range
321e53d
start creating the structure for lazy echodata combine
b-reyes Aug 27, 2022
662bca4
Merge branch 'dev' into lazy-comb-files
b-reyes Aug 30, 2022
c1426f2
create PreprocessCallable class and add functionality to laze_combine
b-reyes Aug 30, 2022
bb59291
finish creating a working version of lazy_combine
b-reyes Aug 31, 2022
e58df72
start working on v2 of combine_lazily
b-reyes Aug 31, 2022
e2b9ec6
get a working version of direct_write in combine_lazily_v2
b-reyes Sep 1, 2022
1d8dffa
make construct_lazy_ds return ds_unwritten
b-reyes Sep 1, 2022
b4d9a13
correctly write all variables and dimensions for the Environment grou…
b-reyes Sep 2, 2022
67877a1
account for the rest of the constant dimensions
b-reyes Sep 2, 2022
44faf4d
add comments and documentation to code in combine_lazily_v2
b-reyes Sep 2, 2022
2a89e6d
make combine_lazily_v2 into a class
b-reyes Sep 6, 2022
71fc731
add mechanism to strore dataset attributes and make first attempt at …
b-reyes Sep 6, 2022
6be4dc0
delay region write in direct_write
b-reyes Sep 7, 2022
ce62334
add sychronizer for to_zarr and turn off blosc threads when using com…
b-reyes Sep 8, 2022
36afe2b
Rename class and add attributes from all datasets to the Provenance g…
b-reyes Sep 8, 2022
8e95644
add additional type checks to combine
b-reyes Sep 8, 2022
a7b51e7
rename combine_lazily_v2.py to zarr_combine.py
b-reyes Sep 8, 2022
932355e
start simplifying the logic needed to append data and removal of para…
b-reyes Sep 9, 2022
36768c6
reorganize code and include original compressor in encodings
b-reyes Sep 10, 2022
3d87f0e
document functions and add retries in compute
b-reyes Sep 12, 2022
339ce72
start implementing checks for time and channel coordinates
b-reyes Sep 12, 2022
c2af831
add TODO statements
b-reyes Sep 13, 2022
3665a56
fix pre-commit issues
b-reyes Sep 13, 2022
8eaed23
add routine to check Dataset attributes and drop them if they are num…
b-reyes Sep 15, 2022
7ff0ea1
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 15, 2022
b7fd81e
set all variables and dims compressor to be the same in io.py and def…
b-reyes Sep 16, 2022
db4cf9a
merge in origin branch
b-reyes Sep 16, 2022
9bdc0a9
Merge branch 'dev' into lazy-comb-files
b-reyes Sep 16, 2022
14ccb84
change conversion to combination
b-reyes Sep 16, 2022
3c7ad86
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 16, 2022
a34e6c3
set compressor encoding for all types of zarr variables
b-reyes Sep 16, 2022
3740e94
Merge branch 'lazy-comb-files' of https://github.com/b-reyes/echopype…
b-reyes Sep 16, 2022
aaedf5a
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 16, 2022
64d5242
change Provenance attribute name back to conversion and add zarr comp…
b-reyes Sep 16, 2022
2f84ffa
resolve conflict
b-reyes Sep 16, 2022
b3993bc
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 16, 2022
496c470
remove unnecessary import
b-reyes Sep 16, 2022
85c5cc2
Merge branch 'dev' into lazy-comb-files
b-reyes Sep 16, 2022
2735334
remove chunking in Platform group for EK60 set_groups
b-reyes Sep 16, 2022
ace66dc
add todo about filename variable write
b-reyes Sep 16, 2022
efe940d
allow for variables with different sized dims to be written (primaril…
b-reyes Sep 21, 2022
7c76299
Merge branch 'dev' into lazy-comb-files
b-reyes Sep 22, 2022
5e806d1
document and finalize check_channels
b-reyes Sep 22, 2022
7e55531
document and finalize check_ascending_ds_times
b-reyes Sep 22, 2022
41bccab
investigate decompression error and create routines that can identify…
b-reyes Sep 24, 2022
b5f2acd
start working on locking writes to zarr
b-reyes Sep 25, 2022
321c820
remove locking scheme attempt and return to corrupted approach, place…
b-reyes Sep 26, 2022
12f5829
create general get_zarr_compression function in io so that it can be …
b-reyes Sep 26, 2022
f4922b0
change filenames numbering to range(len(eds))
b-reyes Sep 26, 2022
449f4e5
remove old time checks and replace with new time check for combined d…
b-reyes Sep 27, 2022
419b174
remove unused old combine code
b-reyes Sep 27, 2022
74920ec
implement a reverse time check and update zarr and ed_comb appropriat…
b-reyes Sep 27, 2022
fbd88a9
resolve conflict in combine.py
b-reyes Sep 27, 2022
2b3b0af
remove alternative combine .py scripts
b-reyes Sep 27, 2022
5636f52
finish documenting zarr_combine
b-reyes Sep 27, 2022
ddfb5fc
begin documenting the new combine api, create code section to validat…
b-reyes Sep 28, 2022
d495b14
remove commented out lock code and remove reference to dask.distributed
b-reyes Sep 28, 2022
df3fa1a
finalize docs and comments for the combine_echodata api
b-reyes Sep 28, 2022
bd01a28
revise combine_echodata bullet points and code section
b-reyes Sep 28, 2022
6f9b16a
modify Notes bullet points in combine_echodata docs
b-reyes Sep 28, 2022
4290c02
correct and highlight the default zarr_path in combine_echodata docs
b-reyes Sep 28, 2022
e9f1ecd
construct mapping for lock scheme
b-reyes Sep 28, 2022
757825e
remove append dimensions when doing a prallel write to zarr files and…
b-reyes Sep 29, 2022
531f5ad
remove append dimensions from dataset that will be written and add cl…
b-reyes Sep 29, 2022
940cc21
create class variable max_append_chunk_size that sets an upperbound o…
b-reyes Sep 30, 2022
9b8a7b9
resolve conflicts with dev branch
b-reyes Sep 30, 2022
dc1abf7
start documenting chunk mapping functions
b-reyes Sep 30, 2022
8d90363
finish documenting current function that construct the uniform to non…
b-reyes Sep 30, 2022
61c5265
add function that writes all append dimensions and finish documenting…
b-reyes Sep 30, 2022
b9f2b28
remove unnecessary test_cluster_dump folder
b-reyes Oct 1, 2022
42a2934
add back in items in test_data README
b-reyes Oct 1, 2022
b244f4d
modify docs, close client if it was not provided, include duplicate_p…
b-reyes Oct 3, 2022
260215e
add distributed to requirements.txt
b-reyes Oct 3, 2022
6dbde23
change distributed in requirements.txt to the dask specific version
b-reyes Oct 3, 2022
58e842e
import dask.distibuted and include dask.distibuted in typing of combi…
b-reyes Oct 4, 2022
dbe8b7a
Simplify the logic for checking the input client and printing the das…
b-reyes Oct 4, 2022
d670fef
add overwrite kwarg to combine_echodata and rectify warning caused by…
b-reyes Oct 4, 2022
d18aa6c
modify input to validate_output_path so it will work with s3 buckets
b-reyes Oct 5, 2022
1cee3eb
remove double quotes
b-reyes Oct 5, 2022
ee05db4
allow Path type for zarr_path
b-reyes Oct 5, 2022
3d96012
add union typing
b-reyes Oct 5, 2022
e5334c1
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Oct 5, 2022
fe0379b
add storage_options to open_converted call
b-reyes Oct 5, 2022
bf4c48f
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Oct 5, 2022
bfb9209
add storage_options to zarr.open_array call
b-reyes Oct 5, 2022
5e5c882
only allow zarr_path to be a string and remove option for Path type
b-reyes Oct 5, 2022
3acce9d
send client dashboard link to the logger instead of printing it
b-reyes Oct 5, 2022
4facc49
set storage_option equal to empty dict in _modify_prov_filenames
b-reyes Oct 5, 2022
b3b375d
change storage options typing and set default value for overwrite in …
b-reyes Oct 5, 2022
4f1d4fb
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Oct 5, 2022
376c56f
add Dict and Any typing in combine.py
b-reyes Oct 5, 2022
387479b
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Oct 5, 2022
0bf73af
change docstring type for storage_options in check_zarr_path
b-reyes Oct 5, 2022
9346fb1
change storage options typing in combine_echodata input
b-reyes Oct 5, 2022
290c729
update typing for storage_options in docstring of combine_echodata
b-reyes Oct 5, 2022
b3a51ff
change all typing for storage_options in zarr_combine
b-reyes Oct 5, 2022
6386646
remove typing types from docstrings and add optional where necessary
b-reyes Oct 6, 2022
5afa715
specify the type of the elements in a list within docstrings
b-reyes Oct 6, 2022
a3b3f7f
Merge branch 'dev' into fix-data-loss
b-reyes Oct 6, 2022
0adc23c
Merge branch 'dev' into fix-data-loss
b-reyes Oct 6, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 83 additions & 29 deletions echopype/echodata/combine.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
from pathlib import Path
from typing import List, Optional, Tuple
from typing import Any, Dict, List, Optional, Tuple
from warnings import warn

import dask.distributed
import fsspec
import xarray as xr
from dask.distributed import Client

from ..qc import coerce_increasing_time, exist_reversed_time
from ..utils.io import validate_output_path
Expand All @@ -13,7 +16,9 @@
logger = _init_logger(__name__)


def check_zarr_path(zarr_path: str, storage_options: Optional[dict]) -> str:
def check_zarr_path(
zarr_path: str, storage_options: Dict[str, Any] = {}, overwrite: bool = False
) -> str:
"""
Checks that the zarr path provided to ``combine``
is valid.
Expand All @@ -22,9 +27,13 @@ def check_zarr_path(zarr_path: str, storage_options: Optional[dict]) -> str:
----------
zarr_path: str
The full save path to the final combined zarr store
storage_options: Optional[dict]
storage_options: dict
Any additional parameters for the storage
backend (ignored for local paths)
overwrite: bool
If True, will overwrite the zarr store specified by
``zarr_path`` if it already exists, otherwise an error
will be returned if the file already exists.

Returns
-------
Expand All @@ -35,32 +44,45 @@ def check_zarr_path(zarr_path: str, storage_options: Optional[dict]) -> str:
------
ValueError
If the provided zarr path does not point to a zarr file
RuntimeError
If ``zarr_path`` already exists and ``overwrite=False``
"""

if zarr_path is None:
# check that zarr_path is a string
if not isinstance(zarr_path, str):
raise TypeError(f"zarr_path must be of type {str}")

# assign values, if no zarr path has been provided
source_file = "combined_echodatas.zarr"
save_path = None
else:

# turn string path into Path object
path_obj = Path(zarr_path)
if path_obj.suffix != ".zarr":
raise ValueError("The provided zarr_path input must point to a zarr file!")
else:
# check that the appropriate suffix was provided
if not zarr_path.strip("/").endswith(".zarr"):
raise ValueError("The provided zarr_path input must have '.zarr' suffix!")

# assign values based on zarr path
source_file = path_obj.parts[-1]
save_path = path_obj.parent
# set default source_file name (will be used only if zarr_path is None)
source_file = "combined_echodatas.zarr"

return validate_output_path(
validated_path = validate_output_path(
source_file=source_file,
engine="zarr",
output_storage_options=storage_options,
save_path=save_path,
save_path=zarr_path,
)

# check if validated_path already exists
fs = fsspec.get_mapper(validated_path, **storage_options).fs # get file system
exists = True if fs.exists(validated_path) else False

if exists and not overwrite:
raise RuntimeError(
f"{zarr_path} already exists, please provide a different path or set overwrite=True."
)
elif exists and overwrite:

logger.info(f"overwriting {validated_path}")

# remove zarr file
fs.rm(validated_path, recursive=True)

return validated_path


def check_echodatas_input(echodatas: List[EchoData]) -> Tuple[str, List[str]]:
"""
Expand All @@ -69,14 +91,14 @@ def check_echodatas_input(echodatas: List[EchoData]) -> Tuple[str, List[str]]:

Parameters
----------
echodatas: List[EchoData]
echodatas: list
The list of `EchoData` objects to be combined.

Returns
-------
sonar_model : str
The sonar model used for all values in ``echodatas``
echodata_filenames : List[str]
echodata_filenames : list
The source files names for all values in ``echodatas``

Raises
Expand Down Expand Up @@ -147,7 +169,7 @@ def check_and_correct_reversed_time(

Returns
-------
old_time : Optional[xr.DataArray]
old_time : xr.DataArray or None
If correction is necessary, returns the time before
reversal correction, otherwise returns None

Expand Down Expand Up @@ -234,7 +256,7 @@ def orchestrate_reverse_time_check(
combined ``EchoData`` objects
zarr_store: str
The zarr store containing the ``ed_comb`` data
possible_time_dims: List[str]
possible_time_dims: list
All possible time dimensions that can occur within
``ed_comb``, which should be checked
storage_options: dict
Expand Down Expand Up @@ -295,22 +317,30 @@ def orchestrate_reverse_time_check(
def combine_echodata(
echodatas: List[EchoData] = None,
zarr_path: Optional[str] = None,
storage_options: Optional[dict] = {},
overwrite: bool = False,
storage_options: Dict[str, Any] = {},
client: Optional[dask.distributed.Client] = None,
) -> EchoData:
"""
Combines multiple ``EchoData`` objects into a single ``EchoData`` object.
This is accomplished by writing each element of ``echodatas`` in parallel
(using dask) to the zarr store specified by ``zarr_path``.
(using Dask) to the zarr store specified by ``zarr_path``.

Parameters
----------
echodatas : List[EchoData]
echodatas : list
The list of ``EchoData`` objects to be combined
zarr_path: str
zarr_path: str, optional
The full save path to the final combined zarr store
storage_options: Optional[dict]
overwrite: bool
If True, will overwrite the zarr store specified by
``zarr_path`` if it already exists, otherwise an error
will be returned if the file already exists.
storage_options: dict
Any additional parameters for the storage
backend (ignored for local paths)
client: dask.distributed.Client, optional
An initialized Dask distributed client

Returns
-------
Expand All @@ -322,6 +352,8 @@ def combine_echodata(
------
ValueError
If the provided zarr path does not point to a zarr file
RuntimeError
If ``zarr_path`` already exists and ``overwrite=False``
TypeError
If a list of ``EchoData`` objects are not provided
ValueError
Expand Down Expand Up @@ -366,6 +398,9 @@ def combine_echodata(
a variable and the ``Provenance`` attribute ``reversed_ping_times`` will be set to ``1``.
* If no ``zarr_path`` is provided, the combined zarr file will be
``'temp_echopype_output/combined_echodatas.zarr'`` under the current working directory.
* If no ``client`` is provided, then a client with a local scheduler will be used. The
created scheduler and client will be shutdown once computation has finished.
* For each run of this function, we print our the client dashboard link.

Examples
--------
Expand All @@ -387,8 +422,23 @@ def combine_echodata(
"""
# TODO: change PR #297 reference to a link in our documentation

# set flag specifying that a client was not created
client_created = False

# check the client input and print dashboard link
if client is None:
# set flag specifying that a client was created
client_created = True

client = Client() # create client with local scheduler
logger.info(f"Client dashboard link: {client.dashboard_link}")
elif isinstance(client, Client):
logger.info(f"Client dashboard link: {client.dashboard_link}")
else:
raise TypeError(f"The input client is not of type {type(Client)}!")

# Check the provided zarr_path is valid, or create a temp zarr_path if not provided
zarr_path = check_zarr_path(zarr_path, storage_options)
zarr_path = check_zarr_path(zarr_path, storage_options, overwrite)

# return empty EchoData object, if no EchoData objects are provided
if echodatas is None:
Expand All @@ -412,4 +462,8 @@ def combine_echodata(

orchestrate_reverse_time_check(ed_comb, zarr_path, comb.possible_time_dims, storage_options)

if client_created:
# close client
client.close()

return ed_comb
Loading