# Source code for sherpa.astro.utils

#
#  Copyright (C) 2008, 2016, 2020 - 2022, 2024
#  Smithsonian Astrophysical Observatory
#
#
#  This program is free software; you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation; either version 3 of the License, or
#  (at your option) any later version.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License along
#  with this program; if not, write to the Free Software Foundation, Inc.,
#  51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
"""Utility routines for the astronomy code.

.. versionchanged:: 4.17.0
   The resp_init routine has been removed.

"""

from typing import Optional

import numpy as np

from sherpa.astro import hc, charge_e
from sherpa.utils import filter_bins
from sherpa.utils.err import IOErr, DataErr
from sherpa.utils import guess
from sherpa.utils.guess import ValueAndRange, get_position

from ._utils import arf_fold, do_group, expand_grouped_mask, \
    filter_resp, is_in, rmf_fold, shrink_effarea
from ._pileup import apply_pileup


__all__ = ('arf_fold', 'rmf_fold', 'do_group', 'apply_pileup',
           'eqwidth', 'calc_photon_flux', 'calc_energy_flux',
           'calc_data_sum', 'calc_model_sum', 'shrink_effarea',
           'calc_data_sum2d', 'calc_model_sum2d', 'filter_resp',
           'calc_source_sum', 'compile_energy_grid',
           'calc_kcorr',
           'expand_grouped_mask', 'is_in',
           'get_xspec_position', 'get_xspec_norm')



def reshape_2d_arrays(x0, x1):
    """Expand two 1D axes into flattened 2D coordinate arrays.

    Parameters
    ----------
    x0, x1 : array_like
        The independent axes (e.g. the x and y coordinates).

    Returns
    -------
    grid0, grid1 : ndarray
        One-dimensional arrays giving the coordinates of every point
        of the 2D grid formed from x0 and x1.
    """
    grid0, grid1 = np.meshgrid(x0, x1)
    return grid0.flatten(), grid1.flatten()


def get_xspec_position(y: np.ndarray,
                       x: np.ndarray,
                       xhi: Optional[np.ndarray] = None
                       ) -> ValueAndRange:
    """Estimate a position for an XSPEC model.

    If the grid is in descending order it is assumed to be in
    wavelength units and is converted to energy (via hc) before
    the position estimate is made.
    """
    descending = x[0] > x[-1]
    if xhi is None:
        if descending:
            x = hc / x
    elif descending and xhi[0] > xhi[-1]:
        # Swap as well as convert, so the pair stays (lo, hi).
        x, xhi = hc / xhi, hc / x

    return get_position(y, x, xhi)
def get_xspec_norm(y: np.ndarray, mdl: np.ndarray) -> ValueAndRange:
    """Guess model normalization (XSPEC).

    Estimate the normalization based on the data and model.

    Parameters
    ----------
    y: ndarray
        The data
    mdl: ndarray
        The model evaluation on the same grid.
    """
    # Compare the summed data to the summed model, rather than pick
    # an individual value. This is an assumption as to it being a
    # useful approach, and we may decide to change the approach.
    #
    ratio = np.sum(y) / np.sum(mdl)
    scale = guess._guess_ampl_scale
    return {'val': ratio,
            'min': ratio / scale,
            'max': ratio * scale}
def compile_energy_grid(arglist):
    '''Combine several grids (energy, channel, wavelength) into one.

    This function merges several grids into a single one, so that the
    model does not have to be evaluated several times just because the
    same energy point occurs in every grid. It assumes that evaluating
    the model is expensive, making it worthwhile to do some
    bookkeeping so the model values for every input interval can be
    recovered from the minimum number of model points.

    Parameters
    ----------
    arglist : list of tuples
        The list contains the input energy grids. Each tuple is a
        pair of arrays specifying the lower and upper limits for the
        bins in each input grid.

    Returns
    -------
    out : list
        The elements of the list are: elo : array of lower bin
        values; ehi : array of upper bin values; and htable : list of
        tuples in the same format as the input. The entries in htable
        are indices into elo and ehi that return the original input
        arrays.

    Examples
    --------

    >>> compile_energy_grid([([1,3,5], [3, 5, 7]), ([0, 1, 2], [1, 2, 3])])
    [array([0, 1, 2, 3, 5]), array([1, 2, 3, 5, 7]),
     [(array([1, 3, 4], dtype=int32), array([2, 3, 4], dtype=int32)),
      (array([0, 1, 2], dtype=int32), array([0, 1, 2], dtype=int32))]]
    '''
    lows = np.unique(np.concatenate([pair[0] for pair in arglist]))
    highs = np.unique(np.concatenate([pair[1] for pair in arglist]))

    # Values that appear only as a lower (upper) edge need a matching
    # entry in the other array so the combined grid stays contiguous.
    only_low = np.setdiff1d(lows, highs)
    only_high = np.setdiff1d(highs, lows)

    if len(only_low) > 1:
        highs = np.concatenate((highs, only_low[1:]))
        highs.sort()

    if len(only_high) > 1:
        lows = np.concatenate((lows, only_high[:-1]))
        lows.sort()

    # Determine index intervals using binary search in the combined
    # grid for each input grid's bounds and populate a table to use
    # later for integration over said intervals.
    htable = []
    for pair in arglist:
        idxlo = np.searchsorted(lows, pair[0]).astype(np.int32)
        idxhi = np.searchsorted(highs, pair[1]).astype(np.int32)
        htable.append((idxlo, idxhi))

    return [lows, highs, htable]
