
Correctly calculate new_high_sigma #714

Merged — 12 commits — Jan 29, 2025
8 changes: 6 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -7,20 +7,23 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
## [Unreleased](https://github.com/askap-vast/vast-pipeline/compare/v1.1.0...HEAD)

#### Added

- Added option to disable forced photometry clustering [#788](https://github.com/askap-vast/vast-pipeline/pull/788)
- Added additional logging to forced extraction step [#788](https://github.com/askap-vast/vast-pipeline/pull/788)

#### Changed

- V2: Reorganise new_high_sigma calculation into dedicated function [#714](https://github.com/askap-vast/vast-pipeline/pull/714)
- V2: Update dependencies to fix dependabot warnings and prepare for Dask distributed implementation [#798](https://github.com/askap-vast/vast-pipeline/pull/798)
- Upgrade measurements file generation to use dask in order to handle larger runs [#789](https://github.com/askap-vast/vast-pipeline/pull/797)
- Replaced optimise_ints and optimise_floats with single function - optimise_numeric [#789](https://github.com/askap-vast/vast-pipeline/pull/797)
- Upgrade measurements file generation to use dask in order to handle larger runs [#797](https://github.com/askap-vast/vast-pipeline/pull/797)
- Replaced optimise_ints and optimise_floats with single function - optimise_numeric [#797](https://github.com/askap-vast/vast-pipeline/pull/797)
- Upgrade forced_phot dependency to v0.2 and force `use_numba=True` [#788](https://github.com/askap-vast/vast-pipeline/pull/788)
- Remove bad forced photometry fits immediately after calculation, rather than after they've all been compiled into a single dataframe [#788](https://github.com/askap-vast/vast-pipeline/pull/788)
- Optimise associations upload by dropping unnecessary columns prior to a large dataframe merge [#787](https://github.com/askap-vast/vast-pipeline/pull/787)

#### Fixed

- V2: Fix incorrect calculation of the new_high_sigma parameter [#714](https://github.com/askap-vast/vast-pipeline/pull/714)
- Fixed outdated jupyterhub link on pipeline website [#795](https://github.com/askap-vast/vast-pipeline/pull/795)
- Renamed variable in pipeline.finalise to better reflect what the dataframe represents (sources_df -> associations_df) [#787](https://github.com/askap-vast/vast-pipeline/pull/787)
- Fixed typo in variable name ("assoications") [#787](https://github.com/askap-vast/vast-pipeline/pull/787)
@@ -32,6 +35,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

#### List of PRs

- [#714](https://github.com/askap-vast/vast-pipeline/pull/714): fix: V2: Correctly calculate new_high_sigma parameter
- [#798](https://github.com/askap-vast/vast-pipeline/pull/798): dep: V2: Update dependencies to deal with dependabot recommendations and to prepare for V2 upgrades.
- [#797](https://github.com/askap-vast/vast-pipeline/pull/797): feat: Convert measurements.arrow generation to use dask, and combine optimise_ints/floats to optimise_numeric
- [#795](https://github.com/askap-vast/vast-pipeline/pull/795): fix: Fixed outdated jupyterhub link on pipeline website
93 changes: 36 additions & 57 deletions vast_pipeline/pipeline/new_sources.py
@@ -195,15 +195,15 @@ def get_image_rms_measurements(
return group


def parallel_get_rms_measurements(
def parallel_get_new_high_sigma(
df: pd.DataFrame, edge_buffer: float = 1.0,
n_cpu: int = 0, max_partition_mb: int = 15
) -> pd.DataFrame:
"""
Wrapper function to use 'get_image_rms_measurements'
in parallel with Dask. nbeam is not an option here as that parameter
is fixed in forced extraction and so is made sure to be fixed here to. This
may change in the future.
Wrapper function to use 'get_image_rms_measurements' in parallel with Dask
and calculate the new high sigma. nbeam is not an option here as that
parameter is fixed in forced extraction, so it is kept fixed here too.
This may change in the future.

Args:
df:
@@ -252,30 +252,48 @@ def parallel_get_rms_measurements(
).compute(num_workers=n_workers, scheduler='processes')
)

# We don't need all of the RMS measurements, just the lowest. Keeping all
# of them results in huge memory usage when merging. However, there is an
# existing bug: https://github.com/askap-vast/vast-pipeline/issues/713
# that means that we actually want the _highest_ in order to reproduce the
# current behaviour. Fixing the bug is beyond the scope of this PR because
# it means rewriting tests and test data.

df_to_merge = (df.drop_duplicates('source')
.drop(['img_diff_rms_path'], axis=1)
)

out_to_merge = (out.sort_values(
by=['source', 'img_diff_true_rms'], ascending=False
by=['source', 'img_diff_true_rms'], ascending=True
)
.drop_duplicates('source')
)

df = df_to_merge.merge(
new_sources_df = df_to_merge.merge(
out_to_merge[['source', 'img_diff_true_rms']],
left_on='source', right_on='source',
how='left'
)

# remove sources whose rms measurement is missing or out of range
new_sources_df['img_diff_true_rms'] = (
new_sources_df['img_diff_true_rms'].fillna(0.)
)
new_sources_df = new_sources_df[
new_sources_df['img_diff_true_rms'] > 0
]

return df
# calculate the true sigma
new_sources_df['true_sigma'] = (
new_sources_df['flux_peak'].values
/ new_sources_df['img_diff_true_rms'].values
)

# keep only the highest for each source, rename for the database
new_sources_df = (
new_sources_df
.set_index('source')
.rename(columns={'true_sigma': 'new_high_sigma'})
)

# moving forward only the new_high_sigma column is needed, drop all
# others.
new_sources_df = new_sources_df[['new_high_sigma']]

return new_sources_df
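The reduction that `parallel_get_new_high_sigma` now performs — keep the lowest measured rms per source, drop sources with no valid measurement, and convert the peak flux to a sigma — can be sketched with toy data (column names follow the diff; the values are illustrative, not from the pipeline):

```python
import pandas as pd

# Toy stand-in for the measurements dataframe: three rms measurements of
# source 1 across prior images, plus a source that could not be measured.
df = pd.DataFrame({
    "source": [1, 1, 1, 2],
    "flux_peak": [10.0, 10.0, 10.0, 5.0],
    "img_diff_true_rms": [0.5, 0.8, 2.0, float("nan")],
})

# Keep the lowest rms per source: sort ascending, then keep the first row.
best = (
    df.sort_values(by=["source", "img_diff_true_rms"], ascending=True)
      .drop_duplicates("source")
)

# Drop sources with no valid rms measurement (e.g. outside the image).
best["img_diff_true_rms"] = best["img_diff_true_rms"].fillna(0.0)
best = best[best["img_diff_true_rms"] > 0]

# The new high sigma is the peak flux over the lowest measured rms.
best["new_high_sigma"] = best["flux_peak"] / best["img_diff_true_rms"]
result = best.set_index("source")[["new_high_sigma"]]
# Source 1 keeps rms 0.5, giving new_high_sigma 20.0; source 2 is dropped.
```

Sorting ascending and deduplicating keeps the *lowest* rms, which yields the highest sigma — this is the behaviour change that moves the regression-test values from 12.369 to 13.943.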


def new_sources(
@@ -469,54 +487,15 @@ def new_sources(
# to measure the true rms at the source location.

# measure the actual rms in the previous images at
# the source location.

# PR#713: This part of the code should be rewritten to reflect the new
# behaviour of parallel_get_rms_measurements. That function should be
# renamed to something like parallel_get_new_high_sigma and all of the
# subsequent code in this function moved into it.
# the source location and calculate the corresponding S/N

logger.debug("Getting rms measurements...")

new_sources_df = parallel_get_rms_measurements(
logger.debug("Getting new_high_sigma measurements...")
new_sources_df = parallel_get_new_high_sigma(
new_sources_df, edge_buffer=edge_buffer,
n_cpu=n_cpu, max_partition_mb=max_partition_mb
)
logger.debug(f"Time to get rms measurements: {debug_timer.reset()}s")

# this removes those that are out of range
new_sources_df['img_diff_true_rms'] = (
new_sources_df['img_diff_true_rms'].fillna(0.)
)
new_sources_df = new_sources_df[
new_sources_df['img_diff_true_rms'] != 0
]

# calculate the true sigma
new_sources_df['true_sigma'] = (
new_sources_df['flux_peak'].values
/ new_sources_df['img_diff_true_rms'].values
)

# We only care about the highest true sigma
# new_sources_df = new_sources_df.sort_values(
# by=['source', 'true_sigma']
# )

# keep only the highest for each source, rename for the database
new_sources_df = (
new_sources_df
# .drop_duplicates('source')
.set_index('source')
.rename(columns={'true_sigma': 'new_high_sigma'})
)

# moving forward only the new_high_sigma columns is needed, drop all
# others.
new_sources_df = new_sources_df[['new_high_sigma']]

logger.debug(f"Time to do final cleanup steps {debug_timer.reset()}s")

logger.info(
'Total new source analysis time: %.2f seconds', timer.reset_init()
)
6 changes: 3 additions & 3 deletions vast_pipeline/tests/test_regression/test_epoch.py
@@ -100,7 +100,7 @@ def test_known_source(self):
'''
See documentation for test_known_source in property check.
'''
property_check.test_known_source(self, self.sources, 12.369)
property_check.test_known_source(self, self.sources, 13.943)


@unittest.skipIf(
@@ -173,7 +173,7 @@ def test_known_source(self):
'''
See documentation for test_known_source in property_check.
'''
property_check.test_known_source(self, self.sources, 12.369)
property_check.test_known_source(self, self.sources, 13.943)


@unittest.skipIf(
@@ -248,4 +248,4 @@ def test_known_source(self):
'''
See documentation for test_known_source in property_check.
'''
property_check.test_known_source(self, self.sources, 12.369)
property_check.test_known_source(self, self.sources, 13.943)
@@ -298,7 +298,7 @@ def test_known_source(self):
'''
See documentation for test_known_source in property_check.
'''
property_check.test_known_source(self, self.sources, 12.369)
property_check.test_known_source(self, self.sources, 13.943)


@unittest.skipIf(
6 changes: 3 additions & 3 deletions vast_pipeline/tests/test_regression/test_normal.py
@@ -93,7 +93,7 @@ def test_known_source(self):
'''
See documentation for test_known_source in property_check.
'''
property_check.test_known_source(self, self.sources, 12.369)
property_check.test_known_source(self, self.sources, 13.943)


@unittest.skipIf(
@@ -176,7 +176,7 @@ def test_known_source(self):
'''
See documentation for test_known_source in property_check.
'''
property_check.test_known_source(self, self.sources, 12.369)
property_check.test_known_source(self, self.sources, 13.943)


@unittest.skipIf(
@@ -256,4 +256,4 @@ def test_known_source(self):
'''
See documentation for test_known_source in property_check.
'''
property_check.test_known_source(self, self.sources, 12.369)
property_check.test_known_source(self, self.sources, 13.943)