Added memory usage logging to finalise.py
ddobie committed Jul 3, 2024
1 parent bfd581d commit 656ff1b
Showing 1 changed file with 47 additions and 2 deletions.
49 changes: 47 additions & 2 deletions vast_pipeline/pipeline/finalise.py
@@ -14,7 +14,7 @@
update_sources
)
from vast_pipeline.pipeline.pairs import calculate_measurement_pair_metrics
from vast_pipeline.pipeline.utils import parallel_groupby
from vast_pipeline.pipeline.utils import parallel_groupby, get_df_memory_usage, log_total_memory_usage


logger = logging.getLogger(__name__)
@@ -132,8 +132,14 @@ def final_operations(
'Calculating statistics for %i sources...',
sources_df.source.unique().shape[0]
)
log_total_memory_usage()

srcs_df = parallel_groupby(sources_df)

mem_usage = get_df_memory_usage(srcs_df)
logger.info('Groupby-apply time: %.2f seconds', timer.reset())
logger.debug(f"Initial srcs_df memory: {mem_usage}MB")
log_total_memory_usage()

# add new sources
srcs_df["new"] = srcs_df.index.isin(new_sources_df.index)
@@ -145,6 +151,10 @@
how="left",
)
srcs_df["new_high_sigma"] = srcs_df["new_high_sigma"].fillna(0.0)

mem_usage = get_df_memory_usage(srcs_df)
logger.debug(f"srcs_df memory after adding new sources: {mem_usage}MB")
log_total_memory_usage()

# calculate nearest neighbour
srcs_skycoord = SkyCoord(
@@ -159,12 +169,19 @@

# add the separation distance in degrees
srcs_df['n_neighbour_dist'] = d2d.deg

mem_usage = get_df_memory_usage(srcs_df)
logger.debug(f"srcs_df memory after nearest-neighbour: {mem_usage}MB")
log_total_memory_usage()

# create measurement pairs, aka 2-epoch metrics
if calculate_pairs:
timer.reset()
measurement_pairs_df = calculate_measurement_pair_metrics(sources_df)
logger.info('Measurement pair metrics time: %.2f seconds', timer.reset())
mem_usage = get_df_memory_usage(measurement_pairs_df)
logger.debug(f"measurment_pairs_df memory: {mem_usage}MB")
log_total_memory_usage()

# calculate measurement pair metric aggregates for sources by finding the row indices
# of the aggregate max of the abs(m) metric for each flux type.
@@ -189,6 +206,9 @@
"m_abs_significant_max_int": 0.0,
})
logger.info("Measurement pair aggregate metrics time: %.2f seconds", timer.reset())
mem_usage = get_df_memory_usage(srcs_df)
logger.debug(f"srcs_df memory after calculate_pairs: {mem_usage}MB")
log_total_memory_usage()
else:
logger.info(
"Skipping measurement pair metric calculation as specified in the run configuration."
@@ -201,18 +221,39 @@
# upload new ones first (new id's are fetched)
src_done_mask = srcs_df.index.isin(done_source_ids)
srcs_df_upload = srcs_df.loc[~src_done_mask].copy()

mem_usage = get_df_memory_usage(srcs_df_upload)
logger.debug(f"srcs_df_upload initial memory: {mem_usage}MB")
log_total_memory_usage()

srcs_df_upload = make_upload_sources(srcs_df_upload, p_run, add_mode)

mem_usage = get_df_memory_usage(srcs_df_upload)
logger.debug(f"srcs_df_upload memory after upload: {mem_usage}MB")
log_total_memory_usage()

# And now update
srcs_df_update = srcs_df.loc[src_done_mask].copy()
logger.info(
f"Updating {srcs_df_update.shape[0]} sources with new metrics.")
mem_usage = get_df_memory_usage(srcs_df_update)
logger.debug(f"srcs_df_update memory: {mem_usage}MB")
log_total_memory_usage()

srcs_df = update_sources(srcs_df_update, batch_size=1000)
mem_usage = get_df_memory_usage(srcs_df_update)
logger.debug(f"srcs_df_update memory: {mem_usage}MB")
log_total_memory_usage()
# Add back together
if not srcs_df_upload.empty:
srcs_df = pd.concat([srcs_df, srcs_df_upload])
else:
srcs_df = make_upload_sources(srcs_df, p_run, add_mode)

mem_usage = get_df_memory_usage(srcs_df)
logger.debug(f"srcs_df memory after uploading sources: {mem_usage}MB")
log_total_memory_usage()

# gather the related df, upload to db and save to parquet file
# the df will look like
#
@@ -267,11 +308,15 @@ def final_operations(
.to_parquet(os.path.join(p_run.path, 'sources.parquet'))
)

# update measurments with sources to get associations
# update measurements with sources to get associations
sources_df = (
sources_df.drop('related', axis=1)
.merge(srcs_df.rename(columns={'id': 'source_id'}), on='source')
)

mem_usage = get_df_memory_usage(sources_df)
logger.debug(f"sources_df memory after srcs_df merge: {mem_usage}MB")
log_total_memory_usage()

if add_mode:
# Load old associations so the already uploaded ones can be removed

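Note: the two helpers imported at the top of this diff, get_df_memory_usage and log_total_memory_usage, live in vast_pipeline/pipeline/utils.py and their bodies are not shown here. Below is a minimal sketch of what they might look like, assuming a pandas deep memory count and a psutil-based resident-set-size lookup; the exact units, log level, and message format are assumptions, not the pipeline's actual implementation.

import logging

import pandas as pd
import psutil

logger = logging.getLogger(__name__)


def get_df_memory_usage(df: pd.DataFrame) -> float:
    """Return the deep memory footprint of a DataFrame in megabytes (assumed unit)."""
    # memory_usage(deep=True) also counts object-dtype contents such as strings.
    return df.memory_usage(deep=True).sum() / 1024 ** 2


def log_total_memory_usage() -> None:
    """Log the resident set size (RSS) of the current process (assumed behaviour)."""
    # psutil.Process() with no argument refers to the running process.
    rss_mb = psutil.Process().memory_info().rss / 1024 ** 2
    logger.debug("Total memory usage: %.2f MB", rss_mb)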