def bounds_check(lo, hi):
    """Ensure that the limits of a filter make sense.

    Note that if only one of them is given we always return that
    value first (the missing limit is copied from the given one, so
    lo == hi signals a "density at a point" request to callers).

    Parameters
    ----------
    lo, hi : number or None
        The requested limits.

    Returns
    -------
    lo, hi : number or None
        Either both None or both set.

    Raises
    ------
    IOErr
        If both limits are set and lo > hi.
    """

    # TODO: this error message is not ideal, as it assumes the
    # units are energy when the values could be wavelengths,
    # channels, or for a non PHA dataset. See issue #1443
    #
    if lo is not None and hi is not None and lo > hi:
        raise IOErr('boundscheck', lo, hi)

    # Copy the single given limit to the missing side.
    if lo is not None and hi is None:
        hi = lo
    if hi is not None and lo is None:
        lo = hi

    return (lo, hi)


def range_overlap_1dint(axislist, lo, hi):
    """Return 'overlap' fraction of a 1D int. grid for a range.

    Parameters
    ----------
    axislist : sequence of array, array
        The low and high edges of the bins (in that order). The two
        arrays must have the same size, in either ascending or
        descending order (the same for both), and have no gaps. The
        bins are inclusive for the low edge and exclusive for the
        upper edge.
    lo, hi : None or number
        The range bounds; either both are None or both are set with
        hi >= lo.

    Returns
    -------
    scale : None or numpy array
        An array indicating how much overlap there is between the
        lo-hi grid and each bin (on a scale of 0 to 1). There is one
        wrinkle: if lo = hi then the scale is set to 1 for that bin.
        If there is no match then None is returned.

    Notes
    -----
    The bins are assumed to be inclusive for the lower edge and
    exclusive for the higher edge. This means that if a flux density
    is requested (lo=hi) and it falls on an edge then the bin
    returned is the one starting at the requested value. The
    consequences are that if the grid starts at gmin and ends at gmax
    (low edge of first bin and high edge of last bin) then lo=hi=gmin
    will return [1, 0, ...] and lo=hi=gmax returns None. As a special
    case, when lo < hi (so a range is requested) and hi=gmin, None is
    returned (rather than an array of 0's).

    Comparison to the bin edges is done using the standard NumPy
    comparison orders (since it is done by the numpy.digitize
    routine).

    Given that RMF response grids (which are what this is going to be
    used with) have "interesting" values, this can cause surprising
    issues - for example, the test 3c273.pi dataset has an RMF that
    starts at 0.10000000149011612 rather than 0.1 keV, so a flux
    density calculated at 0.1 keV will return 0. An alternative
    approach is to compare to bin edges using sao_fcmp, but that
    would complicate the code.

    """
    # TODO: is compile_energy_grid relevant to this at all?

    # A number of asserts are used to ensure invariants are actually
    # true. They could be removed once this has shown to be
    # working.
    #
    axislo = axislist[0]
    axishi = axislist[1]
    nbins = axislo.size
    assert nbins == axishi.size
    assert axishi[0] > axislo[0]

    # No filter requested: every bin is fully included.
    if lo is None and hi is None:
        return np.ones(axislo.size)

    assert lo is not None
    assert hi is not None
    # lo == hi is the "density at a point" case (see bounds_check).
    density = lo == hi

    # To simplify the following, we require that the grid be
    # in ascending order.
    #
    ascending = axislo[1] > axislo[0]
    if ascending:
        edges = np.append(axislo, axishi[-1])
    else:
        edges = np.append(axishi[0], axislo)[::-1]

    axmin = edges[0]
    axmax = edges[-1]
    assert axmin < axmax, (axmin, axmax)

    # Some of these special cases could be checked before
    # creating edges, but leave here for now.
    #
    if lo >= axmax or hi < axmin:
        return None

    if lo <= axmin and hi >= axmax:
        return np.ones(nbins)

    # special case handling of hi == axmin but lo != hi
    #
    if hi == axmin and lo < hi:
        return None

    # At this point there should be no cases where lo=hi
    # and lo < axmin or >= axmax. This means we can replace
    # lo and hi by axmin and axmax (if they exceed the limits),
    # as that makes code below easier.
    #
    if density:
        assert lo >= axmin and lo < axmax

    lo = max(lo, axmin)
    hi = min(hi, axmax)

    # Find the bins corresponding to the start and end
    # points. See the digitize documentation for the
    # meaning of the return values.
    #
    bins = np.digitize([lo, hi], edges, right=False)
    blo = bins[0]
    bhi = bins[1]

    # since lo has been set to a minimum of the lower edge,
    # blo should always be >= 1. The upper limit can be
    # > nbins when it is equal to the upper limit.
    #
    assert blo > 0, blo
    # assert bhi <= nbins, (bhi, nbins)

    scale = np.zeros(nbins)
    ilo = blo - 1
    if density:
        assert blo == bhi
        scale[ilo] = 1
        return scale if ascending else scale[::-1]

    if blo == bhi:
        # a single bin: fractional coverage of that bin.
        scale[ilo] = (hi - lo) / (edges[blo] - edges[ilo])
        return scale if ascending else scale[::-1]

    # Fully included
    ihi = bhi - 1
    scale[blo:ihi] = 1.0

    # Low edge (may be fully included)
    assert lo >= edges[ilo], (lo, edges[ilo])
    assert lo <= edges[blo], (lo, edges[blo])
    scale[ilo] = (edges[blo] - lo) / (edges[blo] - edges[ilo])

    # High edge (may be fully included)
    if bhi <= nbins:
        assert hi >= edges[ihi], (hi, edges[ihi])
        assert hi <= edges[bhi], (hi, edges[bhi])
        scale[ihi] = (hi - edges[ihi]) / (edges[bhi] - edges[ihi])

    return scale if ascending else scale[::-1]


