diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6cb2657c..3620f419 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -12,6 +12,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 
 #### Changed
 
+- Small memory optimisations: use `itertuples` in favour of `iterrows`; loop over mappings rather than converting them to lists up-front. [#776](https://github.com/askap-vast/vast-pipeline/pull/776)
 - Updated clearpiperun to delete runs using raw SQL rather than via django [#775](https://github.com/askap-vast/vast-pipeline/pull/775)
 - Shortened forced fits measurement names to ensure they fit within the character limits - remove image prefix and limited to 1000 forced fits per source [#734](https://github.com/askap-vast/vast-pipeline/pull/734)
 - Cleaned up Code of Conduct including adding Zenodo DOI [#773](https://github.com/askap-vast/vast-pipeline/pull/773)
@@ -26,6 +27,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 
 #### List of PRs
 
+- [#776](https://github.com/askap-vast/vast-pipeline/pull/776): fix: Minor memory optimisations
 - [#775](https://github.com/askap-vast/vast-pipeline/pull/775): fix, feat: Enabled deletion of pipeline runs directly using SQL rather than via django
 - [#734](https://github.com/askap-vast/vast-pipeline/pull/734): Shortened forced fits measurement names
 - [#773](https://github.com/askap-vast/vast-pipeline/pull/773): docs: Cleaned up Code of Conduct including adding Zenodo DOI
diff --git a/vast_pipeline/pipeline/config.py b/vast_pipeline/pipeline/config.py
index 733267d5..d81a7374 100644
--- a/vast_pipeline/pipeline/config.py
+++ b/vast_pipeline/pipeline/config.py
@@ -402,10 +402,12 @@ def validate(self, user: User = None):
         # of the user's input format.
         # Ensure all input file types have the same epochs.
         try:
+            schema = yaml.Map({epoch: yaml.Seq(yaml.Str()) for epoch in epochs_image})
             for input_type in inputs.keys():
-                self._yaml["inputs"][input_type].revalidate(
-                    yaml.Map({epoch: yaml.Seq(yaml.Str()) for epoch in epochs_image})
-                )
+                # Generate a new YAML object on-the-fly per input to avoid saving
+                # a validation schema per file in the PipelineConfig object
+                # (These can consume a lot of RAM for long lists of input files).
+                yaml.load(self._yaml["inputs"][input_type].as_yaml(), schema=schema)
         except yaml.YAMLValidationError:
             # number of epochs could be different or the name of the epochs may not match
             # find out which by counting the number of unique epochs per input type
@@ -438,20 +440,11 @@ def validate(self, user: User = None):
         # This could be combined with the number of epochs validation above, but we want
         # to give specific feedback to the user on failure.
         try:
+            schema = yaml.Map(
+                {epoch: yaml.FixedSeq([yaml.Str()] * epoch_n_files["image"][epoch])
+                 for epoch in epochs_image})
             for input_type in inputs.keys():
-                self._yaml["inputs"][input_type].revalidate(
-                    yaml.Map(
-                        {
-                            epoch: yaml.FixedSeq(
-                                [
-                                    yaml.Str()
-                                    for _ in range(epoch_n_files["image"][epoch])
-                                ]
-                            )
-                            for epoch in epochs_image
-                        }
-                    )
-                )
+                yaml.load(self._yaml["inputs"][input_type].as_yaml(), schema=schema)
         except yaml.YAMLValidationError:
             # map input type to a mapping of epoch to file count
             file_counts_str = ""
diff --git a/vast_pipeline/pipeline/finalise.py b/vast_pipeline/pipeline/finalise.py
index 54f928fc..2da077f3 100644
--- a/vast_pipeline/pipeline/finalise.py
+++ b/vast_pipeline/pipeline/finalise.py
@@ -357,9 +357,9 @@ def final_operations(
         make_upload_associations(sources_df_upload)
 
         # write associations to parquet file
-        sources_df.rename(columns={'id': 'meas_id'})[
-            ['source_id', 'meas_id', 'd2d', 'dr']
-        ].to_parquet(os.path.join(p_run.path, 'associations.parquet'))
+        sources_df[['source_id', 'id', 'd2d', 'dr']]. \
+            rename(columns={'id': 'meas_id'}). \
+            to_parquet(os.path.join(p_run.path, 'associations.parquet'))
 
     if calculate_pairs:
         # get the Source object primary keys for the measurement pairs
diff --git a/vast_pipeline/pipeline/forced_extraction.py b/vast_pipeline/pipeline/forced_extraction.py
index 6e5faed4..56e7aec4 100644
--- a/vast_pipeline/pipeline/forced_extraction.py
+++ b/vast_pipeline/pipeline/forced_extraction.py
@@ -413,11 +413,15 @@ def image_data_func(image_name: str) -> Dict[str, Any]:
         ))
 
     # compute the rest of the columns
-    intermediate_df = (
-        db.from_sequence(intermediate_df)
-        .map(lambda x: finalise_forced_dfs(**x))
-        .compute()
-    )
+    # NOTE: Avoid using dask bags to parallelise the mapping
+    # over DataFrames, since these tend to get very large in memory and
+    # dask bags make a copy of the output before collecting the results.
+    # There is also a minimal speed penalty for doing this step without
+    # parallelism.
+    intermediate_df = list(map(
+        lambda x: finalise_forced_dfs(**x),
+        intermediate_df
+    ))
     df_out = (
         pd.concat(intermediate_df, axis=0, sort=False)
         .rename(
@@ -457,10 +461,10 @@ def write_group_to_parquet(
     pass
 
 
-def parallel_write_parquet(
+def write_forced_parquet(
         df: pd.DataFrame, run_path: str, add_mode: bool = False) -> None:
     '''
-    Parallelize writing parquet files for forced measurements.
+    Write parquet files for forced measurements.
 
     Args:
         df:
@@ -479,15 +483,13 @@ def get_fname(n):
         return os.path.join(
             run_path,
            'forced_measurements_' + n.replace('.', '_') + '.parquet'
         )
-    dfs = list(map(lambda x: (df[df['image'] == x], get_fname(x)), images))
-    n_cpu = cpu_count() - 1
-
-    # writing parquets using Dask bag
-    bags = db.from_sequence(dfs)
-    bags = bags.starmap(
-        lambda df, fname: write_group_to_parquet(df, fname, add_mode))
-    bags.compute(num_workers=n_cpu)
+    # Avoid saving the mapping to a list since this copies the entire
+    # DataFrame which can already be very large in memory at this point.
+    dfs = map(lambda x: (df[df['image'] == x], get_fname(x)), images)
+    # Write parquets
+    for this_df, fname in dfs:
+        write_group_to_parquet(this_df, fname, add_mode)
     pass
 
 
@@ -689,7 +691,7 @@ def forced_extraction(
     logger.info(
         'Saving forced measurements to specific parquet file...'
    )
-    parallel_write_parquet(extr_df, p_run.path, add_mode)
+    write_forced_parquet(extr_df, p_run.path, add_mode)
 
     # Required to rename this column for the image add mode.
     extr_df = extr_df.rename(columns={'time': 'datetime'})
diff --git a/vast_pipeline/pipeline/model_generator.py b/vast_pipeline/pipeline/model_generator.py
index c81da32b..c79e5080 100644
--- a/vast_pipeline/pipeline/model_generator.py
+++ b/vast_pipeline/pipeline/model_generator.py
@@ -30,11 +30,11 @@ def measurement_models_generator(
         An iterable generator object containing the yielded Measurement objects.
     """
-    for i, row in meas_df.iterrows():
+    for row in meas_df.itertuples():
         one_m = Measurement()
         for fld in one_m._meta.get_fields():
-            if getattr(fld, 'attname', None) and fld.attname in row.index:
-                setattr(one_m, fld.attname, row[fld.attname])
+            if getattr(fld, 'attname', None) and hasattr(row, fld.attname):
+                setattr(one_m, fld.attname, getattr(row, fld.attname))
         yield one_m
 
 
@@ -55,19 +55,19 @@ def source_models_generator(
     Returns:
         An iterable generator object containing the yielded Source objects.
     """
-    for i, row in src_df.iterrows():
+    for row in src_df.itertuples():
         # generate an IAU compliant source name, see
         # https://cdsweb.u-strasbg.fr/Dic/iau-spec.html
         name = (
-            f"J{deg2hms(row['wavg_ra'], precision=1, truncate=True)}"
-            f"{deg2dms(row['wavg_dec'], precision=0, truncate=True)}"
+            f"J{deg2hms(row.wavg_ra, precision=1, truncate=True)}"
+            f"{deg2dms(row.wavg_dec, precision=0, truncate=True)}"
         ).replace(":", "")
         src = Source()
         src.run_id = pipeline_run.id
         src.name = name
         for fld in src._meta.get_fields():
-            if getattr(fld, 'attname', None) and fld.attname in row.index:
-                setattr(src, fld.attname, row[fld.attname])
+            if getattr(fld, 'attname', None) and hasattr(row, fld.attname):
+                setattr(src, fld.attname, getattr(row, fld.attname))
         yield src
 
 
@@ -88,12 +88,12 @@ def association_models_generator(
         An iterable generator object containing the yielded Association objects.
     """
     logger.debug(f"Building {len(assoc_df)} association generators")
-    for i, row in assoc_df.iterrows():
+    for row in assoc_df.itertuples():
         yield Association(
-            meas_id=row['id'],
-            source_id=row['source_id'],
-            d2d=row['d2d'],
-            dr=row['dr'],
+            meas_id=row.id,
+            source_id=row.source_id,
+            d2d=row.d2d,
+            dr=row.dr,
         )
     logger.debug(f"Built {len(assoc_df)} association generators")
 
@@ -113,5 +113,5 @@ def related_models_generator(
     Returns:
         An iterable generator object containing the yielded Association objects.
     """
-    for i, row in related_df.iterrows():
-        yield RelatedSource(**row.to_dict())
+    for row in related_df.itertuples(index=False):
+        yield RelatedSource(**row._asdict())
diff --git a/vast_pipeline/utils/utils.py b/vast_pipeline/utils/utils.py
index f8a3b738..4719191b 100644
--- a/vast_pipeline/utils/utils.py
+++ b/vast_pipeline/utils/utils.py
@@ -392,7 +392,7 @@ def timeStamped(fname, fmt="%Y-%m-%d-%H-%M-%S_{fname}"):
     return datetime.now().strftime(fmt).format(fname=fname)
 
 
-def calculate_n_partitions(df, n_cpu, partition_size_mb=100):
+def calculate_n_partitions(df, n_cpu, partition_size_mb=15):
     """
     This function will calculate how many partitions a dataframe
     should be split into.
@@ -401,6 +401,14 @@ def calculate_n_partitions(df, n_cpu, partition_size_mb=100):
     Args:
         df: The pandas dataframe to be partitionined.
         n_cpu: The number of available CPUs.
         partition_size: The optimal partition size in MB.
+            NOTE: The default partition size of 15MB is chosen because
+            many of the parallelised operations on partitioned
+            DataFrames can consume a much larger amount of memory
+            than the size of the partition. 15MB avoids consuming
+            too much memory for significant amounts of parallelism
+            (e.g. n_cpu > 10) without significant cost to processing
+            speed.
+
     Returns:
         The optimal number of partitions.
     """
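The `config.py` hunks above build a single schema up front and re-validate each input block against it by round-tripping the block through `as_yaml()` and `load()`, instead of attaching a fresh schema object to every block with `revalidate`. Below is a minimal, self-contained sketch of that pattern; it assumes `yaml` is the `strictyaml` package (as the `Map`/`Seq`/`revalidate` calls suggest), and the example config, epoch names and file lists are invented.

```python
import strictyaml as yaml

# Invented example config: two input types that must list files for the same epochs.
raw = """\
image:
  epoch01:
    - image1.fits
    - image2.fits
selavy:
  epoch01:
    - selavy1.txt
    - selavy2.txt
"""

inputs = yaml.load(raw)
epochs_image = ["epoch01"]

# Build the schema once and reuse it for every input type, rather than
# creating (and keeping) one schema object per input block.
schema = yaml.Map({epoch: yaml.Seq(yaml.Str()) for epoch in epochs_image})

try:
    for input_type in inputs.data.keys():
        # Round-trip the block through its YAML text so no schema object
        # is retained on the parsed config.
        yaml.load(inputs[input_type].as_yaml(), schema=schema)
except yaml.YAMLValidationError as exc:
    print(f"Input validation failed: {exc}")
```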
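In the `finalise.py` hunk, the column subset is selected before the `rename`, so the copy that `DataFrame.rename` returns only ever contains the four columns written to `associations.parquet` rather than the whole `sources_df`. A toy version of the same ordering, with invented data (writing the parquet file requires `pyarrow` or `fastparquet`):

```python
import pandas as pd

sources_df = pd.DataFrame({
    "source_id": [1, 1, 2],
    "id": [10, 11, 12],            # measurement id
    "d2d": [0.1, 0.4, 0.2],
    "dr": [0.0, 0.3, 0.1],
    "flux_peak": [1.5, 1.7, 2.5],  # extra column that should not be copied
})

# Subset first, then rename: the copy made by rename is limited to four columns.
associations = (
    sources_df[["source_id", "id", "d2d", "dr"]]
    .rename(columns={"id": "meas_id"})
)
associations.to_parquet("associations.parquet")
```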
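The `forced_extraction.py` hunks drop the dask-bag parallelism in favour of a plain `map`. Keeping the map lazy and looping over it means only one per-image slice of the DataFrame is materialised at a time while the parquet files are written. A small self-contained sketch of the same pattern, with an invented DataFrame and file layout:

```python
import os
import pandas as pd


def write_group_to_parquet(df: pd.DataFrame, fname: str) -> None:
    """Write one per-image group of forced measurements to its own file."""
    df.to_parquet(fname)


def write_forced_parquet(df: pd.DataFrame, run_path: str) -> None:
    images = df["image"].unique()

    def get_fname(name: str) -> str:
        return os.path.join(
            run_path,
            "forced_measurements_" + name.replace(".", "_") + ".parquet",
        )

    # Keep this lazy: wrapping it in list() would hold a boolean-mask copy
    # of every per-image slice in memory at the same time.
    groups = map(lambda name: (df[df["image"] == name], get_fname(name)), images)
    for group_df, fname in groups:
        write_group_to_parquet(group_df, fname)


example = pd.DataFrame({
    "image": ["epoch01.fits", "epoch01.fits", "epoch02.fits"],
    "flux_int": [1.2, 0.8, 2.1],
})
write_forced_parquet(example, ".")
```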
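The `model_generator.py` hunks switch every generator from `iterrows` to `itertuples`: `iterrows` builds a `pandas.Series` for each row, while `itertuples` yields lightweight namedtuples whose fields are read with `hasattr`/`getattr`. The sketch below shows the pattern outside of Django; the `SimpleSource` class and its fields are invented stand-ins for the pipeline's model objects.

```python
import pandas as pd


class SimpleSource:
    """Invented stand-in for a Django model with a fixed set of fields."""
    fields = ("name", "wavg_ra", "wavg_dec")

    def __init__(self):
        for fld in self.fields:
            setattr(self, fld, None)


def source_models_generator(src_df: pd.DataFrame):
    # itertuples avoids constructing a Series per row, which is where most
    # of the iterrows memory and speed overhead comes from.
    for row in src_df.itertuples(index=False):
        src = SimpleSource()
        for fld in SimpleSource.fields:
            if hasattr(row, fld):
                setattr(src, fld, getattr(row, fld))
        yield src


src_df = pd.DataFrame({
    "name": ["J001+01", "J002-02"],
    "wavg_ra": [10.5, 20.1],
    "wavg_dec": [1.2, -2.3],
})
sources = list(source_models_generator(src_df))
```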
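Finally, the `utils.py` hunks only lower the default target partition size from 100 MB to 15 MB. As an illustration of how such a helper can turn that target into a partition count, the sketch below derives the count from the DataFrame's in-memory footprint; this is an assumed formula for illustration, not necessarily the pipeline's exact implementation.

```python
import math

import numpy as np
import pandas as pd


def calculate_n_partitions(df: pd.DataFrame, n_cpu: int, partition_size_mb: int = 15) -> int:
    """Illustrative calculation of a Dask partition count.

    Smaller partitions (around 15 MB) keep per-worker memory low, because
    many of the parallelised operations consume far more memory than the
    partition itself, especially when 10 or more workers run at once.
    """
    mem_usage_mb = df.memory_usage(deep=True).sum() / 1e6
    # At least one partition per target chunk, and at least one per CPU.
    return max(math.ceil(mem_usage_mb / partition_size_mb), n_cpu)


df = pd.DataFrame({"flux": np.random.default_rng(42).random(2_000_000)})
print(calculate_n_partitions(df, n_cpu=8))
```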