Source code for sunraster.spectrogram

import abc
import textwrap
import numbers

import numpy as np
from astropy.time import TimeDelta
import astropy.units as u
from ndcube.ndcube import NDCube
# get_axis_number_from_axis_name is needed by apply_exposure_time_correction below;
# it is assumed to live alongside convert_extra_coords_dict_to_input_format in ndcube.utils.cube.
from ndcube.utils.cube import (convert_extra_coords_dict_to_input_format,
                               get_axis_number_from_axis_name)

__all__ = ['SpectrogramCube']


# Define some custom error messages.
APPLY_EXPOSURE_TIME_ERROR = ("Exposure time correction has probably already "
                             "been applied since the unit already includes "
                             "inverse time. To apply exposure time correction "
                             "anyway, set 'force' kwarg to True.")
UNDO_EXPOSURE_TIME_ERROR = ("Exposure time correction has probably already "
                            "been undone since the unit does not include "
                            "inverse time. To undo exposure time correction "
                            "anyway, set 'force' kwarg to True.")
AXIS_NOT_FOUND_ERROR = " axis not found. If in extra_coords, axis name must be supported: "

# Define supported coordinate names for coordinate properties.
SUPPORTED_LONGITUDE_NAMES = ["custom:pos.helioprojective.lon", "pos.helioprojective.lon",
                             "longitude", "lon"]
SUPPORTED_LONGITUDE_NAMES += [name.upper() for name in SUPPORTED_LONGITUDE_NAMES] + \
                             [name.capitalize() for name in SUPPORTED_LONGITUDE_NAMES]
SUPPORTED_LONGITUDE_NAMES = np.array(SUPPORTED_LONGITUDE_NAMES)

SUPPORTED_LATITUDE_NAMES = ["custom:pos.helioprojective.lat", "pos.helioprojective.lat",
                            "latitude", "lat"]
SUPPORTED_LATITUDE_NAMES += [name.upper() for name in SUPPORTED_LATITUDE_NAMES] + \
                            [name.capitalize() for name in SUPPORTED_LATITUDE_NAMES]
SUPPORTED_LATITUDE_NAMES = np.array(SUPPORTED_LATITUDE_NAMES)

SUPPORTED_SPECTRAL_NAMES = ["em.wl", "em.energy", "em.freq", "wavelength", "energy",
                            "frequency", "freq", "lambda", "spectral"]
SUPPORTED_SPECTRAL_NAMES += [name.upper() for name in SUPPORTED_SPECTRAL_NAMES] + \
                            [name.capitalize() for name in SUPPORTED_SPECTRAL_NAMES]
SUPPORTED_SPECTRAL_NAMES = np.array(SUPPORTED_SPECTRAL_NAMES)

SUPPORTED_TIME_NAMES = ["time"]
SUPPORTED_TIME_NAMES += [name.upper() for name in SUPPORTED_TIME_NAMES] + \
                        [name.capitalize() for name in SUPPORTED_TIME_NAMES]
SUPPORTED_TIME_NAMES = np.array(SUPPORTED_TIME_NAMES)

SUPPORTED_EXPOSURE_NAMES = ["exposure time", "exposure_time", "exposure times",
                            "exposure_times", "exp time", "exp_time", "exp times", "exp_times"]
SUPPORTED_EXPOSURE_NAMES += [name.upper() for name in SUPPORTED_EXPOSURE_NAMES] + \
                            [name.capitalize() for name in SUPPORTED_EXPOSURE_NAMES]
SUPPORTED_EXPOSURE_NAMES = np.array(SUPPORTED_EXPOSURE_NAMES)
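
# The upper-case and capitalized variants generated above make the matching in
# _find_axis_name / _find_name_in_array (defined at the bottom of this module)
# tolerant of the most common spellings. As an illustrative sketch only (the
# example axis names below are made up):
#
#     >>> names = np.array(["custom:pos.helioprojective.lon", "em.wl", "time"])
#     >>> names[np.isin(names, SUPPORTED_SPECTRAL_NAMES)]
#     array(['em.wl'], dtype='<U30')
#
# i.e. the first supported name found among the WCS physical types (or among the
# extra_coords keys) is adopted as that coordinate's name.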


class SpectrogramABC(abc.ABC):

    # Abstract Base Class to define the basic API of Spectrogram classes.

    @abc.abstractproperty
    def spectral_axis(self):
        """
        Return the spectral coordinates for each pixel.
        """

    @abc.abstractproperty
    def time(self):
        """
        Return the time coordinates for each pixel.
        """

    @abc.abstractproperty
    def exposure_time(self):
        """
        Return the exposure time for each exposure.
        """

    @abc.abstractproperty
    def lon(self):
        """
        Return the longitude coordinates for each pixel.
        """

    @abc.abstractproperty
    def lat(self):
        """
        Return the latitude coordinates for each pixel.
        """

    @abc.abstractmethod
    def apply_exposure_time_correction(self, undo=False, force=False):
        """
        Applies or undoes exposure time correction to data and uncertainty and
        adjusts unit.

        Correction is only applied (undone) if the object's unit doesn't (does)
        already include inverse time.  This can be overridden so that correction
        is applied (undone) regardless of unit by setting force=True.

        Parameters
        ----------
        undo: `bool`
            If False, the exposure time correction is applied.
            If True, the exposure time correction is undone.
            Default is False.

        force: `bool`
            If False, the correction is only applied (undone) if the unit does not
            (does) already include inverse time.
            If True, the correction is applied (undone) regardless of the unit,
            which is still adjusted accordingly.
            Default is False.

        Returns
        -------
        result: `SpectrogramCube`
            New SpectrogramCube in new units.
        """


class SpectrogramCube(NDCube, SpectrogramABC):
    """
    Class representing a sit-and-stare or single raster of slit spectrogram data.

    Must be described by a single WCS.

    Parameters
    ----------
    data: `numpy.ndarray`
        The array holding the actual data in this object.

    wcs: `ndcube.wcs.wcs.WCS`
        The WCS object containing the axes' information.

    extra_coords : iterable of `tuple`, each with three entries, optional
        (`str`, `int`, `astropy.units.quantity` or array-like)
        Gives the name, axis of data, and values of coordinates of a data axis
        not included in the WCS object.

    unit : `astropy.unit.Unit` or `str`, optional
        Unit for the dataset. Strings that can be converted to a Unit are allowed.

    meta : dict-like object, optional
        Additional meta information about the dataset.

    uncertainty : any type, optional
        Uncertainty in the dataset. Should have an attribute uncertainty_type
        that defines what kind of uncertainty is stored, for example "std"
        for standard deviation or "var" for variance. A metaclass defining
        such an interface is NDUncertainty - but isn't mandatory. If the
        uncertainty has no such attribute the uncertainty is stored as
        UnknownUncertainty. Defaults to None.

    mask : any type, optional
        Mask for the dataset. Masks should follow the numpy convention that
        valid data points are marked by False and invalid ones with True.
        Defaults to None.

    copy : `bool`, optional
        Indicates whether to save the arguments as copy. True copies every
        attribute before saving it while False tries to save every parameter
        as reference. Note however that it is not always possible to save the
        input as reference. Default is False.
    """

    def __init__(self, data, wcs, extra_coords=None, unit=None, uncertainty=None, meta=None,
                 mask=None, instrument_axes=None, copy=False, **kwargs):
        # Initialize SpectrogramCube.
        super().__init__(data, wcs=wcs, uncertainty=uncertainty, mask=mask, meta=meta,
                         unit=unit, extra_coords=extra_coords, copy=copy, **kwargs)

        # Determine labels and location of each key real world coordinate.
        self_extra_coords = self.extra_coords
        world_axis_physical_types = np.array(self.wcs.world_axis_physical_types)
        self._longitude_name, self._longitude_loc = _find_axis_name(
            SUPPORTED_LONGITUDE_NAMES, world_axis_physical_types, self_extra_coords)
        self._latitude_name, self._latitude_loc = _find_axis_name(
            SUPPORTED_LATITUDE_NAMES, world_axis_physical_types, self_extra_coords)
        self._spectral_name, self._spectral_loc = _find_axis_name(
            SUPPORTED_SPECTRAL_NAMES, world_axis_physical_types, self_extra_coords)
        self._time_name, self._time_loc = _find_axis_name(
            SUPPORTED_TIME_NAMES, world_axis_physical_types, self_extra_coords)
        self._exposure_time_name, self._exposure_time_loc = _find_axis_name(
            SUPPORTED_EXPOSURE_NAMES, world_axis_physical_types, self_extra_coords)

        # Set up instrument axes if set.
        if instrument_axes is None:
            self.instrument_axes = instrument_axes
        elif len(instrument_axes) != data.ndim:
            raise ValueError("Length of instrument_axes must match number of data axes.")
        else:
            self.instrument_axes = np.asarray(instrument_axes, dtype=str)

    def __str__(self):
        if self._time_name:
            if self.time.isscalar:
                time_period = self.time
            else:
                time_period = (str(self.time.min()), str(self.time.max()))
        else:
            time_period = None
        if self._longitude_name:
            if self.lon.isscalar:
                lon_range = self.lon
            else:
                lon_range = u.Quantity([self.lon.min(), self.lon.max()])
        else:
            lon_range = None
        if self._latitude_name:
            if self.lat.isscalar:
                lat_range = self.lat
            else:
                lat_range = u.Quantity([self.lat.min(), self.lat.max()])
        else:
            lat_range = None
        if self._spectral_name:
            if self.spectral_axis.isscalar:
                spectral_range = self.spectral_axis
            else:
                spectral_range = u.Quantity([self.spectral_axis.min(), self.spectral_axis.max()])
        else:
            spectral_range = None
        return (textwrap.dedent(f"""\
                {self.__class__.__name__}
                {"".join(["-"] * len(self.__class__.__name__))}
                Time Period: {time_period}
                Instrument axes: {self.instrument_axes}
                Pixel dimensions: {self.dimensions.astype(int)}
                Longitude range: {lon_range}
                Latitude range: {lat_range}
                Spectral range: {spectral_range}
                Data unit: {self.unit}"""))

    def __repr__(self):
        return f"{object.__repr__(self)}\n{str(self)}"

    def __getitem__(self, item):
        # Slice SpectrogramCube using parent slicing.
        result = super().__getitem__(item)
        # As result is an NDCube, must put extra coord back into input format
        # to create a SpectrogramCube.
        if result.extra_coords is None:
            extra_coords = None
        else:
            extra_coords = convert_extra_coords_dict_to_input_format(result.extra_coords,
                                                                     result.missing_axes)
        # Slice instrument_axes if it exists.
        # If item is a slice, cube dimensionality is not reduced
        # so instrument_axes need not be sliced.
        if self.instrument_axes is None or isinstance(item, slice):
            instrument_axes = self.instrument_axes
        else:
            # If item is int, instrument_axes needs slicing.
            if isinstance(item, numbers.Integral):
                instrument_axes = self.instrument_axes[1:]
            # If item is tuple, instrument axes will need to be sliced if tuple contains an int.
            elif isinstance(item, tuple):
                instr_item = [isinstance(i, numbers.Integral) for i in item] + \
                             [False] * (len(self.instrument_axes) - len(item))
                instrument_axes = self.instrument_axes[np.invert(instr_item)]
            else:
                raise TypeError("Unrecognized slice item. Must be int, slice or tuple.")
            # If slicing causes cube to be a scalar, set instrument_axes to None.
            if len(instrument_axes) == 0:
                instrument_axes = None
        return self.__class__(result.data, wcs=result.wcs, uncertainty=result.uncertainty,
                              mask=result.mask, meta=result.meta, unit=result.unit,
                              extra_coords=extra_coords, missing_axes=result.missing_axes,
                              instrument_axes=instrument_axes)

    @property
    def spectral_axis(self):
        if not self._spectral_name:
            raise ValueError("Spectral" + AXIS_NOT_FOUND_ERROR +
                             f"{SUPPORTED_SPECTRAL_NAMES}")
        return self._get_axis_coord(self._spectral_name, self._spectral_loc)

    @property
    def time(self):
        if not self._time_name:
            raise ValueError("Time" + AXIS_NOT_FOUND_ERROR + f"{SUPPORTED_TIME_NAMES}")
        times = self._get_axis_coord(self._time_name, self._time_loc)
        if isinstance(times, (u.Quantity, TimeDelta)):
            try:
                if self.meta.date_reference:
                    times += self.meta.date_reference
            except AttributeError:
                pass
        return times

    @property
    def exposure_time(self):
        if not self._exposure_time_name:
            raise ValueError("Exposure time" + AXIS_NOT_FOUND_ERROR +
                             f"{SUPPORTED_EXPOSURE_NAMES}")
        return self._get_axis_coord(self._exposure_time_name, self._exposure_time_loc)

    @property
    def lon(self):
        if not self._longitude_name:
            raise ValueError("Longitude" + AXIS_NOT_FOUND_ERROR +
                             f"{SUPPORTED_LONGITUDE_NAMES}")
        return self._get_axis_coord(self._longitude_name, self._longitude_loc)

    @property
    def lat(self):
        if not self._latitude_name:
            raise ValueError("Latitude" + AXIS_NOT_FOUND_ERROR +
                             f"{SUPPORTED_LATITUDE_NAMES}")
        return self._get_axis_coord(self._latitude_name, self._latitude_loc)

    def apply_exposure_time_correction(self, undo=False, force=False):
        # Get exposure time in seconds.
        exposure_time_s = self.exposure_time.to(u.s).value
        # If exposure time is not scalar, change array's shape so that
        # it can be broadcast with data and uncertainty arrays.
        if not np.isscalar(exposure_time_s):
            # Get data axis of exposure time.
            if self._exposure_time_loc == "extra_coords":
                exposure_axis = self.extra_coords[self._exposure_time_name]["axis"]
            else:
                exposure_axis = get_axis_number_from_axis_name(self._exposure_time_name,
                                                               self.world_axis_physical_types)
            # Change array shape for broadcasting.
            item = [np.newaxis] * self.data.ndim
            item[exposure_axis] = slice(None)
            exposure_time_s = exposure_time_s[tuple(item)]
        # Based on value of undo kwarg, apply or remove exposure time correction.
        if undo is True:
            new_data, new_uncertainty, new_unit = _uncalculate_exposure_time_correction(
                self.data, self.uncertainty, self.unit, exposure_time_s, force=force)
        else:
            new_data, new_uncertainty, new_unit = _calculate_exposure_time_correction(
                self.data, self.uncertainty, self.unit, exposure_time_s, force=force)
        # Return new instance of SpectrogramCube with correction applied/undone.
        return self.__class__(
            new_data, wcs=self.wcs,
            extra_coords=convert_extra_coords_dict_to_input_format(self.extra_coords,
                                                                   self.missing_axes),
            unit=new_unit, uncertainty=new_uncertainty, meta=self.meta, mask=self.mask,
            missing_axes=self.missing_axes)

    def _get_axis_coord(self, axis_name, coord_loc):
        if coord_loc == "wcs":
            return self.axis_world_coords_values(axis_name)[0]
        elif coord_loc == "extra_coords":
            return self.extra_coords[axis_name]["value"]
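
# Illustrative sketch (standalone numpy, not part of the class) of how
# __getitem__ above prunes instrument_axes when integer indexing drops an axis;
# the axis labels here are made up:
#
#     >>> example_axes = np.asarray(["raster scan", "slit", "spectral"], dtype=str)
#     >>> item = (0, slice(None))  # drops the first data axis
#     >>> drop = [isinstance(i, numbers.Integral) for i in item] + \
#     ...        [False] * (len(example_axes) - len(item))
#     >>> example_axes[np.invert(drop)]
#     array(['slit', 'spectral'], dtype='<U11')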
def _find_axis_name(supported_names, world_axis_physical_types, extra_coords):
    """
    Finds name of a SpectrogramCube axis type from WCS and extra coords.

    Parameters
    ----------
    supported_names: 1D `numpy.ndarray`
        The names for the axis supported by `SpectrogramCube`.

    world_axis_physical_types: 1D `numpy.ndarray`
        Output of SpectrogramCube.world_axis_physical_types converted to an array.

    extra_coords: `dict` or `None`
        Output of SpectrogramCube.extra_coords.

    Returns
    -------
    axis_name: `str`
        The coordinate name of the axis.

    loc: `str`
        The location where the coordinate is stored: "wcs" or "extra_coords".
    """
    axis_name = None
    loc = None
    # Check WCS for axis name.
    axis_name = _find_name_in_array(supported_names, world_axis_physical_types)
    if axis_name:
        loc = "wcs"
    elif extra_coords:
        # If axis name not in WCS, check extra_coords.
        axis_name = _find_name_in_array(supported_names, np.array(list(extra_coords.keys())))
        if axis_name:
            loc = "extra_coords"
    return axis_name, loc


def _find_name_in_array(supported_names, names_array):
    name_index = np.isin(names_array, supported_names)
    if name_index.sum() > 0:
        name_index = int(np.arange(len(names_array))[name_index])
        return names_array[name_index]


def _calculate_exposure_time_correction(data, uncertainty, unit, exposure_time, force=False):
    """
    Applies exposure time correction to data and uncertainty arrays.

    Parameters
    ----------
    data: `numpy.ndarray`
        Data array to be converted.

    uncertainty: `astropy.nddata.nduncertainty.NDUncertainty`
        The uncertainty of each element in data.

    unit: `astropy.unit.Unit`
        Unit of data arrays.

    exposure_time: `numpy.ndarray`
        Exposure time in seconds for each exposure in data arrays.

    Returns
    -------
    new_data: `numpy.ndarray`
        Data array with exposure time corrected for.

    new_uncertainty: `astropy.nddata.nduncertainty.NDUncertainty`
        The uncertainty of each element in new_data.

    new_unit: `astropy.unit.Unit`
        Unit of new_data array after exposure time correction.
    """
    # If force is not set to True and unit already includes inverse time,
    # raise error as exposure time correction has probably already been
    # applied and should not be applied again.
    if force is not True and u.s in unit.decompose().bases:
        raise ValueError(APPLY_EXPOSURE_TIME_ERROR)
    else:
        # Else, either unit does not include inverse time and so
        # exposure time correction does need to be applied, or
        # user has set force=True and wants the correction applied
        # regardless of the unit.
        new_data = data / exposure_time
        if uncertainty:
            uncertainty_unit = uncertainty.unit / u.s if uncertainty.unit else uncertainty.unit
            new_uncertainty = uncertainty.__class__(uncertainty.array / exposure_time,
                                                    unit=uncertainty_unit)
        else:
            new_uncertainty = uncertainty
        new_unit = unit / u.s
    return new_data, new_uncertainty, new_unit


def _uncalculate_exposure_time_correction(data, uncertainty, unit, exposure_time, force=False):
    """
    Removes exposure time correction from data and uncertainty arrays.

    Parameters
    ----------
    data: `numpy.ndarray`
        Data array to be converted.

    uncertainty: `astropy.nddata.nduncertainty.NDUncertainty`
        The uncertainty of each element in data.

    unit: `astropy.unit.Unit`
        Unit of data arrays.

    exposure_time: `numpy.ndarray`
        Exposure time in seconds for each exposure in data arrays.

    Returns
    -------
    new_data: `numpy.ndarray`
        Data array with exposure time correction removed.

    new_uncertainty: `astropy.nddata.nduncertainty.NDUncertainty`
        The uncertainty of each element in new_data.

    new_unit: `astropy.unit.Unit`
        Unit of new_data array after exposure time correction removed.
    """
    # If force is not set to True and unit does not include inverse time,
    # raise error as exposure time correction has probably already been
    # undone and should not be undone again.
    if force is not True and u.s in (unit * u.s).decompose().bases:
        raise ValueError(UNDO_EXPOSURE_TIME_ERROR)
    else:
        # Else, either unit does include inverse time and so
        # exposure time correction does need to be removed, or
        # user has set force=True and wants the correction removed
        # regardless of the unit.
        new_data = data * exposure_time
        if uncertainty:
            uncertainty_unit = uncertainty.unit * u.s if uncertainty.unit else uncertainty.unit
            new_uncertainty = uncertainty.__class__(uncertainty.array * exposure_time,
                                                    unit=uncertainty_unit)
        else:
            new_uncertainty = uncertainty
        new_unit = unit * u.s
    return new_data, new_uncertainty, new_unit