def _flux(data, lo, hi, src, eflux=False, srcflux=False):
    """Integrate or sum the source model over the lo-hi range.

    The model is evaluated on the full (unfiltered) grid of the 1D
    data object; the lo-hi range then selects (and, for integrated
    grids, fractionally weights) the bins that contribute.

    Parameters
    ----------
    data
        The 1D data object whose independent axis defines the grid.
    lo, hi : number or None
        The range limits (validated by bounds_check); lo == hi
        requests a density rather than an integrated value.
    src : callable
        The model, evaluated as src(*axislist).
    eflux : bool, optional
        If True, weight each bin by its mid-point energy and multiply
        the final sum by charge_e (energy rather than photon flux).
    srcflux : bool, optional
        If True, divide the model values of an integrated (two-axis)
        grid by the bin widths before summing.

    Returns
    -------
    flux : number
        The weighted sum; 0.0 if no bin overlaps the range.
    """

    if data.ndim != 1:
        raise DataErr("wrongdim", data.name, 1)

    lo, hi = bounds_check(lo, hi)

    # Prefer the private accessor when present (PHA-like data),
    # otherwise fall back to the public one.
    try:
        method = data._get_indep
    except AttributeError:
        method = data.get_indep

    axislist = method(filter=False)
    dim = np.asarray(axislist).squeeze().ndim
    if dim > 2:
        raise IOErr('>axes', "2")

    # assume this should not happen, so we do not have to worry
    # about a nice error message
    assert dim > 0

    # To make things simpler, evaluate on the full grid
    y = src(*axislist)

    if srcflux and dim == 2:
        y /= np.asarray(axislist[1] - axislist[0])

    if eflux:
        # for energy flux, the sum of grid below must be in keV.
        #
        energ = []
        convert = hasattr(data, 'units') and data.units == 'wavelength'

        for axis in axislist:
            grid = axis
            if convert:
                grid = hc / grid
            energ.append(grid)

        if dim == 2:
            # mid-point energy of each bin
            ecorr = 0.5 * (energ[0] + energ[1])
        else:
            # why multiply by 0.5?
            ecorr = 0.5 * energ[0]

        y *= ecorr

    # What bins do we use for the calculation? Linear interpolation
    # is used for bin edges (for integrated data sets)
    #
    if dim == 1:
        mask = filter_bins((lo,), (hi,), (axislist[0],))
        assert mask is not None

        # no bin found
        if np.all(~mask):
            return 0.0

        # convert boolean to numbers
        scale = 1.0 * mask
    else:
        scale = range_overlap_1dint(axislist, lo, hi)
        if scale is None:
            return 0.0

        assert scale.max() > 0

    # Originally a flux density was calculated if both lo and hi
    # fell in the same bin, but this has been changed so that
    # we only calculate a density if the lo and hi values are the
    # same (which is set by bounds_check when a density is requested).
    #
    if lo is not None and dim == 2 and lo == hi:
        assert scale.sum() == 1, f'programmer error: sum={scale.sum()}'
        y /= np.abs(axislist[1] - axislist[0])

    flux = (scale * y).sum()
    if eflux:
        flux *= charge_e

    return flux


