
Implement sensible partitioning #724

Merged · 14 commits · Jul 12, 2024
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -8,6 +8,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

#### Added

- Added `calculate_n_partitions` utility for sensible dask dataframe partitioning [#724](https://github.com/askap-vast/vast-pipeline/pull/724)
- Added support for compressed FITS files [#694](https://github.com/askap-vast/vast-pipeline/pull/694)
- Added links to Data Central DAS and the Fink Broker to the source page [#697](https://github.com/askap-vast/vast-pipeline/pull/697/)
- Added `n_new_sources` column to run model to store the number of new sources in a pipeline run [#676](https://github.com/askap-vast/vast-pipeline/pull/676).
@@ -76,6 +77,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

#### Fixed

- Implemented sensible dask dataframe partitioning [#724](https://github.com/askap-vast/vast-pipeline/pull/724)
- Fixed outdated `runpipeline` section on CLI docs page [#685](https://github.com/askap-vast/vast-pipeline/pull/685).
- Fixed link to JupyterHub [#676](https://github.com/askap-vast/vast-pipeline/pull/676).
- Ensure Image models are not created if the catalogue ingest fails [#648](https://github.com/askap-vast/vast-pipeline/pull/648).
@@ -114,6 +116,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

#### List of PRs

- [#724](https://github.com/askap-vast/vast-pipeline/pull/724): fix, feat: Implemented sensible dask dataframe partitioning
- [#694](https://github.com/askap-vast/vast-pipeline/pull/694): feat: Handle compressed fits files.
- [#702](https://github.com/askap-vast/vast-pipeline/pull/702): fix: Downgrade ci-docs to python 3.8.
- [#701](https://github.com/askap-vast/vast-pipeline/pull/701): fix: Update Gr1N poetry to v8, force python 3.8.10.
8 changes: 5 additions & 3 deletions vast_pipeline/pipeline/association.py
@@ -20,7 +20,7 @@
reconstruct_associtaion_dfs
)
from vast_pipeline.pipeline.config import PipelineConfig
from vast_pipeline.utils.utils import StopWatch
from vast_pipeline.utils.utils import StopWatch, calculate_n_partitions


logger = logging.getLogger(__name__)
@@ -1588,10 +1588,12 @@ def parallel_association(
id_incr_par_assoc = max(done_source_ids) if add_mode else 0

n_cpu = cpu_count() - 1

logger.debug(f"Running association with {n_cpu} CPUs")
n_partitions = calculate_n_partitions(images_df, n_cpu)

# pass each skyreg_group through the normal association process.
results = (
dd.from_pandas(images_df, n_cpu)
dd.from_pandas(images_df.set_index('skyreg_group'), npartitions=n_partitions)
.groupby('skyreg_group')
.apply(
association,
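The same pattern recurs in `new_sources.py` and `pipeline/utils.py` below: derive a partition count from the dataframe's in-memory size, set the index to the groupby key so dask can compute sorted divisions, then groupby-apply. A minimal runnable sketch of the pattern, using a synthetic `images_df` and `len` as a stand-in for the real `association` function:

```python
import numpy as np
import pandas as pd
import dask.dataframe as dd
from psutil import cpu_count

from vast_pipeline.utils.utils import calculate_n_partitions

# Synthetic stand-in for images_df: three sky-region groups of 100 rows.
images_df = pd.DataFrame({
    "skyreg_group": np.repeat([0, 1, 2], 100),
    "image_id": np.arange(300),
})

n_cpu = cpu_count() - 1
n_partitions = calculate_n_partitions(images_df, n_cpu)

result = (
    # Indexing on the groupby key lets dask derive sorted divisions,
    # so the groupby-apply below avoids a full shuffle.
    dd.from_pandas(images_df.set_index("skyreg_group"),
                   npartitions=n_partitions)
    .groupby("skyreg_group")
    .apply(len, meta=("n_images", "i8"))  # stand-in for `association`
    .compute(num_workers=n_cpu, scheduler="processes")
)
print(result)  # rows per skyreg_group
```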
6 changes: 4 additions & 2 deletions vast_pipeline/pipeline/new_sources.py
@@ -12,7 +12,7 @@
)

from vast_pipeline.models import Image, Run
from vast_pipeline.utils.utils import StopWatch
from vast_pipeline.utils.utils import StopWatch, calculate_n_partitions
from vast_pipeline.image.utils import open_fits


@@ -219,9 +219,11 @@ def parallel_get_rms_measurements(
}

n_cpu = cpu_count() - 1
logger.debug(f"Running association with {n_cpu} CPUs")
n_partitions = calculate_n_partitions(out, n_cpu)

out = (
dd.from_pandas(out, n_cpu)
dd.from_pandas(out, npartitions=n_partitions)
.groupby('img_diff_rms_path')
.apply(
get_image_rms_measurements,
6 changes: 5 additions & 1 deletion vast_pipeline/pipeline/pairs.py
@@ -6,6 +6,8 @@
import pandas as pd
from psutil import cpu_count

from vast_pipeline.utils.utils import calculate_n_partitions


logger = logging.getLogger(__name__)

@@ -65,6 +67,8 @@ def calculate_measurement_pair_metrics(df: pd.DataFrame) -> pd.DataFrame:
m_peak, m_int - variability modulation index
"""
n_cpu = cpu_count() - 1
logger.debug(f"Running association with {n_cpu} CPUs")
n_partitions = calculate_n_partitions(df.set_index('source'), n_cpu)

"""Create a DataFrame containing all measurement ID combinations per source.
Resultant DataFrame will have a MultiIndex(["source", RangeIndex]) where "source" is
@@ -86,7 +90,7 @@ def calculate_measurement_pair_metrics(df: pd.DataFrame) -> pd.DataFrame:
11128 0 6216 23534
"""
measurement_combinations = (
dd.from_pandas(df, n_cpu)
dd.from_pandas(df, npartitions=n_partitions)
.groupby("source")["id"]
.apply(
lambda x: pd.DataFrame(list(combinations(x, 2))), meta={0: "i", 1: "i"},)
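For reference, the groupby-apply here expands each source's measurement ids into all 2-combinations, producing the `MultiIndex(["source", RangeIndex])` frame described in the docstring. A pandas-only sketch with toy data (the `id_a`/`id_b` names are illustrative, not the pipeline's):

```python
from itertools import combinations
import pandas as pd

# Toy input: three measurements of source 1, two of source 2.
df = pd.DataFrame({
    "source": [1, 1, 1, 2, 2],
    "id": [10, 11, 12, 20, 21],
})

# The same expansion the dask groupby-apply performs per source:
pairs = (
    df.groupby("source")["id"]
    .apply(lambda x: pd.DataFrame(list(combinations(x, 2))))
    .rename(columns={0: "id_a", 1: "id_b"})  # illustrative names only
)
print(pairs)
#            id_a  id_b
# source
# 1      0     10    11
#        1     10    12
#        2     11    12
# 2      0     20    21
```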
13 changes: 10 additions & 3 deletions vast_pipeline/pipeline/utils.py
@@ -24,7 +24,8 @@
from vast_pipeline.image.main import FitsImage, SelavyImage
from vast_pipeline.image.utils import open_fits
from vast_pipeline.utils.utils import (
eq_to_cart, StopWatch, optimize_ints, optimize_floats
eq_to_cart, StopWatch, optimize_ints, optimize_floats,
calculate_n_partitions
)
from vast_pipeline.models import (
Band, Image, Run, SkyRegion
@@ -704,7 +705,10 @@ def parallel_groupby(df: pd.DataFrame) -> pd.DataFrame:
'related_list': 'O'
}
n_cpu = cpu_count() - 1
out = dd.from_pandas(df, n_cpu)
logger.debug(f"Running association with {n_cpu} CPUs")
n_partitions = calculate_n_partitions(df, n_cpu)

out = dd.from_pandas(df.set_index('source'), npartitions=n_partitions)
out = (
out.groupby('source')
.apply(
@@ -763,7 +767,10 @@ def parallel_groupby_coord(df: pd.DataFrame) -> pd.DataFrame:
'wavg_dec': 'f',
}
n_cpu = cpu_count() - 1
out = dd.from_pandas(df, n_cpu)
logger.debug(f"Running association with {n_cpu} CPUs")
n_partitions = calculate_n_partitions(df, n_cpu)

out = dd.from_pandas(df.set_index('source'), npartitions=n_partitions)
out = (
out.groupby('source')
.apply(calc_ave_coord, meta=col_dtype)
23 changes: 23 additions & 0 deletions vast_pipeline/utils/utils.py
@@ -378,3 +378,26 @@ def dict_merge(

def timeStamped(fname, fmt="%Y-%m-%d-%H-%M-%S_{fname}"):
return datetime.now().strftime(fmt).format(fname=fname)

def calculate_n_partitions(df, n_cpu, partition_size_mb=100):
    """
    Calculate how many partitions a dataframe should be split into.

    Args:
        df: The pandas dataframe to be partitioned.
        n_cpu: The number of available CPUs.
        partition_size_mb: The target partition size in MB.

    Returns:
        The optimal number of partitions.
    """
    mem_usage_mb = df.memory_usage(deep=True).sum() / 1e6
    n_partitions = int(np.ceil(mem_usage_mb / partition_size_mb))

    # n_partitions should be >= n_cpu for optimal parallel processing
    if n_partitions < n_cpu:
        n_partitions = n_cpu

    logger.debug(f"Using {n_partitions} partitions of {partition_size_mb}MB")

    return n_partitions
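A quick usage sketch of the new helper. The exact counts assume a default pandas `RangeIndex`, whose few bytes of overhead can nudge the ceiling up by one:

```python
import numpy as np
import pandas as pd

from vast_pipeline.utils.utils import calculate_n_partitions

# 20M float64 rows -> ~160 MB in memory.
df = pd.DataFrame({"flux": np.zeros(20_000_000)})

# Memory term: ceil(160 MB / 100 MB) = 2, but the n_cpu floor wins.
print(calculate_n_partitions(df, n_cpu=8))  # -> 8

# A smaller target size makes the memory term dominate:
# ceil(~160 MB / 10 MB) -> 17 (index overhead tips it just past 160 MB).
print(calculate_n_partitions(df, 8, partition_size_mb=10))  # -> 17
```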