Quick memory optimisations (#776)
* Use itertuples over iterrows since iterrows is an enormous memory hog.

* Drop sources_df columns before renaming the id column to avoid a copy of the whole dataframe in memory.

* Decrease default partition size to 15MB

* Don't split the (large-in-memory) list of DataFrames into dask bags (no performance hit).

* Don't write forced parquets in parallel (no performance hit for this).

* Don't overwrite the input DataFrame when writing parquets.

* Update CHANGELOG.md

* Address review comments.

* Copy YAML objects before revalidation so they can be garbage collected.

* Appease flake8
mauch authored Nov 4, 2024
1 parent 39d7c99 commit 19883c2
Showing 6 changed files with 56 additions and 51 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -12,6 +12,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

#### Changed

- Small memory optimisations: use `itertuples` in favour of `iterrows`; loop over mappings rather than converting them to lists up-front. [#776](https://github.com/askap-vast/vast-pipeline/pull/776)
- Updated clearpiperun to delete runs using raw SQL rather than via django [#775](https://github.com/askap-vast/vast-pipeline/pull/775)
- Shortened forced fits measurement names to ensure they fit within the character limits - remove image prefix and limited to 1000 forced fits per source [#734](https://github.com/askap-vast/vast-pipeline/pull/734)
- Cleaned up Code of Conduct including adding Zenodo DOI [#773](https://github.com/askap-vast/vast-pipeline/pull/773)
@@ -26,6 +27,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

#### List of PRs

- [#776](https://github.com/askap-vast/vast-pipeline/pull/776): fix: Minor memory optimisations
- [#775](https://github.com/askap-vast/vast-pipeline/pull/775): fix, feat: Enabled deletion of pipeline runs directly using SQL rather than via django
- [#734](https://github.com/askap-vast/vast-pipeline/pull/734): Shortened forced fits measurement names
- [#773](https://github.com/askap-vast/vast-pipeline/pull/773): docs: Cleaned up Code of Conduct including adding Zenodo DOI
25 changes: 9 additions & 16 deletions vast_pipeline/pipeline/config.py
@@ -402,10 +402,12 @@ def validate(self, user: User = None):
# of the user's input format.
# Ensure all input file types have the same epochs.
try:
schema = yaml.Map({epoch: yaml.Seq(yaml.Str()) for epoch in epochs_image})
for input_type in inputs.keys():
self._yaml["inputs"][input_type].revalidate(
yaml.Map({epoch: yaml.Seq(yaml.Str()) for epoch in epochs_image})
)
# Generate a new YAML object on-the-fly per input to avoid saving
# a validation schema per file in the PipelineConfig object
# (These can consume a lot of RAM for long lists of input files).
yaml.load(self._yaml["inputs"][input_type].as_yaml(), schema=schema)
except yaml.YAMLValidationError:
# number of epochs could be different or the name of the epochs may not match
# find out which by counting the number of unique epochs per input type
@@ -438,20 +440,11 @@ def validate(self, user: User = None):
# This could be combined with the number of epochs validation above, but we want
# to give specific feedback to the user on failure.
try:
schema = yaml.Map(
{epoch: yaml.FixedSeq([yaml.Str()] * epoch_n_files["image"][epoch])
for epoch in epochs_image})
for input_type in inputs.keys():
self._yaml["inputs"][input_type].revalidate(
yaml.Map(
{
epoch: yaml.FixedSeq(
[
yaml.Str()
for _ in range(epoch_n_files["image"][epoch])
]
)
for epoch in epochs_image
}
)
)
yaml.load(self._yaml["inputs"][input_type].as_yaml(), schema=schema)
except yaml.YAMLValidationError:
# map input type to a mapping of epoch to file count
file_counts_str = ""
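The config.py change above builds the epoch schema once and round-trips each input block through `yaml.load(...as_yaml(), schema=schema)` instead of calling `revalidate()` on the stored objects, so no per-file schema lingers in the `PipelineConfig`. Below is a minimal, hedged sketch of that pattern; it assumes the `yaml` module is strictyaml and uses a made-up `check_epochs` helper rather than the pipeline's real `validate()` method.

```python
# Illustrative only: a stand-alone version of the "build one schema, re-load
# each input" validation pattern. Assumes strictyaml; check_epochs and its
# arguments are hypothetical, not part of the pipeline API.
from strictyaml import Map, Seq, Str, YAMLValidationError, load


def check_epochs(inputs_yaml, epochs_image):
    """Check that every input type lists exactly the epochs seen for images.

    inputs_yaml: mapping of input type (e.g. "image", "selavy") to an
    already-parsed strictyaml object.
    """
    # One schema shared by all input types, instead of one per revalidate() call.
    schema = Map({epoch: Seq(Str()) for epoch in epochs_image})
    try:
        for input_type in inputs_yaml:
            # Serialise and re-parse; the validated result is discarded, so the
            # temporary objects (and their schemas) can be garbage collected.
            load(inputs_yaml[input_type].as_yaml(), schema)
    except YAMLValidationError as exc:
        raise ValueError(f"Input epochs do not match: {exc}") from exc
```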
6 changes: 3 additions & 3 deletions vast_pipeline/pipeline/finalise.py
@@ -357,9 +357,9 @@ def final_operations(
make_upload_associations(sources_df_upload)

# write associations to parquet file
sources_df.rename(columns={'id': 'meas_id'})[
['source_id', 'meas_id', 'd2d', 'dr']
].to_parquet(os.path.join(p_run.path, 'associations.parquet'))
sources_df[['source_id', 'id', 'd2d', 'dr']]. \
rename(columns={'id': 'meas_id'}). \
to_parquet(os.path.join(p_run.path, 'associations.parquet'))

if calculate_pairs:
# get the Source object primary keys for the measurement pairs
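The finalise.py change selects the four association columns before renaming, so `rename` copies only a narrow frame rather than the whole `sources_df`. A hedged, self-contained illustration with synthetic data (column names follow the diff; everything else is made up):

```python
# Illustrative only: subset-then-rename keeps the intermediate copy small.
import pandas as pd

sources_df = pd.DataFrame({
    "source_id": [1, 1, 2],
    "id": [10, 11, 12],            # measurement id
    "d2d": [0.1, 0.2, 0.3],
    "dr": [0.5, 1.5, 2.1],
    "flux_peak": [1.0, 2.0, 3.0],  # stand-in for the many other columns
})

# rename() returns a copy; doing it on the full frame duplicates every column,
# whereas selecting the needed columns first copies only those four.
(
    sources_df[["source_id", "id", "d2d", "dr"]]
    .rename(columns={"id": "meas_id"})
    .to_parquet("associations.parquet")
)
```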
34 changes: 18 additions & 16 deletions vast_pipeline/pipeline/forced_extraction.py
@@ -413,11 +413,15 @@ def image_data_func(image_name: str) -> Dict[str, Any]:
))

# compute the rest of the columns
intermediate_df = (
db.from_sequence(intermediate_df)
.map(lambda x: finalise_forced_dfs(**x))
.compute()
)
# NOTE: Avoid using dask bags to parallelise the mapping
# over DataFrames, since these tend to get very large in memory and
# dask bags make a copy of the output before collecting the results.
# There is also a minimal speed penalty for doing this step without
# parallelism.
intermediate_df = list(map(
lambda x: finalise_forced_dfs(**x),
intermediate_df
))
df_out = (
pd.concat(intermediate_df, axis=0, sort=False)
.rename(
@@ -457,10 +461,10 @@ def write_group_to_parquet(
pass


def parallel_write_parquet(
def write_forced_parquet(
df: pd.DataFrame, run_path: str, add_mode: bool = False) -> None:
'''
Parallelize writing parquet files for forced measurements.
Write parquet files for forced measurements.
Args:
df:
@@ -479,15 +483,13 @@ def get_fname(n): return os.path.join(
run_path,
'forced_measurements_' + n.replace('.', '_') + '.parquet'
)
dfs = list(map(lambda x: (df[df['image'] == x], get_fname(x)), images))
n_cpu = cpu_count() - 1

# writing parquets using Dask bag
bags = db.from_sequence(dfs)
bags = bags.starmap(
lambda df, fname: write_group_to_parquet(df, fname, add_mode))
bags.compute(num_workers=n_cpu)
# Avoid saving the mapping to a list since this copies the entire
# DataFrame, which can already be very large in memory at this point.
dfs = map(lambda x: (df[df['image'] == x], get_fname(x)), images)

# Write parquets
for this_df, fname in dfs:
write_group_to_parquet(this_df, fname, add_mode)
pass


@@ -689,7 +691,7 @@ def forced_extraction(
logger.info(
'Saving forced measurements to specific parquet file...'
)
parallel_write_parquet(extr_df, p_run.path, add_mode)
write_forced_parquet(extr_df, p_run.path, add_mode)

# Required to rename this column for the image add mode.
extr_df = extr_df.rename(columns={'time': 'datetime'})
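The forced_extraction.py changes drop dask bags in two places: the per-source finalisation becomes a plain `map`, and the per-image parquet writing becomes a lazy generator consumed by a simple loop, so only one image's slice is materialised at a time. Below is a hedged sketch of the write path; the real code uses `write_group_to_parquet` and an `add_mode` flag, and a plain `to_parquet` call stands in here.

```python
# Illustrative only: lazy per-image grouping followed by sequential writes.
import os
import pandas as pd


def write_forced_parquet_sketch(df: pd.DataFrame, run_path: str) -> None:
    """Write one parquet per image without building a list of DataFrames."""
    images = df["image"].unique()

    def get_fname(name: str) -> str:
        return os.path.join(
            run_path,
            "forced_measurements_" + name.replace(".", "_") + ".parquet",
        )

    # Generator, not list(...): each (slice, filename) pair is created only
    # when the loop below reaches it, so the slices are not all held in memory.
    groups = ((df[df["image"] == name], get_fname(name)) for name in images)

    for group_df, fname in groups:
        group_df.to_parquet(fname)
```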
30 changes: 15 additions & 15 deletions vast_pipeline/pipeline/model_generator.py
@@ -30,11 +30,11 @@ def measurement_models_generator(
An iterable generator object containing the yielded Measurement
objects.
"""
for i, row in meas_df.iterrows():
for row in meas_df.itertuples():
one_m = Measurement()
for fld in one_m._meta.get_fields():
if getattr(fld, 'attname', None) and fld.attname in row.index:
setattr(one_m, fld.attname, row[fld.attname])
if getattr(fld, 'attname', None) and hasattr(row, fld.attname):
setattr(one_m, fld.attname, getattr(row, fld.attname))
yield one_m


@@ -55,19 +55,19 @@ def source_models_generator(
Returns:
An iterable generator object containing the yielded Source objects.
"""
for i, row in src_df.iterrows():
for row in src_df.itertuples():
# generate an IAU compliant source name, see
# https://cdsweb.u-strasbg.fr/Dic/iau-spec.html
name = (
f"J{deg2hms(row['wavg_ra'], precision=1, truncate=True)}"
f"{deg2dms(row['wavg_dec'], precision=0, truncate=True)}"
f"J{deg2hms(row.wavg_ra, precision=1, truncate=True)}"
f"{deg2dms(row.wavg_dec, precision=0, truncate=True)}"
).replace(":", "")
src = Source()
src.run_id = pipeline_run.id
src.name = name
for fld in src._meta.get_fields():
if getattr(fld, 'attname', None) and fld.attname in row.index:
setattr(src, fld.attname, row[fld.attname])
if getattr(fld, 'attname', None) and hasattr(row, fld.attname):
setattr(src, fld.attname, getattr(row, fld.attname))

yield src

@@ -88,12 +88,12 @@ def association_models_generator(
An iterable generator object containing the yielded Association objects.
"""
logger.debug(f"Building {len(assoc_df)} association generators")
for i, row in assoc_df.iterrows():
for row in assoc_df.itertuples():
yield Association(
meas_id=row['id'],
source_id=row['source_id'],
d2d=row['d2d'],
dr=row['dr'],
meas_id=row.id,
source_id=row.source_id,
d2d=row.d2d,
dr=row.dr,
)
logger.debug(f"Built {len(assoc_df)} association generators")

@@ -113,5 +113,5 @@ def related_models_generator(
Returns:
An iterable generator object containing the yielded Association objects.
"""
for i, row in related_df.iterrows():
yield RelatedSource(**row.to_dict())
for row in related_df.itertuples(index=False):
yield RelatedSource(**row._asdict())
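All four generators above switch from `iterrows()` (which builds a full pandas Series per row) to `itertuples()` (lightweight namedtuples), reading fields with `hasattr`/`getattr` or `_asdict()`. A hedged, Django-free sketch of the same pattern, with a plain class standing in for a model:

```python
# Illustrative only: itertuples + getattr, with a dummy class in place of a
# Django model (the real generators inspect model._meta.get_fields()).
import pandas as pd


class DummyMeasurement:
    field_names = ("ra", "dec", "flux_peak")


def measurement_generator(meas_df: pd.DataFrame):
    for row in meas_df.itertuples():  # namedtuples, far cheaper than iterrows
        obj = DummyMeasurement()
        for fld in DummyMeasurement.field_names:
            if hasattr(row, fld):
                setattr(obj, fld, getattr(row, fld))
        yield obj


df = pd.DataFrame({"ra": [12.5], "dec": [-45.0], "flux_peak": [1.2]})
m = next(measurement_generator(df))
print(m.ra, m.dec, m.flux_peak)  # 12.5 -45.0 1.2
```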
10 changes: 9 additions & 1 deletion vast_pipeline/utils/utils.py
@@ -392,7 +392,7 @@ def timeStamped(fname, fmt="%Y-%m-%d-%H-%M-%S_{fname}"):
return datetime.now().strftime(fmt).format(fname=fname)


def calculate_n_partitions(df, n_cpu, partition_size_mb=100):
def calculate_n_partitions(df, n_cpu, partition_size_mb=15):
"""
This function will calculate how many partitions a dataframe should be
split into.
@@ -401,6 +401,14 @@ def calculate_n_partitions(df, n_cpu, partition_size_mb=100):
df: The pandas dataframe to be partitioned.
n_cpu: The number of available CPUs.
partition_size_mb: The optimal partition size in MB.
NOTE: The default partition size of 15MB is chosen because
many of the parallelised operations on partitioned
DataFrames can consume a much larger amount of memory
than the size of the partition. 15MB avoids consuming
too much memory for significant amounts of parallelism
(e.g. n_cpu > 10) without significant cost to processing
speed.
Returns:
The optimal number of partitions.
"""
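The body of `calculate_n_partitions` is not shown in this diff; only the default `partition_size_mb` changes from 100 to 15. The sketch below shows one plausible way such a partition count could be derived from the DataFrame's in-memory size, as assumed logic for illustration rather than the pipeline's actual implementation.

```python
# Illustrative only: the real calculate_n_partitions body is outside this diff.
import math

import numpy as np
import pandas as pd


def calculate_n_partitions_sketch(df: pd.DataFrame, n_cpu: int,
                                  partition_size_mb: int = 15) -> int:
    mem_usage_mb = df.memory_usage(deep=True).sum() / 1024 ** 2
    # Aim for roughly partition_size_mb per partition, but keep every CPU busy
    # and never return fewer than one partition.
    return max(math.ceil(mem_usage_mb / partition_size_mb), n_cpu, 1)


df = pd.DataFrame({"x": np.random.random(1_000_000)})
print(calculate_n_partitions_sketch(df, n_cpu=4))
```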