def _counts(data, lo, hi, func, *args):
    """Sum up the data for the range lo to hi.

    The routine starts by saving the current filter, which is a
    combination of the mask field and - if set, which it is only for
    PHA data with ignore_bad called - the quality_filter field. The
    existing noticed range of the data set is ignored, and instead
    the range lo-hi is used (the meaning is slightly different to the
    notice method when one of the two fields is None but the other is
    set; see bounds_check).

    At this point the func method is called, with the remaining
    arguments. It is assumed that this is a method of the data
    object, but there is no requirement. The return value is summed
    up and this value will be returned by the routine.

    It is then time to restore the original filter, which is done in
    a finally block so that any errors calling func or the notice
    calls do not change the data object.

    The use of the name counts is a bit of a mis-nomer as this could
    be used on non-PHA data, but the user only sees this via
    calc_data_sum/calc_model_sum.

    """

    if data.ndim != 1:
        raise DataErr("wrongdim", data.name, 1)

    lo, hi = bounds_check(lo, hi)

    old_mask = data.mask
    old_quality_filter = getattr(data, 'quality_filter', None)
    try:
        data.notice()  # clear filter
        data.notice(lo, hi)
        counts = func(*args).sum()
    finally:
        # restore the old filter
        data.mask = old_mask
        if old_quality_filter is not None:
            data.quality_filter = old_quality_filter

    return counts


def _counts2d(data, reg, func, *args):
    """Sum up the data for the given region.

    This is the same as _counts but for a data object that supports
    the notice2d method (so currently DataIMG and DataIMGInt). It has
    the same structure as _counts except that the fields used to save
    and restore the state of the data object match the DataIMG
    interface (i.e. mask and optional _region rather than mask and an
    optional quality_filter)

    The use of the name counts is a bit of a mis-nomer as this could
    be used on non-PHA data, but the user only sees this via
    calc_data_sum2d/calc_model_sum2d.

    """

    if data.ndim != 2:
        raise DataErr("wrongdim", data.name, 2)

    old_mask = data.mask
    old_region = getattr(data, '_region', None)
    try:
        data.notice2d()  # save and clear filter
        data.notice2d(reg)
        counts = func(*args).sum()
    finally:
        # restore the old filter
        data.mask = old_mask
        if old_region is not None:
            data._region = old_region

    return counts
