diff --git a/setup.cfg b/setup.cfg index 3daaf7df..8513dbc5 100644 --- a/setup.cfg +++ b/setup.cfg @@ -18,6 +18,7 @@ install_requires = numpy>=1.8.0 radio_beam>=0.3.3 dask[array] + dask[distributed] joblib casa-formats-io packaging diff --git a/spectral_cube/cube_utils.py b/spectral_cube/cube_utils.py index 07f01fd2..5a25bb5c 100644 --- a/spectral_cube/cube_utils.py +++ b/spectral_cube/cube_utils.py @@ -1,16 +1,21 @@ import contextlib import warnings +import tempfile +import os +import time from copy import deepcopy import builtins import dask.array as da +from dask.distributed import Client import numpy as np from astropy.wcs.utils import proj_plane_pixel_area from astropy.wcs import (WCSSUB_SPECTRAL, WCSSUB_LONGITUDE, WCSSUB_LATITUDE) from astropy.wcs import WCS from . import wcs_utils from .utils import FITSWarning, AstropyUserWarning, WCSCelestialError +from .masks import BooleanArrayMask from astropy import log from astropy.io import fits from astropy.wcs.utils import is_proj_plane_distorted @@ -19,7 +24,10 @@ import itertools import re from radio_beam import Beam +from radio_beam.utils import BeamError +from multiprocessing import Process, Pool +from functools import partial def _fix_spectral(wcs): """ @@ -748,7 +756,9 @@ def bunit_converters(obj, unit, equivalencies=(), freq=None): # Slice along first axis to return a 1D array. return factors[0] -def combine_headers(header1, header2, **kwargs): +def combine_headers(header1, header2, spectral_dx_threshold=0, + projection=None, + **kwargs): ''' Given two Header objects, this function returns a fits Header of the optimal wcs. @@ -758,6 +768,12 @@ def combine_headers(header1, header2, **kwargs): A Header. header2 : astropy.io.fits.Header A Header. + spectral_dx_threshold : number + The maximum fractional difference in spectral pixel size. + If the difference exceeds this value, an exception will be raised. + projection : str or None + The projection code to use. If None, the projection code will be the same as + `header1`. Returns ------- @@ -766,6 +782,9 @@ def combine_headers(header1, header2, **kwargs): ''' + # No repeated projection kwarg. + kwargs.pop('projection', None) + from reproject.mosaicking import find_optimal_celestial_wcs # Get wcs and shape of both headers @@ -774,24 +793,145 @@ def combine_headers(header1, header2, **kwargs): w2 = WCS(header2).celestial s2 = w2.array_shape + if projection is None: + projection = w1.wcs.ctype[0].split("-")[-1] + assert len(projection) == 3, "Projection code must be 3 characters long." + + proj_codes = [ + "AZP", + "SZP", + "TAN", + "STG", + "SIN", + "ARC", + "ZEA", + "AIR", + "CYP", + "CEA", + "CAR", + "MER", + "SFL", + "PAR", + "MOL", + "AIT", + "COP", + "COE", + "COD", + "COO", + "BON", + "PCO", + "TSC", + "CSC", + "QSC", + "HPX", + "XPH", + ] + if type(projection) == str: + if projection not in proj_codes: + raise ValueError( + "Must specify valid projection code from list of supported types: ", + ", ".join(proj_codes), + ) + # Get the optimal wcs and shape for both fields together - wcs_opt, shape_opt = find_optimal_celestial_wcs([(s1, w1), (s2, w2)], auto_rotate=False, + wcs_opt, shape_opt = find_optimal_celestial_wcs([(s1, w1), (s2, w2)], + projection=projection, + auto_rotate=False, **kwargs) + # find spectral coverage + specw1 = WCS(header1).spectral + specw2 = WCS(header2).spectral + + # if the spectral axes are the same, no need to do anything else: + if specw1.wcs.compare(specw2.wcs): + new_naxis = specw1.array_shape[0] + new_crpix3 = specw1.wcs.crpix[0] + + new_crval3 = specw1.wcs.crval[0] + new_cunit3 = specw1.wcs.cunit[0] + new_cdelt3 = specw1.wcs.cdelt[0] + + else: + # NOTE: EWK -- this failed combining different planes for + # test_regrid.py::test_mosaic_cubes_spectral. See below for proposed fix: + # range1 = specw1.pixel_to_world([0, specw1.array_shape[0]-1]) + # range2 = specw2.pixel_to_world([0, specw2.array_shape[0]-1]) + + range1 = specw1.pixel_to_world([0, specw1.array_shape[0]]) + range2 = specw2.pixel_to_world([0, specw2.array_shape[0]]) + + # check for overlap + # this will raise an exception if the headers are an different units, which we want + if max(range1) < min(range2) or max(range2) < min(range1): + warnings.warn(f"There is no spectral overlap between {range1} and {range2}") + + # check cdelt + dx1 = specw1.proj_plane_pixel_scales()[0] + dx2 = specw2.proj_plane_pixel_scales()[0] + if np.abs((dx1 - dx2)/dx1) > spectral_dx_threshold: + raise ValueError(f"Different spectral pixel scale {dx1} vs {dx2}") + + ranges = np.hstack([range1, range2]) + new_naxis = int(np.ceil((ranges.max() - ranges.min()) / np.abs(dx1))) + if specw1.wcs.cdelt == 1.0: + raise NotImplementedError("Spectral WCS doesn't have a CDELT parameter; we don't know how to deal with this generally.") + if np.sign(specw1.wcs.cdelt) == 1: + new_crval3 = ranges.min().to(range1.unit).value + elif np.sign(specw1.wcs.cdelt) == -1: + new_crval3 = ranges.max().to(range1.unit).value + else: + raise "WTF?" + + new_crpix3 = 1 + new_cunit3 = specw1.wcs.cunit[0] + new_cdelt3 = dx1.value + # Make a new header using the optimal wcs and information from cubes header = header1.copy() header['NAXIS'] = 3 header['NAXIS1'] = shape_opt[1] header['NAXIS2'] = shape_opt[0] - header['NAXIS3'] = header1['NAXIS3'] + header['NAXIS3'] = new_naxis header.update(wcs_opt.to_header()) + header['CRVAL3'] = new_crval3 + header['CRPIX3'] = new_crpix3 + header['CUNIT3'] = new_cunit3.to_string() + header['CDELT3'] = new_cdelt3 header['WCSAXES'] = 3 + header['BITPIX'] = header1['BITPIX'] + return header -def mosaic_cubes(cubes, spectral_block_size=100, combine_header_kwargs={}, **kwargs): +def _getdata(cube): + """ + Must be defined out-of-scope to enable pickling + """ + return (cube.unitless_filled_data[:], cube.wcs) + +def mosaic_cubes(cubes, spectral_block_size=100, combine_header_kwargs={}, + target_header=None, + commonbeam=None, + weightcubes=None, + save_to_tmp_dir=True, + use_memmap=True, + output_file=None, + method='cube', + verbose=True, + fail_if_cube_dropped=False, + fail_if_channel_empty=True, + return_footprint=False, + client=None, + extrapolation_tolerance=1e-6, + reproject_kwargs={}, + **kwargs): ''' This function reprojects cubes onto a common grid and combines them to a single field. + For channel mode, cubes must have at least two channels, and only spectral + values between two channels will be included. i.e., spectral interpolation + is performed, not extrapolation. + Parameters ---------- cubes : iterable @@ -801,56 +941,476 @@ def mosaic_cubes(cubes, spectral_block_size=100, combine_header_kwargs={}, **kwa combine_header_kwargs : dict Keywords passed to `~reproject.mosaicking.find_optimal_celestial_wcs` via `combine_headers`. + commonbeam : Beam + If specified, will smooth the data to this common beam before + reprojecting. + weightcubes : None + Cubes with same shape as input cubes containing the weights + save_to_tmp_dir : bool + Default is to set `save_to_tmp_dir=True` because we expect cubes to be + big. + use_memmap : bool + Use a memory-mapped array to save the mosaicked cube product? + output_file : str or None + If specified, this should be a FITS filename that the output *array* + will be stored into (the footprint will not be saved) + method : 'cube' or 'channel' + Over what dimension should we iterate? Options are 'cube' and + 'channel'. + verbose : bool + Show a progress bar. + fail_if_cube_dropped : bool + If True, will raise an exception if any cubes are dropped from the mosaic. + fail_if_channel_empty : bool + If True, will raise an exception if any channels in the mosaic are empty. + return_footprint : bool + If True, will return the footprint of the mosaic. Default is False. + client : Client, optional + Pass a `dask.distributed.Client `_ to use + Dask to parallelize operations. Default is to create the local client. + extrapolation_tolerance: float + If there is a channel in the input cube within this tolerance of the + output cube, keep it even though it might result in extrapolation. + Outputs ------- cube : SpectralCube A spectral cube with the list of cubes mosaicked together. ''' + from reproject.mosaicking import reproject_and_coadd + from reproject import reproject_interp + import warnings + from astropy.utils.exceptions import AstropyUserWarning + warnings.filterwarnings('ignore', category=AstropyUserWarning) + + if verbose: + from tqdm import tqdm as std_tqdm + else: + class tqdm: + def __init__(self, x, **kwargs): + self.iterable = x + def __iter__(self): + return iter(self.iterable) + def __call__(self, x, **kwargs): + return x + def set_description(self, desc, **kwargs): + pass + def update(self, **kwargs): + pass + std_tqdm = tqdm + + def log_(x): + if verbose: + log.info(x) + else: + log.debug(x) cube1 = cubes[0] - header = cube1.header - # Create a header for a field containing all cubes - for cu in cubes[1:]: - header = combine_headers(header, cu.header, **combine_header_kwargs) + if target_header is None: + target_header = cube1.header + + # Create a header for a field containing all cubes + for cu in cubes[1:]: + target_header = combine_headers(target_header, cu.header, **combine_header_kwargs) # Prepare an array and mask for the final cube - shape_opt = (header['NAXIS3'], header['NAXIS2'], header['NAXIS1']) - final_array = np.zeros(shape_opt) + shape_opt = (target_header['NAXIS3'], target_header['NAXIS2'], target_header['NAXIS1']) + nbits = int(abs(target_header['BITPIX'])) + if nbits not in (16, 32, 64): + raise ValueError(f"Invalid BITPIX specified: {nbits} (must be 16, 32, or 64)") + dtype = f"float{nbits}" + # header.tofile decides on the dtype; we force it to be float here because int doesn't make sense + if target_header['BITPIX'] > 0: + log.warn("Positive values of BITPIX, indicating integers, are not supported. Forcing float.") + target_header['BITPIX'] = -nbits + + if output_file is not None: + log_(f"Using output file {output_file}") + t0 = time.time() + if not output_file.endswith('.fits'): + raise IOError("Only FITS output is supported") + if not os.path.exists(output_file): + # https://docs.astropy.org/en/stable/generated/examples/io/skip_create-large-fits.html#sphx-glr-generated-examples-io-skip-create-large-fits-py + hdu = fits.PrimaryHDU(data=np.ones([5,5,5], dtype=dtype), + header=target_header + ) + # sanity check: was repeatedly encountering dtype=i8 when set to float64 + assert hdu.data.dtype == dtype + for kwd in ('NAXIS1', 'NAXIS2', 'NAXIS3'): + hdu.header[kwd] = target_header[kwd] + + log_(f"Dumping header to file {output_file} (dt={time.time()-t0})") + target_header.tofile(output_file, overwrite=True) + with open(output_file, 'rb+') as fobj: + fobj.seek(len(target_header.tostring()) + + (np.prod(shape_opt) * np.abs(nbits//8)) - 1) + fobj.write(b'\0') + + # instant sanity check + hdul = fits.open(output_file) + output_array = hdul[0].data + if not output_array.dtype.name == dtype: + raise TypeError(f"Output array dtype is {output_array.dtype.name}, not {dtype} as requested") + + log_(f"Loading header from file {output_file} (dt={time.time()-t0})") + hdul = fits.open(output_file, mode='update', overwrite=True) + output_array = hdul[0].data + assert output_array.dtype.name == dtype + hdul.flush() # make sure the header gets written right + + log_(f"Creating footprint file dt={time.time()-t0}") + # use memmap - not a FITS file - for the footprint + # if we want to do partial writing, though, it's best to make footprint an extension + ntf2 = tempfile.NamedTemporaryFile() + output_footprint = np.memmap(ntf2, mode='w+', shape=shape_opt, dtype=dtype) + log_(f"Temporary footprint file is {ntf2.name}") + + # default footprint to 1 assuming there is some stuff already in the image + # this is a hack and maybe just shouldn't be attempted + #log_("Initializing footprint to 1s") + #output_footprint[:] = 1 # takes an hour?! + log_(f"Done initializing memory dt={time.time()-t0}") + elif use_memmap: + log_("Using memmap") + ntf = tempfile.NamedTemporaryFile() + output_array = np.memmap(ntf, mode='w+', shape=shape_opt, dtype=dtype) + ntf2 = tempfile.NamedTemporaryFile() + output_footprint = np.memmap(ntf2, mode='w+', shape=shape_opt, dtype=dtype) + else: + log_("Using memory") + output_array = np.zeros(shape_opt, dtype=dtype) + output_footprint = np.zeros(shape_opt, dtype=dtype) + + # initialize output array to be all NaNs + # this shouldn't be necessary, but seems to have become so for reasons I don't understand. + # I'm seeing all zeros instead of nans in the weight=0 regions in early 2024. + # output_array[:] = np.nan + # apparently setting this turns all the _data_ into nan, and the _nondata_ stays zero... that's nonsense + # so I'm still at a total loss why I have nans in my data + mask_opt = np.zeros(shape_opt[1:]) - for cube in cubes: - # Reproject cubes to the header + if output_array.dtype.kind != 'f': + raise TypeError(f"Output array is not a float, it is {output_array.dtype.kind}, while {dtype} was requested.") + + def get_common_beam(beams): + """ HACK """ + for epsilon in (5e-4, 1e-3, 1e-4, 5e-3, 1e-2): + for beam_threshold in np.logspace(-6, -2, 5): + try: + commonbeam = beams.common_beam(tolerance=beam_threshold, epsilon=epsilon) + return commonbeam + except (BeamError, ValueError) as ex: + print(f"Encountered beam error '{ex}' with threshold {beam_threshold} and epsilon {epsilon}. Trying again.") + raise BeamError("Failed to find common beam.") + + # check that the beams are deconvolvable + if commonbeam is not None: + log_(f"Assembling beams to convolve to get to the common beam {commonbeam}") + # assemble beams + beams = [cube.beam if hasattr(cube, 'beam') else get_common_beam(cube.beams) + for cube in cubes] + + for beam in beams: + # this will raise an exception if any of the cubes have bad beams + try: + commonbeam.deconvolve(beam, failure_returns_pointlike=True) + except Exception as ex: + print(f'commonbeam: {commonbeam}') + print(f'beam: {beam}') + raise ex + + if verbose: + class tqdm(std_tqdm): + def update(self, n=1): + if output_file is not None: + hdul.flush() # write to disk on each iteration + super().update(n) + + if method == 'cube': + log_("Using Cube method") + # Cube method: Regrid the whole cube in one operation. + # Let reproject_and_coadd handle any iterations + + if commonbeam is not None: + cubes = [cube.convolve_to(commonbeam, save_to_tmp_dir=save_to_tmp_dir) + for cube in std_tqdm(cubes, desc="Convolve:")] + # Redefine cube1 so the output has the appropriate class + cube1 = cubes[0] + try: - if spectral_block_size is not None: - cube_repr = cube.reproject(header, - block_size=[spectral_block_size, - cube.shape[1], - cube.shape[2]], - **kwargs) - else: - cube_repr = cube.reproject(header, **kwargs) - except TypeError: + output_array, output_footprint = reproject_and_coadd( + [cube.hdu for cube in cubes], + target_header, + input_weights=[cube.hdu for cube in weightcubes] if weightcubes is not None else None, + output_array=output_array, + output_footprint=output_footprint, + reproject_function=reproject_interp, + #progressbar=tqdm(desc='reproject_and_coadd') if verbose else False, + intermediate_memmap=True, + block_sizes=(None if spectral_block_size is None else + [(spectral_block_size, cube.shape[1], cube.shape[2]) + for cube in cubes]), + **reproject_kwargs + ) + except TypeError as ex: + # print the exception in case we caught a different TypeError than expected warnings.warn("The block_size argument is not accepted by `reproject`. " - "A more recent version may be needed.") - cube_repr = cube.reproject(header, **kwargs) + f"A more recent version may be needed. Exception was: {ex}") + output_array, output_footprint = reproject_and_coadd( + [cube.hdu for cube in cubes], + target_header, + input_weights=[cube.hdu for cube in weightcubes] if weightcubes is not None else None, + output_array=output_array, + output_footprint=output_footprint, + reproject_function=reproject_interp, + #progressbar=tqdm(desc='reproject_and_coadd') if verbose else False, + **reproject_kwargs + ) + elif method == 'channel': + log_("Using Channel method") + # Channel method: manually downselect to go channel-by-channel in the + # input cubes before handing off material to reproject_and_coadd This + # approach allows us more direct & granular control over memory and is + # likely better for large-area cubes + # (ideally we'd let Dask handle all the memory allocation choices under + # the hood, but as of early 2023, we do not yet have that capability) + + outwcs = WCS(target_header) + channels = outwcs.spectral.pixel_to_world(np.arange(target_header['NAXIS3'])) + dx = outwcs.spectral.proj_plane_pixel_scales()[0] + log_(f"Channel mode: dx={dx}. Looping over {len(channels)} channels and {len(cubes)} cubes") + + mincube_slices = [cube[cube.shape[0]//2:cube.shape[0]//2+1] + .subcube_slices_from_mask(cube[cube.shape[0]//2:cube.shape[0]//2+1].mask, + spatial_only=True) + if cube.mask is not None + # if there's no mask, we can't and don't want to attempt a mincube + else (slice(None), slice(None), slice(None)) + for cube in std_tqdm(cubes, desc='MinSubSlices:', delay=5)] + + if hasattr(channels, "__len__") and len(channels) > 1: + pbar = tqdm(enumerate(channels), desc="Channels") + using_pbar = True + else: + pbar = enumerate(channels) + using_pbar = False + + # grab a 2-channel slab + # this is very verbose but quite simple & cheap + # going to spectral_slab(channel-dx, channel+dx) gives 3-pixel cubes most often, + # which results in a 50% overhead in smoothing, etc. + def two_closest_channels(cube, channel): + dist = np.abs(cube.spectral_axis.to(channel.unit) - channel) + log.debug(f'dist={dist}') + closest = np.argmin(dist) + dist[closest] = np.inf + next_closest = np.argmin(dist) + if closest < next_closest: + return (closest, next_closest) + else: + return (next_closest, closest) + + for ii, channel in pbar: + if using_pbar: + pbar.set_description(f"Channel {ii}={channel}") + + chans = [two_closest_channels(cube, channel) + for cube in std_tqdm(cubes, delay=5, desc='ChanSel:')] + log.debug(f'chans={chans}, channel={channel}, spax={cubes[0].spectral_axis.to(channel.unit)}') + # reversed spectral axes still break things + # and we want two channels width, not one (which is why we use +1 here) + chans = [(ch1, ch2+1) if ch1 < ch2 else (ch2, ch1+1) for ch1, ch2 in chans] + log.debug(f'chans={chans}, channel={channel}') + + if not using_pbar: + log_(f"Using neighboring channels {chans}") + + with warnings.catch_warnings(): + warnings.simplefilter('ignore') # seriously NO WARNINGS. + + # exclude any cubes with invalid spatial slices (could happen if whole slices are masked out) + keep1 = [all(x > 1 for x in cube[ch1:ch2, slices[1], slices[2]].shape) + for (ch1, ch2), slices, cube in std_tqdm(zip(chans, mincube_slices, cubes), desc='keepcheck')] + if sum(keep1) < len(keep1): + log.debug(f"keep1={keep1} chans={chans}, mincube_slices={mincube_slices}") + log.debug(f"shapes={[cube[ch1:ch2, slices[1], slices[2]].shape for (ch1, ch2), slices, cube in zip(chans, mincube_slices, cubes)]}") + log.warn(f"Dropping {len(keep1)-sum(keep1)} cubes out of {len(keep1)} because they have invalid (empty) slices") + + if commonbeam is not None: + if verbose: + print(f"Convolving to common beam {commonbeam} for {mincube_slices}") + scubes = [(cube[ch1:ch2, slices[1], slices[2]] + .convolve_to(commonbeam)) + if kp + else None # placeholder, will drop this below but need to retain list shape + for (ch1, ch2), slices, cube, kp in std_tqdm(zip(chans, mincube_slices, cubes, keep1), + delay=5, desc='Subcubes (conv)') + ] + if ii == 0: + cube1 = scubes[0] + else: + scubes = [(cube[ch1:ch2, slices[1], slices[2]]) + if kp + else None # placeholder, will drop this below but need to retain list shape + for (ch1, ch2), slices, cube, kp in std_tqdm(zip(chans, mincube_slices, cubes, keep1), + delay=5, desc='Subcubes') + ] + + # rechunk if needed (should be very fast?) + scubes = [cube.rechunk() if hasattr(cube, 'rechunk') else cube for cube in scubes] + + # only keep2 cubes that are in range; the rest get excluded + keep2 = [(cube is not None) and + all(sh > 1 for sh in cube.shape) and + (( + (np.sign(channel) >= 0) and + (cube.spectral_axis.min() <= channel*(1+extrapolation_tolerance)) and + (cube.spectral_axis.max() >= channel*(1-extrapolation_tolerance))) + or ( + (np.sign(channel) == -1) and + (cube.spectral_axis.min() <= channel*(1-extrapolation_tolerance)) and + (cube.spectral_axis.max() >= channel*(1+extrapolation_tolerance))) + ) + for cube in scubes] + log.debug(f"channel range: {channel.value*(1+extrapolation_tolerance), channel.value*(1-extrapolation_tolerance)}") + # merge the two 'keep' arrays + keep = np.array(keep1) & np.array(keep2) + if sum(keep) < len(keep): + log.warn(f"Dropping {len(keep2)-sum(keep2)} cubes out of {len(keep)} because they're out of range") + dropped1 = list(np.where(~np.array(keep1))[0]) + dropped2 = list(np.where(~np.array(keep2))[0]) + dropped_indices = list(np.where(~keep)[0]) + log.warn(f"Dropped cube indices are {dropped_indices}. For keep1={dropped1}, for keep2={dropped2}") + if fail_if_cube_dropped: + log.warn(f"Channel={channel} dropped all the cubes. Cube shapes: {[cube.shape for cube in scubes]}") + for kk, cube in enumerate(scubes): + log.info(f"Cube {kk} has vmin={cube.spectral_axis.min()} vmax={cube.spectral_axis.max()} and channel={channel}") + raise ValueError(f"There were {len(keep)-sum(keep)} dropped cubes and fail_if_cube_dropped was set. Indices: {dropped_indices}") + scubes = [cube for cube, kp in zip(scubes, keep) if kp] + + if len(scubes) == 0: + log.warn(f"No cubes overlap with channel {channel}, skipping") + continue + + if weightcubes is not None: + if verbose: + print(f"Handling {len(weightcubes)} weight cubes") + # convert mincube_slices to weightcube coordinates + mincube_weight_slices = [] + # need to use cubes, not scubes, because mincube_slices are in cube units + for slc, wtc, cube in zip(mincube_slices, weightcubes, cubes): + ycrds = slc[1].start, slc[1].stop + xcrds = slc[2].start, slc[2].stop + if ycrds[0] is None or xcrds[0] is None: + wtxcrds = wtycrds = [None, None] + else: + skycrds = cube.wcs.celestial.pixel_to_world(xcrds, ycrds) + wtxcrds, wtycrds = wtc.wcs.celestial.world_to_pixel(skycrds) + wtxcrds = [int(np.round(x)) for x in wtxcrds] + wtycrds = [int(np.round(y)) for y in wtycrds] + + if wtxcrds[0] < 0: + wtxcrds[0] = 0 + warnings.warn("Cube is larger than weight cube") + if wtycrds[0] < 0: + wtycrds[0] = 0 + warnings.warn("Cube is larger than weight cube") + assert wtxcrds[1] > 0 + assert wtycrds[1] > 0 + log.debug(f"skycrds={skycrds}") # DEBUG + log.debug(f"Cube slices went from x={xcrds} to {wtxcrds} and y={ycrds} to {wtycrds}") # DEBUG + log.debug(f"pixel scales: {cube.wcs.celestial.proj_plane_pixel_area()**0.5} {wtc.wcs.celestial.proj_plane_pixel_area()**0.5}") + + # handle spectral cutting. for cubes, we split this into min_cube_slices + chans, + # but here we're doing it all at once + ch1, ch2 = two_closest_channels(wtc, channel) + ch1, ch2 = (ch1, ch2+1) if ch1 < ch2 else (ch2, ch1+1) + log.debug(f'wtchans={ch1, ch2}') # DEBUG + zslc = slice(ch1, ch2) + + wtslc = zslc, slice(wtycrds[0], wtycrds[1]), slice(wtxcrds[0], wtxcrds[1]) + mincube_weight_slices.append(wtslc) + log.debug(f"mincube_slices = {mincube_slices}") # DEBUG + log.debug(f"mincube_weight_slices = {mincube_weight_slices}") # DEBUG + log.debug(f"weightcube shapes: {[wtcube.shape for wtcube in weightcubes]}") # DEBUG + + sweightcubes = [wtcube[slices] + for slices, wtcube, kp + in std_tqdm(zip(mincube_weight_slices, weightcubes, keep), + delay=5, desc='Subweight') + if kp + ] + print(f"Loading weight HDUs - this may load the weight cubes into memory. Shapes={[x.shape for x in sweightcubes]}") + assert [0 not in x.shape for x in sweightcubes] + wthdus = [cube.hdu + for cube in std_tqdm(sweightcubes, delay=5, desc='WeightData')] + else: + wthdus = None + + + # reproject_and_coadd requires the actual arrays, so this is the convolution step + + # commented out approach here: just let spectral-cube handle the convolution etc. + #hdus = [(cube._get_filled_data(), cube.wcs) + # for cube in std_tqdm(scubes, delay=5, desc='Data/conv')] + + # somewhat faster (?) version - ask the dask client to handle + # gathering the data + # (this version is capable of parallelizing over many cubes, in + # theory; the previous would treat each cube in serial) + print("Loading data into memory (this step should trigger convolution for dask arrays)", flush=True) + log_(f"Chunk sizes: {[cube._data.chunksize if hasattr(cube._data, 'chunksize') else 'n/a' for cube in scubes]}") + datas = [cube._get_filled_data() for cube in scubes] + wcses = [cube.wcs for cube in scubes] + + if len(datas) == 0: + log.warn(f"Channel {ii}={channel} has no matched data. Skipping") + continue + + hdus = list(zip(datas, wcses)) + + # project into array w/"dummy" third dimension + # (outputs with trailing underscores are not used; data is written directly into the output array chunks) + output_array_, output_footprint_ = reproject_and_coadd( + input_data=hdus, + output_projection=outwcs[ii:ii+1, :, :], + shape_out=(1,) + output_array.shape[1:], + output_array=output_array[ii:ii+1,:,:], + output_footprint=output_footprint[ii:ii+1,:,:], + reproject_function=reproject_interp, + input_weights=wthdus, + #block_size=[2,-1,-1], + #progressbar=partial(tqdm, desc=f'coadd ch{ii}') if verbose else False, + match_background=False, + **reproject_kwargs + ) + + if np.all(output_array_[np.isfinite(output_array_)] == 0) or np.all(output_array[ii:ii+1,:,:][np.isfinite(output_array[ii:ii+1,:,:])] == 0): + if fail_if_channel_empty: + raise ValueError(f"Channel {ii}={channel} ended up totally empty") + else: + log.warn(f"Channel {ii}={channel} ended up totally empty") + + if using_pbar: + pbar.set_description(f"Channel {ii}={channel} done") - # Create weighting mask (2D) - mask = (cube_repr[0:1].get_mask_array()[0]) - mask_opt += mask.astype(float) + # Create Cube + cube = cube1.__class__(data=output_array * cube1.unit, wcs=WCS(target_header)) - # Go through each slice of the cube, add it to the final array - for ii in range(final_array.shape[0]): - slice1 = np.nan_to_num(cube_repr.unitless_filled_data[ii]) - final_array[ii] = final_array[ii] + slice1 + if output_file is not None: + if commonbeam is not None: + # Append the beam info to the header + hdul[0].header.update(commonbeam.to_header_keywords()) - # Dividing by the mask throws errors where it is zero - with np.errstate(divide='ignore'): + hdul.flush() + hdul.close() - # Use weighting mask to average where cubes overlap - for ss in range(final_array.shape[0]): - final_array[ss] /= mask_opt + if commonbeam is not None: + cube = cube.with_beam(commonbeam, raise_error_jybm=False) - # Create Cube - cube = cube1.__class__(data=final_array * cube1.unit, wcs=WCS(header)) - return cube + if return_footprint: + return cube, output_footprint + else: + return cube diff --git a/spectral_cube/dask_spectral_cube.py b/spectral_cube/dask_spectral_cube.py index b4c4d718..0e0fcd2a 100644 --- a/spectral_cube/dask_spectral_cube.py +++ b/spectral_cube/dask_spectral_cube.py @@ -1447,6 +1447,103 @@ def convfunc(img, **kwargs): accepts_chunks=True, **kwargs).with_beam(beam, raise_error_jybm=False) + def reproject(self, header, order='bilinear', use_memmap=False, + filled=True, **kwargs): + """ + Spatially reproject the cube into a new header. Fills the data with + the cube's ``fill_value`` to replace bad values before reprojection. + + If you want to reproject a cube both spatially and spectrally, you need + to use `spectral_interpolate` as well. + + .. warning:: + The current implementation of ``reproject`` requires that the whole + cube be loaded into memory. Issue #506 notes that this is a + problem, and it is on our to-do list to fix. + + Parameters + ---------- + header : `astropy.io.fits.Header` + A header specifying a cube in valid WCS + order : int or str, optional + The order of the interpolation (if ``mode`` is set to + ``'interpolation'``). This can be either one of the following + strings: + + * 'nearest-neighbor' + * 'bilinear' + * 'biquadratic' + * 'bicubic' + + or an integer. A value of ``0`` indicates nearest neighbor + interpolation. + use_memmap : bool + If specified, a memory mapped temporary file on disk will be + written to rather than storing the intermediate spectra in memory. + filled : bool + Fill the masked values with the cube's fill value before + reprojection? Note that setting ``filled=False`` will use the raw + data array, which can be a workaround that prevents loading large + data into memory. + kwargs : dict + Passed to `reproject.reproject_interp`. + """ + + try: + from reproject.version import version + except ImportError: + raise ImportError("Requires the reproject package to be" + " installed.") + + reproj_kwargs = kwargs + # Need version > 0.2 to work with cubes, >= 0.5 for memmap + from distutils.version import LooseVersion + if LooseVersion(version) < "0.5": + raise Warning("Requires version >=0.5 of reproject. The current " + "version is: {}".format(version)) + elif LooseVersion(version) >= "0.6": + pass # no additional kwargs, no warning either + else: + reproj_kwargs['independent_celestial_slices'] = True + + from reproject import reproject_interp + + # TODO: Find the minimal subcube that contains the header and only reproject that + # (see FITS_tools.regrid_cube for a guide on how to do this) + + newwcs = wcs.WCS(header) + shape_out = tuple([header['NAXIS{0}'.format(i + 1)] for i in + range(header['NAXIS'])][::-1]) + + # def reproject_interp_wrapper(img_slice, **kwargs): + # # What exactly is the wrapper getting here? + # # I think it is given a _cube_ that is a cutout? + # # No, it is getting dask arrays (at least sometimes) + # if filled: + # data = img_slice.filled_data[:] + # else: + # data = img_slice._data + # return reproject_interp((data, img_slice.header), + # newwcs, shape_out=shape_out, **kwargs) + + # newcube, newcube_valid = self.apply_function_parallel_spatial( + # reproject_interp_wrapper, + # accepts_chunks=True, + # order=order, + # **reproj_kwargs) + + newcube, newcube_valid = reproject_interp((self.filled_data[:] if filled else self._data, self.header), + newwcs, shape_out=shape_out, **kwargs + ) + + return self._new_cube_with(data=newcube, + wcs=newwcs, + mask=BooleanArrayMask(newcube_valid.astype('bool'), + newwcs), + meta=self.meta, + ) + + class DaskVaryingResolutionSpectralCube(DaskSpectralCubeMixin, VaryingResolutionSpectralCube): @@ -1601,9 +1698,15 @@ def convfunc(img, beam, **kwargs): rechunk=('auto', -1, -1), **kwargs) + # Remove header keyword for presence of a CASA beam table post-convolution. + new_header = cube.header.copy() + if "CASAMBM" in new_header: + del new_header['CASAMBM'] + # Result above is a DaskVaryingResolutionSpectralCube, convert to DaskSpectralCube newcube = DaskSpectralCube(data=cube._data, beam=beam, + header=new_header, wcs=cube.wcs, mask=cube.mask, meta=cube.meta, diff --git a/spectral_cube/spectral_cube.py b/spectral_cube/spectral_cube.py index ffb0039b..b22311a1 100644 --- a/spectral_cube/spectral_cube.py +++ b/spectral_cube/spectral_cube.py @@ -965,7 +965,7 @@ def _cube_on_cube_operation(self, function, cube, equivalencies=[], **kwargs): Passed to np.testing.assert_almost_equal """ assert cube.shape == self.shape - if not self.unit.is_equivalent(cube.unit, equivalencies=equivalencies): + if not self.unit.is_equivalent(cube.unit, equivalencies=equivalencies) and cube.unit != u.dimensionless_unscaled: raise u.UnitsError("{0} is not equivalent to {1}" .format(self.unit, cube.unit)) if not wcs_utils.check_equality(self.wcs, cube.wcs, warn_missing=True, @@ -983,7 +983,11 @@ def _cube_on_cube_operation(self, function, cube, equivalencies=[], **kwargs): "cube. The error was: {0}".format(ex, function)) - cube = cube.to(self.unit) + # Don't convert another cube that is dimensionless. + # e.g. applying the pb correction to a interferometric image. + if cube.unit != u.dimensionless_unscaled: + cube = cube.to(self.unit) + data = function(self._data, cube._data) try: # multiplication, division, etc. are valid inter-unit operations diff --git a/spectral_cube/tests/test_regrid.py b/spectral_cube/tests/test_regrid.py index a7294f89..dd437e7f 100644 --- a/spectral_cube/tests/test_regrid.py +++ b/spectral_cube/tests/test_regrid.py @@ -3,6 +3,7 @@ import tempfile import numpy as np import os +import itertools from astropy import constants, units as u from astropy import convolution @@ -616,7 +617,8 @@ def test_mosaic_cubes(use_memmap, data_adv, use_dask, spectral_block_size): # which isn't auto-computed, resulting in nan pixels in the WCS transform cube._wcs.wcs.restwav = constants.c.to(u.m/u.s).value / cube.wcs.wcs.restfrq - expected_wcs = WCS(combine_headers(cube.header, cube.header)).celestial + expected_header = combine_headers(cube.header, cube.header) + expected_wcs = WCS(expected_header).celestial # Make two overlapping cubes of the data part1 = cube[:, :round(cube.shape[1]*2./3.), :] @@ -625,16 +627,255 @@ def test_mosaic_cubes(use_memmap, data_adv, use_dask, spectral_block_size): assert part1.wcs.wcs.restwav != 0 assert part2.wcs.wcs.restwav != 0 - result = mosaic_cubes([part1, part2], order='nearest-neighbor', + # Mosaic give the expected header. + result = mosaic_cubes([part1, part2], + # order is not a mosaic_cubes argument order='nearest-neighbor', + target_header=cube.header, + # not used roundtrip_coords=False, + spectral_block_size=spectral_block_size, + save_to_tmp_dir=False, + verbose=True, + use_memmap=use_memmap, + method='cube') + + # Check that the shapes are the same + assert result.shape == cube.shape + + assert repr(cube.wcs.celestial) == repr(result.wcs.celestial) + + # Check WCS in reprojected matches wcs_out + # (comparing WCS failed for no reason we could discern) + assert repr(expected_wcs) == repr(result.wcs.celestial) + + # Check that values of original and result are comparable + np.testing.assert_almost_equal(result.filled_data[:].value, + cube.filled_data[:].value, decimal=9) + + + # Mosaic auto-solving for the mosaic header. + result2 = mosaic_cubes([part1, part2],order='nearest-neighbor', + target_header=None, + roundtrip_coords=False, + spectral_block_size=spectral_block_size, + save_to_tmp_dir=False, + verbose=False, + use_memmap=use_memmap, + method='cube') + + # Check that the shapes are the same + assert result2.shape == cube.shape + + assert repr(cube.wcs.celestial) == repr(result2.wcs.celestial) + + # Check WCS in reprojected matches wcs_out + # (comparing WCS failed for no reason we could discern) + assert repr(expected_wcs) == repr(result2.wcs.celestial) + + # Check that values of original and result2 are comparable + np.testing.assert_almost_equal(result2.filled_data[:].value, + cube.filled_data[:].value, decimal=9) + + +@pytest.mark.parametrize('spectral_block_size,use_memmap,method', + itertools.product((1,100,None),(True,False),('cube','channel'))) +def test_mosaic_cubes_spectral(use_memmap, data_adv, use_dask, spectral_block_size, method): + + pytest.importorskip('reproject') + + # Read in data to use + cube, data = cube_and_raw(data_adv, use_dask=use_dask) + + # cube is doppler-optical by default, which uses the rest wavelength, + # which isn't auto-computed, resulting in nan pixels in the WCS transform + cube._wcs.wcs.restwav = constants.c.to(u.m/u.s).value / cube.wcs.wcs.restfrq + + expected_header = combine_headers(cube.header, cube.header) + expected_wcs = WCS(expected_header).celestial + + # Make three cubes to spectrally mosaic. + # (they must have >= 2 channels) + part1 = cube[:2] + part2 = cube[1:3] + part3 = cube[2:] + + assert part1.wcs.wcs.restwav != 0 + assert part2.wcs.wcs.restwav != 0 + assert part3.wcs.wcs.restwav != 0 + + # Mosaic give the expected header. + result = mosaic_cubes([part1, part2, part3], order='nearest-neighbor', + target_header=cube.header, roundtrip_coords=False, - spectral_block_size=spectral_block_size) + spectral_block_size=spectral_block_size, + save_to_tmp_dir=False, + verbose=False, + use_memmap=use_memmap, + method=method) # Check that the shapes are the same assert result.shape == cube.shape + assert repr(cube.wcs.celestial) == repr(result.wcs.celestial) + # Check WCS in reprojected matches wcs_out # (comparing WCS failed for no reason we could discern) assert repr(expected_wcs) == repr(result.wcs.celestial) + # Check that values of original and result are comparable - np.testing.assert_almost_equal(result.filled_data[:].value, cube.filled_data[:].value, decimal=3) - # only good to 3 decimal places is not amazing... + np.testing.assert_almost_equal(result.filled_data[:].value, + cube.filled_data[:].value, decimal=8) + + + # Mosaic auto-solving for the mosaic header. + result2 = mosaic_cubes([part1, part2, part3], + order='nearest-neighbor', + target_header=None, + roundtrip_coords=False, + spectral_block_size=spectral_block_size, + save_to_tmp_dir=False, + verbose=False, + use_memmap=use_memmap, + method=method) + + # Check that the shapes are the same + assert result2.shape == cube.shape + + assert repr(cube.wcs.celestial) == repr(result2.wcs.celestial) + + # Check WCS in reprojected matches wcs_out + # (comparing WCS failed for no reason we could discern) + assert repr(expected_wcs) == repr(result2.wcs.celestial) + + # Check that values of original and result2 are comparable + np.testing.assert_almost_equal(result2.filled_data[:].value, + cube.filled_data[:].value, decimal=8) + + +def test_cube_mosaic_complex(): + from spectral_cube import SpectralCube + from astropy.io import fits + from astropy import units as u + from spectral_cube.tests import utilities; from spectral_cube.cube_utils import mosaic_cubes, combine_headers + + cube1 = utilities.generate_hdu(data=np.ones([3,4,5]), + pixel_scale=0.1*u.arcsec, spec_scale=1*u.km/u.s, beamfwhm=0.2*u.arcsec, + v0=0) + cube2 = utilities.generate_hdu(data=np.ones([3,4,5]) * 2, + pixel_scale=0.11*u.arcsec, spec_scale=1*u.km/u.s, beamfwhm=0.3*u.arcsec, + v0=0.25) + # shift the cube by 1.5 pixels to ensure overlap + cube2.header['CRVAL1'] = 1.5 * (0.11*u.arcsec).to(u.deg).value + cube1 = SpectralCube.read(cube1) + cube2 = SpectralCube.read(cube2) + + # so I can do some sanity checks, i.e. that cube1[0] has same velocity as target_header[0] + target_header = combine_headers(cube1.header, cube2.header) + + result = mosaic_cubes([cube1, cube2], + target_header=None, + roundtrip_coords=False, + save_to_tmp_dir=False, + verbose=True, + use_memmap=True, + method='cube') + + result2 = mosaic_cubes([cube1, cube2], + target_header=None, + roundtrip_coords=False, + save_to_tmp_dir=False, + verbose=True, + use_memmap=True, + using_pbar=False, + method='channel') + + result3 = mosaic_cubes([cube1, cube2], + target_header=None, + roundtrip_coords=False, + save_to_tmp_dir=False, + verbose=False, + use_memmap=False, + method='channel') + + # the 'full cube' approach _extrapolates_ the first pixel, even though it's out of range + # this is likely a reproject bug + assert np.all(result.filled_data[1:,:,:] == result2.filled_data[1:,:,:]) + assert np.all(result.filled_data[1:,:,:] == result3.filled_data[1:,:,:]) + + assert np.all(result2.filled_data[:] == result3.filled_data[:]) + + +def test_cube_mosaic_weighted(): + from spectral_cube import SpectralCube + from astropy.io import fits + from astropy import units as u + from spectral_cube.tests import utilities; from spectral_cube.cube_utils import mosaic_cubes, combine_headers + + cube1 = utilities.generate_hdu(data=np.ones([3,4,5]), + pixel_scale=0.1*u.arcsec, spec_scale=1*u.km/u.s, beamfwhm=0.2*u.arcsec, + v0=0) + cube2 = utilities.generate_hdu(data=np.ones([3,4,5]) * 2, + pixel_scale=0.11*u.arcsec, spec_scale=1*u.km/u.s, beamfwhm=0.3*u.arcsec, + v0=0.25) + + weights1 = utilities.generate_hdu(data=np.ones([3,4,5]) * 0.5, + pixel_scale=0.1*u.arcsec, spec_scale=1*u.km/u.s, beamfwhm=0.2*u.arcsec, + v0=0) + weights2 = utilities.generate_hdu(data=np.ones([3,4,5]) * 1, + pixel_scale=0.11*u.arcsec, spec_scale=1*u.km/u.s, beamfwhm=0.3*u.arcsec, + v0=0.25) + # shift the cube by 1.5 pixels to ensure overlap + cube2.header['CRVAL1'] = 1.5 * (0.11*u.arcsec).to(u.deg).value + weights2.header['CRVAL1'] = 1.5 * (0.11*u.arcsec).to(u.deg).value + weights1.header['BUNIT'] = '' + weights2.header['BUNIT'] = '' + cube1 = SpectralCube.read(cube1) + cube2 = SpectralCube.read(cube2) + weights1 = SpectralCube.read(weights1) + weights2 = SpectralCube.read(weights2) + + # so I can do some sanity checks, i.e. that cube1[0] has same velocity as target_header[0] + target_header = combine_headers(cube1.header, cube2.header) + + # go super verbose (for debugging) + # (saved so I can do it later) + # from logging import getLogger + # import logging + # logger = getLogger('reproject.mosaicking.coadd') + # logger.setLevel(logging.INFO) + # handler = logging.StreamHandler() + # logger.addHandler(handler) + + result = mosaic_cubes([cube1, cube2], + weightcubes=[weights1, weights2], + target_header=None, + roundtrip_coords=False, + save_to_tmp_dir=False, + verbose=True, + use_memmap=True, + method='cube') + + result2 = mosaic_cubes([cube1, cube2], + weightcubes=[weights1, weights2], + target_header=None, + roundtrip_coords=False, + save_to_tmp_dir=False, + verbose=True, + use_memmap=True, + using_pbar=False, + method='channel') + + result3 = mosaic_cubes([cube1, cube2], + weightcubes=[weights1, weights2], + target_header=None, + roundtrip_coords=False, + save_to_tmp_dir=False, + verbose=False, + use_memmap=False, + method='channel') + + # the 'full cube' approach _extrapolates_ the first pixel, even though it's out of range + # this is likely a reproject bug + assert np.all(result.filled_data[1:,:,:] == result2.filled_data[1:,:,:]) + assert np.all(result.filled_data[1:,:,:] == result3.filled_data[1:,:,:]) + + assert np.all(result2.filled_data[:] == result3.filled_data[:]) \ No newline at end of file diff --git a/spectral_cube/tests/test_spectral_cube.py b/spectral_cube/tests/test_spectral_cube.py index 4e73eccc..001bfffc 100644 --- a/spectral_cube/tests/test_spectral_cube.py +++ b/spectral_cube/tests/test_spectral_cube.py @@ -646,6 +646,18 @@ def test_argmax(self): self._check_numpy(self.c.argmax, d, np.nanargmax) self.c = self.d = None + def test_argmax_rays(self, use_dask): + """ + regression test: argmax must have integer dtype + """ + d = np.where(self.d > 0.5, self.d, -10) + if use_dask: + # there is no 'rays' approach in dask + result = self.c.argmax() + else: + result = self.c.argmax(how='ray') + assert 'int' in str(result.dtype) + def test_argmin(self): d = np.where(self.d > 0.5, self.d, 10) self._check_numpy(self.c.argmin, d, np.nanargmin) @@ -1355,12 +1367,14 @@ def test_twod_numpy_twoaxes(func, how, axis, filename, use_dask): cube._unit = u.K with warnings.catch_warnings(record=True) as wrn: + warnings.simplefilter('always', UserWarning) # this appears to get turned off somewhere else in testing if use_dask: spec = getattr(cube, func)(axis=axis) else: spec = getattr(cube, func)(axis=axis, how=how) if func == 'mean' and axis != (1,2): + print(func, how, axis, filename, use_dask) # pytest is reporting weird things, this is a hack assert 'Averaging over a spatial and a spectral' in str(wrn[-1].message) # data has a redundant 1st axis diff --git a/spectral_cube/tests/test_wcs_utils.py b/spectral_cube/tests/test_wcs_utils.py index d832aeb8..79b8e46b 100644 --- a/spectral_cube/tests/test_wcs_utils.py +++ b/spectral_cube/tests/test_wcs_utils.py @@ -171,6 +171,8 @@ def test_wcs_slice_unmatched_celestial(): wcs.wcs.ctype = ['RA---TAN', 'DEC--TAN', 'FREQ'] wcs.wcs.crpix = [50., 45., 30.] + warnings.simplefilter('always', UserWarning) + # drop RA with warnings.catch_warnings(record=True) as wrn: wcs_new = drop_axis(wcs, 0)