def calc_energy_flux(data, src, lo=None, hi=None):
    """Integrate the source model over a pass band.

    Calculate the integral of E * S(E) over a pass band, where E is
    the energy of the bin and S(E) the spectral model evaluated for
    that bin.

    Parameters
    ----------
    data
        The data object to use.
    src
        The source expression: this should not include any instrument
        responses.
    lo, hi : number, optional
        If both are None or both are set then calculate the flux over
        the given band. If only one is set then calculate the flux
        density at that point. The units for `lo` and `hi` are given
        by the current analysis setting of the `data` parameter.

    Returns
    -------
    flux
        The flux or flux density of the source model. For X-Spec
        models the flux units will be erg/cm^2/s and the flux density
        is either erg/cm^2/s/keV or erg/cm^2/s/Angstrom, depending on
        the analysis setting.

    See Also
    --------
    calc_data_sum : Sum up the data values over a pass band.
    calc_model_sum : Sum up the fitted model over a pass band.
    calc_source_sum : Sum up the source model over a pass band.
    calc_photon_flux : Integrate the source model over a pass band.

    Notes
    -----
    The units of `lo` and `hi` are determined by the analysis setting
    for the data set (e.g. `data.get_analysis`). Any existing filter
    on the data set is ignored by this function.

    The flux is calculated from the given source model, so if it
    includes an absorbing component then the result will represent
    the absorbed flux. The absorbing component can be removed, or set
    to absorb no photons, to get the un-absorbed flux.

    The units of the answer depend on the model components used in
    the source expression and the axis or axes of the data set. It is
    unlikely to give sensible results for 2D data sets.

    Examples
    --------

    Calculate the energy flux over the ranges 0.5 to 2 and 0.5 to
    7 keV:

    >>> data.set_analysis('energy')
    >>> calc_energy_flux(data, smodel, 0.5, 2)
    5.7224906878061796e-10
    >>> calc_energy_flux(data, smodel, 0.5, 7)
    1.3758131915063825e-09

    Calculate the energy flux density at 0.5 keV:

    >>> calc_energy_flux(data, smodel, 0.5)
    5.2573786652855304e-10

    """
    return _flux(data=data, lo=lo, hi=hi, src=src, eflux=True)
def calc_photon_flux(data, src, lo=None, hi=None):
    """Integrate the source model over a pass band.

    Calculate the integral of S(E) over a pass band, where S(E) is
    the spectral model evaluated for each bin.

    Parameters
    ----------
    data
        The data object to use.
    src
        The source expression: this should not include any instrument
        responses.
    lo, hi : number, optional
        If both are None or both are set then calculate the flux over
        the given band. If only one is set then calculate the flux
        density at that point. The units for `lo` and `hi` are given
        by the current analysis setting of the `data` parameter.

    Returns
    -------
    flux
        The flux or flux density of the source model. For X-Spec
        models the flux units will be photon/cm^2/s and the flux
        density is either photon/cm^2/s/keV or
        photon/cm^2/s/Angstrom, depending on the analysis setting.

    See Also
    --------
    calc_data_sum : Sum up the data values over a pass band.
    calc_model_sum : Sum up the fitted model over a pass band.
    calc_energy_flux : Integrate the source model over a pass band.
    calc_source_sum : Sum up the source model over a pass band.

    Notes
    -----
    The units of `lo` and `hi` are determined by the analysis setting
    for the data set (e.g. `data.get_analysis`). Any existing filter
    on the data set is ignored by this function.

    The flux is calculated from the given source model, so if it
    includes an absorbing component then the result will represent
    the absorbed flux. The absorbing component can be removed, or set
    to absorb no photons, to get the un-absorbed flux.

    The units of the answer depend on the model components used in
    the source expression and the axis or axes of the data set. It is
    unlikely to give sensible results for 2D data sets.

    Examples
    --------

    Calculate the photon flux over the ranges 0.5 to 2 and 0.5 to 7
    keV, and compared to the energy fluxes for the same bands:

    >>> data.set_analysis('energy')
    >>> calc_photon_flux(data, smodel, 0.5, 2)
    0.35190275
    >>> calc_photon_flux(data, smodel, 0.5, 7)
    0.49050927
    >>> calc_energy_flux(data, smodel, 0.5, 2)
    5.7224906878061796e-10
    >>> calc_energy_flux(data, smodel, 0.5, 7)
    1.3758131915063825e-09

    Calculate the photon flux density at 0.5 keV:

    >>> calc_photon_flux(data, smodel, 0.5)
    0.64978176

    """
    return _flux(data=data, lo=lo, hi=hi, src=src)
# ## DOC-TODO: compare to calc_photon_flux ?
def calc_source_sum(data, src, lo=None, hi=None):
    """Sum up the source model over a pass band.

    Sum up S(E) over a pass band, where S(E) is the spectral model
    evaluated for each bin.

    Parameters
    ----------
    data
        The data object to use.
    src
        The source expression. This must not include the instrumental
        responses.
    lo, hi : number, optional
        If both are None or both are set then sum up over the given
        band. If only one is set then use the model value in the
        selected bin. The units for `lo` and `hi` are given by the
        current analysis setting of the `data` object.

    Returns
    -------
    signal : number
        The source model summed up over the given band or for a
        single bin.

    See Also
    --------
    calc_data_sum : Sum up the observed counts over a pass band.
    calc_model_sum : Sum up the fitted model over a pass band.
    calc_energy_flux : Integrate the source model over a pass band.
    calc_photon_flux : Integrate the source model over a pass band.

    Notes
    -----
    The units of `lo` and `hi` are determined by the analysis setting
    for the data set (e.g. `data.get_analysis`). Any existing filter
    on the data set - e.g. as created by `ignore` or `notice` - is
    ignored by this function.

    The units of the answer depend on the model components used in
    the source expression and the axis or axes of the data set. It is
    unlikely to give sensible results for 2D data sets.

    Examples
    --------

    Sum up the model over the data range 0.5 to 2:

    >>> calc_source_sum(data, smodel, 0.5, 2)
    139.12819041922018

    """
    return _flux(data=data, lo=lo, hi=hi, src=src, srcflux=True)
# def calc_source_sum2d( data, src, reg=None): # return _counts2d(data, reg, data.eval_model_to_fit, src)
def calc_data_sum(data, lo=None, hi=None):
    """Sum up the data values over a pass band.

    Parameters
    ----------
    data
        The data object to use.
    lo, hi : number, optional
        If both are None or both are set then use the full dataset.
        If only one is set then use the data count for that bin.
        The units for `lo` and `hi` are given by the current analysis
        setting of the `data` parameter.

    Returns
    -------
    dsum : number
        The sum of the data values that lie within the given limits.
        If a background estimate has been subtracted from the data
        set then the calculation will use the background-subtracted
        values.

    See Also
    --------
    calc_data_sum2d : Sum up the data values of a 2D data set.
    calc_model_sum : Sum up the fitted model over a pass band.
    calc_energy_flux : Integrate the source model over a pass band.
    calc_photon_flux : Integrate the source model over a pass band.
    calc_source_sum : Sum up the source model over a pass band.
    set_model : Set the source model expression for a data set.

    Notes
    -----
    The units of `lo` and `hi` are determined by the analysis setting
    for the data set (e.g. `data.get_analysis`). Any existing filter
    on the data set - e.g. as created by `ignore` or `notice` - is
    ignored by this function.

    If a grouping scheme has been applied to the data set then it
    will be used. This can change the results, since the first and
    last bins of the selected range may extend outside the requested
    range.

    Examples
    --------

    Calculate the number of counts over the ranges 0.5 to 2 and 0.5
    to 7 keV, first using the observed signal and then, for the 0.5
    to 2 keV band - the background-subtracted estimate:

    >>> set_analysis('energy')
    >>> calc_data_sum(data, 0.5, 2)
    745.0
    >>> calc_data_sum(data, 0.5, 7)
    60.0
    >>> data.subtract()
    >>> calc_data_sum(data, 0.5, 2)
    730.9179738207356

    """
    dep = data.get_dep()
    return _counts(data, lo, hi, data.apply_filter, dep)
def calc_data_sum2d(data, reg=None):
    """Sum up the data values of a 2D data set.

    Parameters
    ----------
    data : sherpa.astro.data.DataIMG instance
        The data object to use.
    reg : str, optional
        The spatial filter to use. The default, ``None``, is to use
        the whole data set.

    Returns
    -------
    dsum : number
        The sum of the data values that lie within the given region.

    See Also
    --------
    calc_data_sum : Sum up the data values of a data set.
    calc_model_sum2d : Sum up the fitted model for a 2D data set.

    Notes
    -----
    The coordinate system of the region filter is determined by the
    coordinate setting for the data set (e.g. `data.coord`).

    Any existing filter on the data set - e.g. as created by
    `ignore2d` or `notice2d` - is ignored by this function.

    """
    dep = data.get_dep()
    return _counts2d(data, reg, data.apply_filter, dep)
# ## DOC-TODO: better comparison of calc_source_sum and calc_model_sum # ## needed (e.g. integration or results in PHA case?)
def calc_model_sum(data, model, lo=None, hi=None):
    """Sum up the fitted model over a pass band.

    Sum up S(E) over a pass band, where S(E) is the spectral model
    evaluated for each bin.

    Parameters
    ----------
    data
        The data object to use.
    model
        The source expression, which should include the instrumental
        responses.
    lo, hi : number, optional
        If both are None or both are set then sum up over the given
        band. If only one is set then use the model value in the
        selected bin. The units for `lo` and `hi` are given by the
        current analysis setting of the `data` object.

    Returns
    -------
    signal : number
        The source model summed up over the given band or for a
        single bin.

    See Also
    --------
    calc_data_sum : Sum up the observed counts over a pass band.
    calc_energy_flux : Integrate the source model over a pass band.
    calc_photon_flux : Integrate the source model over a pass band.
    calc_source_sum : Sum up the source model over a pass band.

    Notes
    -----
    The units of `lo` and `hi` are determined by the analysis setting
    for the data set (e.g. `data.get_analysis`). Any existing filter
    on the data set - e.g. as created by `ignore` or `notice` - is
    ignored by this function.

    The units of the answer depend on the model components used in
    the source expression and the axis or axes of the data set. It is
    unlikely to give sensible results for 2D data sets.

    """
    evaluate = data.eval_model_to_fit
    return _counts(data, lo, hi, evaluate, model)
# ## DOC-TODO: clean up whether the calc_model_* versions should or # ## should not contain the instrument response/PSF components. # ## Note: there is no calc_source_sum2d in this module, so # ## this needs looking at to see if the text is correct
def calc_model_sum2d(data, model, reg=None):
    """Sum up the fitted model for a 2D data set.

    Parameters
    ----------
    data : sherpa.astro.data.DataIMG instance
        The data object to use.
    model
        The source expression, which should not include the PSF
        model.
    reg : str, optional
        The spatial filter to use. The default, ``None``, is to use
        the whole data set.

    Returns
    -------
    dsum : number
        The sum of the model values, including any PSF convolution,
        that lie within the given region.

    See Also
    --------
    calc_data_sum2d : Sum up the data values of a 2D data set.
    calc_model_sum : Sum up the fitted model over a pass band.

    Notes
    -----
    The coordinate system of the region filter is determined by the
    coordinate setting for the data set (e.g. `data.coord`).

    Any existing filter on the data set - e.g. as created by
    `ignore2d` or `notice2d` - is ignored by this function.

    """
    evaluate = data.eval_model_to_fit
    return _counts2d(data, reg, evaluate, model)
def eqwidth(data, model, combo, lo=None, hi=None):
    """Calculate the equivalent width of a spectral feature.

    Sums, over the selected bins, the bin width multiplied by
    (combo - model) / model, where `model` is taken to be the
    continuum and `combo` the continuum plus feature.

    Parameters
    ----------
    data
        The data object to use.
    model
        The continuum model expression.
    combo
        The continuum plus feature model expression.
    lo, hi : number, optional
        The range over which to sum (see bounds_check); the default
        is the full grid.

    Returns
    -------
    eqw : number
    """
    lo, hi = bounds_check(lo, hi)

    if hasattr(data, 'get_response'):
        xlo, xhi = data._get_indep(filter=False)
        my = model(xlo, xhi)
        cy = combo(xlo, xhi)
    else:
        my = data.eval_model_to_fit(model)
        cy = data.eval_model_to_fit(combo)
        xlo = data.get_indep(filter=True)[0]

    # TODO: should this follow _flux and handle the case when
    # we have xlo, xhi differently?
    #
    mask = filter_bins((lo,), (hi,), (xlo,))
    if mask is not None:
        my = my[mask]
        cy = cy[mask]
        xlo = xlo[mask]

    num = len(xlo)
    eqw = 0.0
    for idx in range(num):
        # Bin width from the next low edge (the previous one for the
        # final bin).
        neighbour = idx + 1 if idx < num - 1 else idx - 1
        eave = np.abs(xlo[neighbour] - xlo[idx])
        if my[idx] != 0.0:
            eqw += eave * (cy[idx] - my[idx]) / my[idx]

    return eqw
def calc_kcorr(data, model, z, obslo, obshi, restlo=None, resthi=None):
    """Calculate the K correction for a model.

    The K correction ([1]_, [2]_, [3]_, [4]_) is the numeric
    factor applied to measured energy fluxes to convert values in
    an observed energy band to that they are in a rest-frame
    energy band (that is, correct for the change in spectral shape
    between the rest-frame and observed-frame bands). This is
    often used when converting a flux into a luminosity.

    Parameters
    ----------
    data
        The data object to use.
    model
        The source expression: this should not include any instrument
        responses.
    z : number or array, >= 0
        The redshift, or redshifts, of the source.
    obslo : number
        The minimum energy of the observed band.
    obshi : number
        The maximum energy of the observed band, which must
        be larger than `obslo`.
    restlo : number or ``None``
        The minimum energy of the rest-frame band. If ``None`` then
        use `obslo`.
    resthi : number or ``None``
        The maximum energy of the rest-frame band. It must be
        larger than `restlo`. If ``None`` then use `obshi`.

    Returns
    -------
    kz : number or array of numbers

    Notes
    -----
    This is only defined when the analysis is in 'energy' units.

    If the model contains a redshift parameter then it should
    be set to ``0``, rather than the source redshift.

    If the source model is at zero redshift, the observed energy
    band is olo to ohi, and the rest frame band is rlo to rhi
    (which need not match the observed band), then the K
    correction at a redshift z can be calculated as::

        frest = calc_energy_flux(data, model, rlo, rhi)
        fobs  = calc_energy_flux(data, model, olo*(1+z), ohi*(1+z))
        kz    = frest / fobs

    The energy ranges used - rlo to rhi and olo*(1+z) to ohi*(1+z)
    - should be fully covered by the data grid, otherwise the flux
    calculation will be truncated at the grid boundaries, leading
    to incorrect results.

    References
    ----------

    .. [1] "The K correction", Hogg, D.W., et al.
           http://arxiv.org/abs/astro-ph/0210394

    .. [2] Appendix B of Jones et al. 1998, ApJ, vol 495,
           p. 100-114.
           http://adsabs.harvard.edu/abs/1998ApJ...495..100J

    .. [3] "K and evolutionary corrections from UV to IR",
           Poggianti, B.M., A&AS, 1997, vol 122, p. 399-407.
           http://adsabs.harvard.edu/abs/1997A%26AS..122..399P

    .. [4] "Galactic evolution and cosmology - Probing the
           cosmological deceleration parameter", Yoshii, Y. &
           Takahara, F., ApJ, 1988, vol 326, p. 1-18.
           http://adsabs.harvard.edu/abs/1988ApJ...326....1Y

    """
    if restlo is None:
        restlo = obslo
    if resthi is None:
        resthi = obshi

    if np.isscalar(z):
        z = np.array([z], dtype=float)
    else:
        z = np.asarray(z)

    # Negative redshifts are not supported (previously checked via
    # "0 != sum(z[z < 0])", which is equivalent since the selected
    # values are all strictly negative).
    if np.any(z < 0):
        raise IOErr('z<=0')

    if obslo <= 0 or restlo <= 0 or obshi <= obslo or resthi <= restlo:
        raise IOErr('erange')

    # Find the energy grid: prefer the response (ARF, then RMF) for
    # PHA-like data, otherwise the independent axis of the data.
    if hasattr(data, 'get_response'):
        arf, rmf = data.get_response()
        elo = data.bin_lo
        ehi = data.bin_hi
        if arf is not None:
            elo = arf.energ_lo
            ehi = arf.energ_hi
        elif rmf is not None:
            elo = rmf.energ_lo
            ehi = rmf.energ_hi
    else:
        elo, ehi = data.get_indep()

    if elo is None or ehi is None:
        raise DataErr('noenergybins', data.name)

    emin = elo[0]
    emax = ehi[-1]

    if restlo < emin or resthi > emax:
        raise IOErr('energoverlap', emin, emax, 'rest-frame',
                    restlo, resthi, '')

    # Fix: these two checks test the *observed* band, so report the
    # observed-frame label and limits (previously they reported the
    # rest-frame values, and the second even the wrong label).
    if obslo * (1.0 + z.min()) < emin:
        raise IOErr('energoverlap', emin, emax, 'observed-frame',
                    obslo, obshi,
                    f"at a redshift of {z.min():f}")

    if obshi * (1.0 + z.max()) > emax:
        raise IOErr('energoverlap', emin, emax, 'observed-frame',
                    obslo, obshi,
                    f"at a redshift of {z.max():f}")

    zplus1 = z + 1.0
    flux_rest = _flux(data, restlo, resthi, model, eflux=True)
    obs = np.asarray([_flux(data, obslo * zz, obshi * zz, model,
                            eflux=True)
                      for zz in zplus1],
                     dtype=float)
    kcorr = flux_rest / obs

    # A scalar redshift input gives a scalar result.
    if len(kcorr) == 1:
        return kcorr[0]

    return kcorr