Source code for sherpa.astro.instrument

#
#  Copyright (C) 2010, 2015, 2023, 2024
#  Smithsonian Astrophysical Observator
#
#
#  This program is free software; you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation; either version 3 of the License, or
#  (at your option) any later version.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License along
#  with this program; if not, write to the Free Software Foundation, Inc.,
#  51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#

"""Models of common Astronomical models, particularly in X-rays.

The models in this module include support for instrument models that
describe how X-ray photons are converted to measurable properties,
such as Pulse-Height Amplitudes (PHA) or Pulse-Invariant channels.
These 'responses' are assumed to follow OGIP standards, such as `OGIP
Calibration Memo CAL/GEN/92-002, "The Calibration Requirements for
Spectral Analysis (Definition of RMF and ARF file formats)", Ian
M. George, Keith A. Arnaud, Bill Pence, Laddawan Ruamsuwan and
Michael F. Corcoran
<https://heasarc.gsfc.nasa.gov/docs/heasarc/caldb/docs/memos/cal_gen_92_002/cal_gen_92_002.html>`_.

"""

from __future__ import annotations

from dataclasses import dataclass
from typing import TYPE_CHECKING, Optional, Union
import os

import numpy

import sherpa
from sherpa.utils.err import InstrumentErr, DataErr, PSFErr
from sherpa.models.model import ArithmeticModel, CompositeModel, Model

from sherpa.instrument import PSFModel as _PSFModel
from sherpa.utils import NoNewAttributesAfterInit
from sherpa.data import Data1D
from sherpa.astro import hc
from sherpa.astro.data import DataARF, DataRMF, _notice_resp, DataIMG
from sherpa.astro import io
from sherpa.utils import sao_fcmp, sum_intervals, sao_arange
from sherpa.astro.utils import compile_energy_grid
from sherpa.models.regrid import EvaluationSpace1D

WCS: Optional[type["sherpa.astro.io.wcs.WCS"]] = None
try:
    from sherpa.astro.io.wcs import WCS
except ImportError:
    WCS = None

_tol = numpy.finfo(numpy.float32).eps

string_types = (str, )


__all__ = ('RMFModel', 'ARFModel', 'RSPModel', 'RMFModelPHA',
           'RMFModelNoPHA', 'ARFModelPHA', 'ARFModelNoPHA',
           'RSPModelPHA', 'RSPModelNoPHA', 'MultiResponseSumModel',
           'PileupRMFModel', 'RMF1D', 'ARF1D', 'Response1D',
           'MultipleResponse1D', 'PileupResponse1D', 'PSFModel',
           'RMFMatrix', 'create_arf', 'create_delta_rmf',
           'create_non_delta_rmf', 'rmf_to_matrix', 'rmf_to_image' )


def apply_areascal(mdl, pha, instlabel):
    """Apply the AREASCAL conversion.

    This should be done after applying any RMF or ARF.

    Parameters
    ----------
    mdl : array
        The model values, after being passed through the response.
        The assumption is that the output is in channel space. No
        filtering is assumed to have been applied.
    pha : sherpa.astro.data.DataPHA object
        The PHA object containing the AREASCAL column, scalar, or
        None value.
    instlabel : str
        The name of the response (expected to be of the form
        'RMF: filename'). This is only used in case the size of out
        does not match the AREASCAL vector.

    Returns
    -------
    ans : array
        If AREASCAL is defined then the output is mdl * AREASCAL,
        otherwise it is just the input array (i.e. mdl).
    """

    ascal = pha.areascal
    if ascal is None:
        return mdl

    if numpy.iterable(ascal) and len(ascal) != len(mdl):
        raise DataErr('mismatch', instlabel,
                      f'AREASCAL: {pha.name}')

    return mdl * ascal


[docs] class RMFModel(CompositeModel, ArithmeticModel): """Base class for expressing RMF convolution in model expressions. """ def __init__(self, rmf, model): self.rmf = rmf self.model = model # Logic for ArithmeticModel.__init__ self._pars = () # FIXME: group pairs of coordinates with one attribute self.elo = None self.ehi = None # Energy space self.lo = None self.hi = None # Wavelength space self.xlo = None self.xhi = None # Current Spectral coordinates # Used to rebin against finer or coarser energy grids self.rmfargs = () CompositeModel.__init__(self, f'apply_rmf({model.name})', (model,)) self.filter()
[docs] def filter(self): # Energy grid (keV) self.elo, self.ehi = self.rmf.get_indep() # Wavelength grid (angstroms) self.lo, self.hi = hc / self.ehi, hc / self.elo # Assume energy as default spectral coordinates self.xlo, self.xhi = self.elo, self.ehi # Used to rebin against finer or coarser energy grids self.rmfargs = ()
[docs] def startup(self, cache=False): self.model.startup(cache) CompositeModel.startup(self, cache)
[docs] def teardown(self): self.model.teardown() CompositeModel.teardown(self)
[docs] def calc(self, p, x, xhi=None, *args, **kwargs): raise NotImplementedError
[docs] class ARFModel(CompositeModel, ArithmeticModel): """Base class for expressing ARF convolution in model expressions. """ def __init__(self, arf, model): self.arf = arf self.model = model self.elo = None self.ehi = None # Energy space self.lo = None self.hi = None # Wavelength space self.xlo = None self.xhi = None # Current Spectral coordinates # Used to rebin against finer or coarser energy grids self.arfargs = () # Logic for ArithmeticModel.__init__ self._pars = () CompositeModel.__init__(self, f'apply_arf({model.name})', (model,)) self.filter()
[docs] def filter(self): # Energy grid (keV) self.elo, self.ehi = self.arf.get_indep() # Wavelength grid (angstroms) self.lo, self.hi = hc / self.ehi, hc / self.elo # Assume energy as default spectral coordinates self.xlo, self.xhi = self.elo, self.ehi # Used to rebin against finer or coarser energy grids self.arfargs = ()
[docs] def startup(self, cache=False): self.model.startup(cache) CompositeModel.startup(self, cache)
[docs] def teardown(self): self.model.teardown() CompositeModel.teardown(self)
[docs] def calc(self, p, x, xhi=None, *args, **kwargs): raise NotImplementedError
[docs] class RSPModel(CompositeModel, ArithmeticModel): """Base class for expressing RMF + ARF convolution in model expressions """ def __init__(self, arf, rmf, model): self.arf = arf self.rmf = rmf self.model = model self.elo = None self.ehi = None # Energy space self.lo = None self.hi = None # Wavelength space self.xlo = None self.xhi = None # Current Spectral coordinates # Used to rebin against finer or coarser energy grids self.rmfargs = () self.arfargs = () # Logic for ArithmeticModel.__init__ self._pars = () CompositeModel.__init__(self, f'apply_rmf(apply_arf({model.name}))', (model,)) self.filter()
[docs] def filter(self): # Energy grid (keV), ARF grid breaks tie self.elo, self.ehi = self.arf.get_indep() # Wavelength grid (angstroms) self.lo, self.hi = hc / self.ehi, hc / self.elo # Assume energy as default spectral coordinates self.xlo, self.xhi = self.elo, self.ehi # Used to rebin against finer or coarser energy grids self.rmfargs = () self.arfargs = ()
[docs] def startup(self, cache=False): self.model.startup(cache) CompositeModel.startup(self, cache)
[docs] def teardown(self): self.model.teardown() CompositeModel.teardown(self)
[docs] def calc(self, p, x, xhi=None, *args, **kwargs): raise NotImplementedError
[docs] class RMFModelPHA(RMFModel): """RMF convolution model with associated PHA data set. Notes ----- Scaling by the AREASCAL setting (scalar or array) is included in this model. """ def __init__(self, rmf, pha, model): self.pha = pha self._rmf = rmf # store a reference to original RMFModel.__init__(self, rmf, model)
[docs] def filter(self): RMFModel.filter(self) pha = self.pha # If PHA is a finer grid than RMF, evaluate model on PHA and # rebin down to the granularity that the RMF expects. if pha.bin_lo is not None and pha.bin_hi is not None: bin_lo, bin_hi = pha.bin_lo, pha.bin_hi # If PHA grid is in angstroms then convert to keV for # consistency if (bin_lo[0] > bin_lo[-1]) and (bin_hi[0] > bin_hi[-1]): bin_lo = hc / pha.bin_hi bin_hi = hc / pha.bin_lo # FIXME: What about filtered option?? bin_lo, bin_hi are # unfiltered?? # Compare disparate grids in energy space self.rmfargs = ((self.elo, self.ehi), (bin_lo, bin_hi)) # FIXME: Compute on finer energy grid? Assumes that PHA has # finer grid than RMF self.elo, self.ehi = bin_lo, bin_hi # Wavelength grid (angstroms) self.lo, self.hi = hc / self.ehi, hc / self.elo # Assume energy as default spectral coordinates self.xlo, self.xhi = self.elo, self.ehi if self.pha.units == 'wavelength': self.xlo, self.xhi = self.lo, self.hi
[docs] def startup(self, cache=False): rmf = self._rmf # original # Create a view of original RMF self.rmf = DataRMF(rmf.name, rmf.detchans, rmf.energ_lo, rmf.energ_hi, rmf.n_grp, rmf.f_chan, rmf.n_chan, rmf.matrix, rmf.offset, rmf.e_min, rmf.e_max, rmf.header) # Filter the view for current fitting session _notice_resp(self.pha.get_noticed_channels(), None, self.rmf) self.filter() # Assume energy as default spectral coordinates self.xlo, self.xhi = self.elo, self.ehi if self.pha.units == 'wavelength': self.xlo, self.xhi = self.lo, self.hi RMFModel.startup(self, cache)
[docs] def teardown(self): self.rmf = self._rmf self.filter() RMFModel.teardown(self)
[docs] def calc(self, p, x, xhi=None, *args, **kwargs): # x is noticed/full channels here src = self.model.calc(p, self.xlo, self.xhi) out = self.rmf.apply_rmf(src, *self.rmfargs) return apply_areascal(out, self.pha, f"RMF: {self.rmf.name}")
[docs] class RMFModelNoPHA(RMFModel): """RMF convolution model without an associated PHA data set. Notes ----- Since there is no PHA data set, there is no correction for any AREASCAL setting associated with the data. """ def __init__(self, rmf, model): RMFModel.__init__(self, rmf, model)
[docs] def calc(self, p, x, xhi=None, *args, **kwargs): # x is noticed/full channels here # Always evaluates source model in keV! src = self.model.calc(p, self.xlo, self.xhi) return self.rmf.apply_rmf(src)
[docs] class ARFModelPHA(ARFModel): """ARF convolution model with associated PHA data set. Notes ----- Scaling by the AREASCAL setting (scalar or array) is included in this model. It is not yet clear if this is handled correctly. """ def __init__(self, arf, pha, model): self.pha = pha self._arf = arf # store a reference to original ARFModel.__init__(self, arf, model)
[docs] def filter(self): ARFModel.filter(self) pha = self.pha # If PHA is a finer grid than ARF, evaluate model on PHA and # rebin down to the granularity that the ARF expects. if pha.bin_lo is not None and pha.bin_hi is not None: bin_lo, bin_hi = pha.bin_lo, pha.bin_hi # If PHA grid is in angstroms then convert to keV for # consistency if (bin_lo[0] > bin_lo[-1]) and (bin_hi[0] > bin_hi[-1]): bin_lo = hc / pha.bin_hi bin_hi = hc / pha.bin_lo # FIXME: What about filtered option?? bin_lo, bin_hi are # unfiltered?? # Compare disparate grids in energy space self.arfargs = ((self.elo, self.ehi), (bin_lo, bin_hi)) # FIXME: Assumes ARF grid is finest # Assume energy as default spectral coordinates self.xlo, self.xhi = self.elo, self.ehi if self.pha.units == 'wavelength': self.xlo, self.xhi = self.lo, self.hi
[docs] def startup(self, cache=False): arf = self._arf # original pha = self.pha # Create a view of original ARF self.arf = DataARF(arf.name, arf.energ_lo, arf.energ_hi, arf.specresp, arf.bin_lo, arf.bin_hi, arf.exposure, arf.header) # Filter the view for current fitting session if numpy.iterable(pha.mask): mask = pha.get_mask() if len(mask) == len(self.arf.specresp): self.arf.notice(mask) self.filter() # Assume energy as default spectral coordinates self.xlo, self.xhi = self.elo, self.ehi if pha.units == 'wavelength': self.xlo, self.xhi = self.lo, self.hi ARFModel.startup(self, cache)
[docs] def teardown(self): self.arf = self._arf # restore original self.filter() ARFModel.teardown(self)
[docs] def calc(self, p, x, xhi=None, *args, **kwargs): # x could be channels or x, xhi could be energy|wave src = self.model.calc(p, self.xlo, self.xhi) src = self.arf.apply_arf(src, *self.arfargs) return apply_areascal(src, self.pha, f"ARF: {self.arf.name}")
[docs] class ARFModelNoPHA(ARFModel): """ARF convolution model without associated PHA data set. Notes ----- Since there is no PHA data set, there is no correction for any AREASCAL setting associated with the data. """ def __init__(self, arf, model): ARFModel.__init__(self, arf, model)
[docs] def calc(self, p, x, xhi=None, *args, **kwargs): # x could be channels or x, xhi could be energy|wave # if (xhi is not None and # x[0] > x[-1] and xhi[0] > xhi[-1]): # xlo, xhi = self.lo, self.hi # else: # Always evaluates source model in keV! src = self.model.calc(p, self.xlo, self.xhi) return self.arf.apply_arf(src)
[docs] class RSPModelPHA(RSPModel): """RMF + ARF convolution model with associated PHA. Notes ----- Scaling by the AREASCAL setting (scalar or array) is included in this model. """ def __init__(self, arf, rmf, pha, model): self.pha = pha self._arf = arf self._rmf = rmf RSPModel.__init__(self, arf, rmf, model)
[docs] def filter(self): RSPModel.filter(self) pha = self.pha # If PHA is a finer grid than RMF, evaluate model on PHA and # rebin down to the granularity that the RMF expects. if pha.bin_lo is not None and pha.bin_hi is not None: bin_lo, bin_hi = pha.bin_lo, pha.bin_hi # If PHA grid is in angstroms then convert to keV for # consistency if (bin_lo[0] > bin_lo[-1]) and (bin_hi[0] > bin_hi[-1]): bin_lo = hc / pha.bin_hi bin_hi = hc / pha.bin_lo # FIXME: What about filtered option?? bin_lo, bin_hi are # unfiltered?? # Compare disparate grids in energy space self.arfargs = ((self.elo, self.ehi), (bin_lo, bin_hi)) # FIXME: Assumes ARF grid is finest elo, ehi = self.rmf.get_indep() # self.elo, self.ehi are from ARF if len(elo) != len(self.elo) and len(ehi) != len(self.ehi): self.rmfargs = ((elo, ehi), (self.elo, self.ehi)) # Assume energy as default spectral coordinates self.xlo, self.xhi = self.elo, self.ehi if self.pha.units == 'wavelength': self.xlo, self.xhi = self.lo, self.hi
[docs] def startup(self, cache=False): arf = self._arf rmf = self._rmf # Create a view of original RMF self.rmf = DataRMF(rmf.name, rmf.detchans, rmf.energ_lo, rmf.energ_hi, rmf.n_grp, rmf.f_chan, rmf.n_chan, rmf.matrix, rmf.offset, rmf.e_min, rmf.e_max, rmf.header) # Create a view of original ARF self.arf = DataARF(arf.name, arf.energ_lo, arf.energ_hi, arf.specresp, arf.bin_lo, arf.bin_hi, arf.exposure, arf.header) # Filter the view for current fitting session _notice_resp(self.pha.get_noticed_channels(), self.arf, self.rmf) self.filter() # Assume energy as default spectral coordinates self.xlo, self.xhi = self.elo, self.ehi if self.pha.units == 'wavelength': self.xlo, self.xhi = self.lo, self.hi RSPModel.startup(self, cache)
[docs] def teardown(self): self.arf = self._arf # restore originals self.rmf = self._rmf self.filter() RSPModel.teardown(self)
[docs] def calc(self, p, x, xhi=None, *args, **kwargs): # x could be channels or x, xhi could be energy|wave src = self.model.calc(p, self.xlo, self.xhi) src = self.arf.apply_arf(src, *self.arfargs) src = self.rmf.apply_rmf(src, *self.rmfargs) # Assume any issues with the binning (between AREASCAL # and src) is related to the RMF rather than the ARF. return apply_areascal(src, self.pha, f"RMF: {self.rmf.name}")
[docs] class RSPModelNoPHA(RSPModel): """RMF + ARF convolution model without associated PHA data set. Notes ----- Since there is no PHA data set, there is no correction for any AREASCAL setting associated with the data. """ def __init__(self, arf, rmf, model): RSPModel.__init__(self, arf, rmf, model)
[docs] def calc(self, p, x, xhi=None, *args, **kwargs): # x could be channels or x, xhi could be energy|wave # Always evaluates source model in keV! src = self.model.calc(p, self.xlo, self.xhi) src = self.arf.apply_arf(src, *self.arfargs) return self.rmf.apply_rmf(src, *self.rmfargs)
[docs] class ARF1D(NoNewAttributesAfterInit): def __init__(self, arf, pha=None, rmf=None): self._arf = arf self._pha = pha NoNewAttributesAfterInit.__init__(self) def __getattr__(self, name): arf = None try: arf = ARF1D.__getattribute__(self, '_arf') except: pass if name in ('_arf', '_pha'): return self.__dict__[name] if arf is not None: return DataARF.__getattribute__(arf, name) return ARF1D.__getattribute__(self, name) def __setattr__(self, name, val): arf = None try: arf = ARF1D.__getattribute__(self, '_arf') except: pass if arf is not None and hasattr(arf, name): DataARF.__setattr__(arf, name, val) else: NoNewAttributesAfterInit.__setattr__(self, name, val) def __dir__(self): return dir(self._arf) def __str__(self): return str(self._arf) def __repr__(self): return repr(self._arf) def __call__(self, model, session=None): arf = self._arf pha = self._pha if isinstance(model, string_types): if session is None: model = sherpa.astro.ui._session._eval_model_expression(model) else: model = session._eval_model_expression(model) # Automatically add exposure time to source model if pha is not None and pha.exposure is not None: model = pha.exposure * model elif arf.exposure is not None: model = arf.exposure * model # FIXME: display a warning if exposure is None? if pha is not None: return ARFModelPHA(arf, pha, model) return ARFModelNoPHA(arf, model)
[docs] class RMF1D(NoNewAttributesAfterInit): def __init__(self, rmf, pha=None, arf=None): self._rmf = rmf self._arf = arf self._pha = pha NoNewAttributesAfterInit.__init__(self) def __getattr__(self, name): rmf = None try: rmf = RMF1D.__getattribute__(self, '_rmf') except: pass if name in ('_rmf', '_pha'): return self.__dict__[name] if rmf is not None: return DataRMF.__getattribute__(rmf, name) return RMF1D.__getattribute__(self, name) def __setattr__(self, name, val): rmf = None try: rmf = RMF1D.__getattribute__(self, '_rmf') except: pass if rmf is not None and hasattr(rmf, name): DataRMF.__setattr__(rmf, name, val) else: NoNewAttributesAfterInit.__setattr__(self, name, val) def __dir__(self): return dir(self._rmf) def __str__(self): return str(self._rmf) def __repr__(self): return repr(self._rmf) def __call__(self, model, session=None): arf = self._arf rmf = self._rmf pha = self._pha if isinstance(model, string_types): if session is None: model = sherpa.astro.ui._session._eval_model_expression(model) else: model = session._eval_model_expression(model) # Automatically add exposure time to source model for RMF-only analysis if type(model) not in (ARFModel, ARFModelPHA, ARFModelNoPHA): if pha is not None and pha.exposure is not None: model = pha.exposure * model elif arf is not None and arf.exposure is not None: model = arf.exposure * model elif pha is not None and arf is not None: # If model is an ARF? # Replace RMF(ARF(SRC)) with RSP(SRC) for efficiency return RSPModelPHA(arf, rmf, pha, model.model) if pha is not None: return RMFModelPHA(rmf, pha, model) return RMFModelNoPHA(rmf, model)
[docs] class Response1D(NoNewAttributesAfterInit): """A factory class for generating the instrument response. This should not be used when a pileup model is required. Parameters ---------- pha : sherpa.astro.data.DataPHA instance The data object which defines the channel grid and instrument response. There must be either an ARF or RMF associated with the dataset. Raises ------ sherpa.utils.err.DataErr The argument does not contain any response information (it is missing an ARF and a RMF). See Also -------- MultipleResponse1D, PileupResponse1D Notes ----- When the object is called it can be sent a ``session`` parameter, which defines the session to use when converting a string model to a ArithmeticModel instance. It is not used for any other function. The default value for this parameter is `None`, in which case the code uses the sherpa.astro.ui._session object. The response will include the exposure time if is is defined in either the PHA or ARF datasets (PHA taking precedence). The final response will be one of RSPModelPHA, ARFModelPHA, or RMFModelPHA. Examples -------- Add the response to a model (``src_model``) and then evaluate it. The response will ignore the input argument, and evaluate it for all channels (hence the use of a dummy argument [1]): >>> rsp = Response1D(pha) >>> full_model = rsp(src_model) >>> ycnts = full_model([1]) """ def __init__(self, pha): self.pha = pha arf, rmf = pha.get_response() if arf is None and rmf is None: raise DataErr('norsp', pha.name) NoNewAttributesAfterInit.__init__(self) def __call__(self, model, session=None): pha = self.pha arf, rmf = pha.get_response() if isinstance(model, string_types): if session is None: model = sherpa.astro.ui._session._eval_model_expression(model) else: model = session._eval_model_expression(model) # Automatically add exposure time to source model if pha.exposure is not None: model = pha.exposure * model elif arf is not None and arf.exposure is not None: model = arf.exposure * model if arf is not None and rmf is not None: return RSPModelPHA(arf, rmf, pha, model) if arf is not None: return ARFModelPHA(arf, pha, model) if rmf is not None: return RMFModelPHA(rmf, pha, model) raise DataErr('norsp', pha.name)
class ResponseNestedModel(Model): def __init__(self, arf=None, rmf=None): self.arf = arf self.rmf = rmf name = '' if arf is not None and rmf is not None: name = 'apply_rmf(apply_arf(' elif arf is not None: name = 'apply_arf(' elif rmf is not None: name = 'apply_rmf(' Model.__init__(self, name) def calc(self, p, *args, **kwargs): arf = self.arf rmf = self.rmf if arf is not None and rmf is not None: return rmf.apply_rmf(arf.apply_arf(*args, **kwargs)) elif self.arf is not None: return arf.apply_arf(*args, **kwargs) return rmf.apply_rmf(*args, **kwargs)
[docs] class MultiResponseSumModel(CompositeModel, ArithmeticModel): def __init__(self, source, pha): self.channel = pha.channel self.mask = numpy.ones(len(pha.channel), dtype=bool) self.pha = pha self.source = source self.elo = None self.ehi = None self.lo = None self.hi = None self.table = None self.orders = None models = [] grid = [] for id in pha.response_ids: arf, rmf = pha.get_response(id) if arf is None and rmf is None: raise DataErr('norsp', pha.name) m = ResponseNestedModel(arf, rmf) indep = None if arf is not None: indep = arf.get_indep() if rmf is not None: indep = rmf.get_indep() models.append(m) grid.append(indep) self.models = models self.grid = grid expr = ','.join([f'{m.name}({source.name})' for m in models]) name = f'{type(self).__name__}({expr})' CompositeModel.__init__(self, name, (source,)) def _get_noticed_energy_list(self): grid = [] for id in self.pha.response_ids: arf, rmf = self.pha.get_response(id) indep = None if arf is not None: indep = arf.get_indep() elif rmf is not None: indep = rmf.get_indep() grid.append(indep) self.elo, self.ehi, self.table = compile_energy_grid(grid) self.lo, self.hi = hc / self.ehi, hc / self.elo
[docs] def startup(self, cache=False): pha = self.pha if numpy.iterable(pha.mask): pha.notice_response(True) self.channel = pha.get_noticed_channels() self.mask = pha.get_mask() self._get_noticed_energy_list() CompositeModel.startup(self, cache)
[docs] def teardown(self): pha = self.pha if numpy.iterable(pha.mask): pha.notice_response(False) self.channel = pha.channel self.mask = numpy.ones(len(pha.channel), dtype=bool) self.elo = None self.ehi = None self.table = None self.lo = None self.hi = None CompositeModel.teardown(self)
def _check_for_user_grid(self, x, xhi=None): return (len(self.channel) != len(x) or not (sao_fcmp(self.channel, x, _tol) == 0).all()) def _startup_user_grid(self, x, xhi=None): # fit() never comes in here b/c it calls startup() pha = self.pha self.mask = numpy.zeros(len(pha.channel), dtype=bool) self.mask[numpy.searchsorted(pha.channel, x)] = True pha.notice_response(True, x) self._get_noticed_energy_list() def _teardown_user_grid(self): # fit() never comes in here b/c it calls startup() pha = self.pha self.mask = numpy.ones(len(pha.channel), dtype=bool) pha.notice_response(False) self.elo = None self.ehi = None self.table = None self.lo = None self.hi = None
[docs] def calc(self, p, x, xhi=None, *args, **kwargs): pha = self.pha # TODO: this should probably include AREASCAL user_grid = False try: if self._check_for_user_grid(x, xhi): user_grid = True self._startup_user_grid(x, xhi) # Slow if self.table is None: # again, fit() never comes in here b/c it calls startup() src = self.source vals = [] for model, args in zip(self.models, self.grid): elo, ehi = lo, hi = args if pha.units == 'wavelength': lo = hc / ehi hi = hc / elo vals.append(model(src(lo, hi))) self.orders = vals # Fast else: xlo, xhi = self.elo, self.ehi if pha.units == 'wavelength': xlo, xhi = self.lo, self.hi src = self.source(xlo, xhi) # hi-res grid of all ARF grids # Fold summed intervals through the associated response. self.orders = \ [model(sum_intervals(src, interval[0], interval[1])) for model, interval in zip(self.models, self.table)] vals = sum(self.orders) if self.mask is not None: vals = vals[self.mask] finally: if user_grid: self._teardown_user_grid() return vals
[docs] class MultipleResponse1D(Response1D): """A factory class for generating the instrument response. This should not be used when a pileup model is required. Parameters ---------- pha : sherpa.astro.data.DataPHA instance Support PHA files with multiple responses (e.g. orders) to describe the data. Raises ------ sherpa.utils.err.DataErr The argument does not contain any response information (it is missing an ARF and a RMF). See Also -------- Response1D """ def __call__(self, model, session=None): pha = self.pha if isinstance(model, string_types): if session is None: model = sherpa.astro.ui._session._eval_model_expression(model) else: model = session._eval_model_expression(model) pha.notice_response(False) model = MultiResponseSumModel(model, pha) # TODO: should this include AREASCAL? if pha.exposure: model = pha.exposure * model return model
[docs] class PileupRMFModel(CompositeModel, ArithmeticModel): def __init__(self, rmf, model, pha=None): self.pha = pha self.channel = sao_arange(1, rmf.detchans) # sao_arange is inclusive self.mask = numpy.ones(rmf.detchans, dtype=bool) self.rmf = rmf self.elo, self.ehi = rmf.get_indep() self.lo, self.hi = hc / self.ehi, hc / self.elo self.model = model self.otherargs = None self.otherkwargs = None self._pars = () CompositeModel.__init__(self, f'apply_rmf({self.model.name})', (self.model,))
[docs] def startup(self, cache=False): pha = self.pha pha.notice_response(False) self.channel = pha.get_noticed_channels() self.mask = pha.get_mask() self.model.startup(cache) CompositeModel.startup(self, cache)
[docs] def teardown(self): # Note: # # The pha variable was declared but not used, so has been commented # out. It has been kept as a comment for future review, since it # is unclear whether anything should be done to the PHA object # during teardown # # pha = self.pha rmf = self.rmf self.channel = sao_arange(1, rmf.detchans) self.mask = numpy.ones(rmf.detchans, dtype=bool) self.model.teardown() CompositeModel.teardown(self)
def _check_for_user_grid(self, x): return (len(self.channel) != len(x) or not (sao_fcmp(self.channel, x, _tol) == 0).all()) def _startup_user_grid(self, x): # fit() never comes in here b/c it calls startup() self.mask = numpy.zeros(self.rmf.detchans, dtype=bool) self.mask[numpy.searchsorted(self.pha.channel, x)] = True def _calc(self, p, xlo, xhi): # Evaluate source model on RMF energy/wave grid OR # model.calc --> pileup_model src = self.model.calc(p, xlo, xhi) # rmf_fold return self.rmf.apply_rmf(src)
[docs] def calc(self, p, x, xhi=None, **kwargs): pha = self.pha # x is noticed/full channels here user_grid = False try: if self._check_for_user_grid(x): user_grid = True self._startup_user_grid(x) xlo, xhi = self.elo, self.ehi if pha is not None and pha.units == 'wavelength': xlo, xhi = self.lo, self.hi vals = self._calc(p, xlo, xhi) if self.mask is not None: vals = vals[self.mask] finally: if user_grid: self.mask = numpy.ones(self.rmf.detchans, dtype=bool) return vals
[docs] class PileupResponse1D(NoNewAttributesAfterInit): """A factory class for generating a response including pileup. Parameters ---------- pha : sherpa.astro.data.DataPHA instance The data object which defines the channel grid and instrument response. There must be both an ARF or RMF associated with the dataset when it is called. pileup_model : sherpa.astro.models.JDPileup instance The pileup model. Raises ------ sherpa.utils.err.DataErr The argument does not contain any response information (it is missing an ARF or RMF). See Also -------- Response1D """ def __init__(self, pha, pileup_model): self.pha = pha self.pileup_model = pileup_model NoNewAttributesAfterInit.__init__(self) def __call__(self, model, session=None): pha = self.pha # clear out any previous response filter pha.notice_response(False) if isinstance(model, string_types): if session is None: model = sherpa.astro.ui._session._eval_model_expression(model) else: model = session._eval_model_expression(model) arf, rmf = pha.get_response() err_msg = None if arf is None and rmf is None: raise DataErr('norsp', pha.name) if arf is None: err_msg = 'does not have an associated ARF' elif pha.exposure is None: err_msg = 'does not specify an exposure time' if err_msg: raise InstrumentErr('baddata', pha.name, err_msg) # Currently, the response is NOT noticed using pileup # ARF convolution done inside ISIS pileup module # on finite grid scale model = model.apply(self.pileup_model, pha.exposure, arf.energ_lo, arf.energ_hi, arf.specresp, model) if rmf is not None: model = PileupRMFModel(rmf, model, pha) return model
[docs] class PSFModel(_PSFModel):
[docs] def fold(self, data): super().fold(data) # Set WCS coordinates of kernel data set to match source data set. if hasattr(self.kernel, "set_coord"): self.kernel.set_coord(data.coord)
[docs] def get_kernel(self, data, subkernel=True): indep, dep, kshape, lo, hi = self._get_kernel_data(data, subkernel) # ndim should be the same as self.ndim ndim = len(kshape) if ndim == 1: return Data1D('kernel', indep[0], dep) # Use kernel data set WCS if available eqpos = getattr(self.kernel, 'eqpos', None) sky = getattr(self.kernel, 'sky', None) # If kernel is a model, use WCS from data if available if callable(self.kernel): eqpos = getattr(data, 'eqpos', None) sky = getattr(data, 'sky', None) if ndim == 2: # Edit WCS to reflect the subkernel extraction in # physical coordinates. if (subkernel and sky is not None and lo is not None and hi is not None): if (WCS is not None): sky = WCS(sky.name, sky.type, sky.crval, sky.crpix - lo, sky.cdelt, sky.crota, sky.epoch, sky.equinox) # FIXME: Support for WCS only (non-Chandra) coordinate # transformations? return DataIMG('kernel', indep[0], indep[1], dep, kshape[::-1], sky=sky, eqpos=eqpos) # It's hard to trigger this case so we have no test coverage. raise PSFErr('ndim')
[docs] def create_arf(elo, ehi, specresp=None, exposure=None, ethresh=None, name='user-arf', header=None): """Create an ARF. .. versionadded:: 4.10.1 Parameters ---------- elo, ehi : numpy.ndarray The energy bins (low and high, in keV) for the ARF. It is assumed that ehi_i > elo_i, elo_j > 0, the energy bins are either ascending - so elo_i+1 > elo_i - or descending (elo_i+1 < elo_i), and that there are no overlaps. specresp : None or array, optional The spectral response (in cm^2) for the ARF. It is assumed to be >= 0. If not given a flat response of 1.0 is used. exposure : number or None, optional If not None, the exposure of the ARF in seconds. ethresh : number or None, optional Passed through to the DataARF call. It controls whether zero-energy bins are replaced. name : str, optional The name of the ARF data set header : dict Header for the created ARF Returns ------- arf : sherpa.astro.data.DataARF instance See Also -------- create_delta_rmf, create_non_delta_rmf """ if specresp is None: specresp = numpy.ones(elo.size, dtype=numpy.float32) return DataARF(name, energ_lo=elo, energ_hi=ehi, specresp=specresp, exposure=exposure, ethresh=ethresh, header=header)
# Notes for create*rmf: # - I do not think I have the startchan=0 case correct. Does the # f_chan array have to change? #
[docs] def create_delta_rmf(rmflo, rmfhi, offset=1, e_min=None, e_max=None, ethresh=None, name='delta-rmf', header=None): """Create an ideal RMF. The RMF has a unique mapping from channel to energy, in that each channel maps exactly to one energy bin, the mapping is monotonic, and there are no gaps. .. versionchanged:: 4.16.0 The e_min and e_max values will use the rmflo and rmfhi values if not set. .. versionadded:: 4.10.1 Parameters ---------- rmflo, rmfhi : array The energy bins (low and high, in keV) for the RMF. It is assumed that emfhi_i > rmflo_i, rmflo_j > 0, that the energy bins are either ascending, so rmflo_i+1 > rmflo_i or descending (rmflo_i+1 < rmflo_i), and that there are no overlaps. These correspond to the Elow and Ehigh columns (represented by the ENERG_LO and ENERG_HI columns of the MATRIX block) of the OGIP standard. offset : int, optional The starting channel number: expected to be 0 or 1 but this is not enforced. e_min, e_max : None or array, optional The E_MIN and E_MAX columns of the EBOUNDS block of the RMF. This must have the same size as rmflo and rmfhi as the RMF matrix is square in this "ideal" case. If not set they are taken from rmflo and rmfhi respectively. ethresh : number or None, optional Passed through to the DataRMF call. It controls whether zero-energy bins are replaced. name : str, optional The name of the RMF data set header : dict Header for the created RMF Returns ------- rmf : DataRMF instance See Also -------- create_arf, create_non_delta_rmf """ # Set up the delta-function response. # TODO: should f_chan start at startchan? # nchans = rmflo.size matrix = numpy.ones(nchans, dtype=numpy.float32) dummy = numpy.ones(nchans, dtype=numpy.int16) f_chan = numpy.arange(1, nchans + 1, dtype=numpy.int16) if e_min is None: e_min = rmflo if e_max is None: e_max = rmfhi return DataRMF(name, detchans=nchans, energ_lo=rmflo, energ_hi=rmfhi, n_grp=dummy, n_chan=dummy, f_chan=f_chan, matrix=matrix, offset=offset, e_min=e_min, e_max=e_max, ethresh=ethresh, header=header)
[docs] def create_non_delta_rmf(rmflo, rmfhi, fname, offset=1, e_min=None, e_max=None, ethresh=None, name='delta-rmf', header=None): """Create a RMF using a matrix from a file. The RMF matrix (the mapping from channel to energy bin) is read in from a file. .. versionchanged:: 4.16.0 The number of channels is now taken from e_min (if set) so the matrix is no-longer required to be square. .. versionadded:: 4.10.1 Parameters ---------- rmflo, rmfhi : array The energy bins (low and high, in keV) for the RMF. It is assumed that emfhi_i > rmflo_i, rmflo_j > 0, that the energy bins are either ascending, so rmflo_i+1 > rmflo_i or descending (rmflo_i+1 < rmflo_i), and that there are no overlaps. These correspond to the Elow and Ehigh columns (represented by the ENERG_LO and ENERG_HI columns of the MATRIX block) of the OGIP standard. fname : str The name of the two-dimensional image file which stores the response information (the format of this file matches that created by the `CIAO tool rmfimg <https://cxc.harvard.edu/ciao/ahelp/rmfimg.html>`_). offset : int, optional The starting channel number: expected to be 0 or 1 but this is not enforced. e_min, e_max : None or array, optional The E_MIN and E_MAX columns of the EBOUNDS block of the RMF. If not given the matrix is assumed to be square (using the rmflo and rmfhi values), otherwise these arrays provide the approximate mapping from channel to energy range of the RMF. ethresh : number or None, optional Passed through to the DataRMF call. It controls whether zero-energy bins are replaced. name : str The name of the RMF data set header : dict Header for the created RMF Returns ------- rmf : DataRMF instance See Also -------- create_arf, create_delta_rmf """ if fname is not None and not os.path.isfile(fname): raise ValueError(f"{fname} is not a file") # Set up the delta-function response. # TODO: should f_chan start at startchan? # # Is this a square matrix or not? if e_min is None: nchans = rmflo.size else: nchans = e_min.size n_grp, f_chan, n_chan, matrix = calc_grp_chan_matrix(fname) return DataRMF(name, detchans=nchans, energ_lo=rmflo, energ_hi=rmfhi, n_grp=n_grp, n_chan=n_chan, f_chan=f_chan, matrix=matrix, offset=offset, e_min=e_min, e_max=e_max, ethresh=ethresh, header=header)
def calc_grp_chan_matrix(fname: str) -> tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray, numpy.ndarray]: """Read in an image and convert it to RMF components. For an image containing a RMF, such as created by the `CIAO tool rmfimg <https://cxc.harvard.edu/ciao/ahelp/rmfimg.html>`_), extract the needed data to create a DataRMF object (modulo knowledge of the channel or energy grids). Parameters ---------- fname : str The file name containing the RMF as an image in a format the I/O backend can read (normally this will be FITS). The X axis represents channels and the Y axis the energy resolution of the RMF. At present any WCS information stored about these axes is ignored, as is any metadata (such as the starting point of the channel axis). Returns ------- n_grp, f_chan, n_chan, matrix : (ndarray, ndarray, ndarray, ndarray) Needed to create a DataRMF to match fname. """ if TYPE_CHECKING: # Assume we have an I/O backend assert io.backend is not None # TODO: this could use the WCS info to create the channel and # energy arrays, at least for files created by rmfimg. However # it's not clear we can encode this information without losing # some information. # data, _ = io.backend.get_image_data(fname) return matrix_to_rmf(data["y"]) def matrix_to_rmf(matrix: numpy.ndarray) -> tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray, numpy.ndarray]: """Convert a matrix (2D image) to RMF components. .. versionadded:: 4.16.0 Parameters ---------- matrix : ndarray A 2D matrix of shape (ny, nx), where ny represents the energy axis and nx the channels. Returns ------- n_grp, f_chan, n_chan, matrix : (ndarray, ndarray, ndarray, ndarray) Needed to create a DataRMF to match matrix. Notes ----- This assumes that the channel array starts at 1 and there is no knowledge of the energy bounds (either the ENERG_LO and ENERG_HI values used for the matrix itself or the E_MIN and E_MAX values used to approximate the channel boundaries). """ if matrix.ndim != 2: raise ValueError(f"matrix must be 2D, not {matrix.ndim}D") n_grp1: list[int] = [] n_chan1: list[int] = [] f_chan1: list[int] = [] for row in matrix > 0: flag = numpy.hstack([[0], row, [0]]) diffs = numpy.diff(flag, n=1) starts, = numpy.where(diffs > 0) ends, = numpy.where(diffs < 0) n_chan1.extend(ends - starts) f_chan1.extend(starts + 1) n_grp1.append(len(starts)) n_grp = numpy.asarray(n_grp1, dtype=numpy.int16) f_chan = numpy.asarray(f_chan1, dtype=numpy.int16) n_chan = numpy.asarray(n_chan1, dtype=numpy.int16) matrix = matrix.flatten() matrix = matrix[matrix > 0] return n_grp, f_chan, n_chan, matrix
[docs] @dataclass class RMFMatrix: """Raw RMF data""" matrix: numpy.ndarray """The matrix as a 2D array (X axis is channels, Y axis is energy)""" channels: EvaluationSpace1D """The channel values. This must be a non-integrated axis.""" energies: EvaluationSpace1D """The energy values. This must be an integrated axis.""" def __post_init__(self) -> None: if self.matrix.ndim != 2: raise ValueError("matrix must be 2D") if self.channels.is_integrated: raise ValueError("channels axis must not be integrated") if not self.energies.is_integrated: raise ValueError("energies axis must be integrated") nenergy, nchan = self.matrix.shape if self.channels.x_axis.size != nchan: raise ValueError("channels and matrix mismatch") if self.energies.x_axis.size != nenergy: raise ValueError("channels and matrix mismatch")
[docs] def rmf_to_matrix(rmf: Union[DataRMF, RMF1D]) -> RMFMatrix: """Convert a RMF to a matrix (2D image). .. versionadded:: 4.16.0 Parameters ---------- rmf : DataRMF or RMF1D The RMF instance. Returns ------- info : RMFMatrix The matrix as a 2D array (X axis is channels and Y axis is energy, and the channel and energy axes. """ if not isinstance(rmf, (DataRMF, RMF1D)): raise ValueError("not a rmf") # Create an image of size # nx = number of channels # ny = number of energy bine # nchans = rmf.detchans nenergy = rmf.energ_lo.size matrix = numpy.zeros((nenergy, nchans), dtype=rmf.matrix.dtype) # Loop through each energy bin and add in the data, which is split # into n_grp chunks, each starting at f_chan (with 1 being the # first element of this row). The RMF has removed excess data - # that is, rows with 0 groups and flattening out a 2D array for # the n_chan/f_chan values - which makes the reconstruction a # little messy. # matrix_start = 0 chan_idx = 0 for energy_idx, n_grp in enumerate(rmf.n_grp): # Loop through the groups for this energy for _ in range(n_grp): # Need to convert from 1-based (f_chan) numbering # (although this actually depends on the offset value) and # to convert from numpy.uint64 (since <int> + <uint64> # tends to get converted to a float). # start = rmf.f_chan[chan_idx].astype(int) - int(rmf.offset) nchan = rmf.n_chan[chan_idx].astype(int) end = start + nchan matrix_end = matrix_start + nchan matrix[energy_idx, start:end] = rmf.matrix[matrix_start:matrix_end] matrix_start = matrix_end chan_idx += 1 # TODO: Is the offset handling correct here? channels = numpy.arange(rmf.offset, rmf.offset + nchans, dtype=numpy.int16) cgrid = EvaluationSpace1D(channels) egrid = EvaluationSpace1D(rmf.energ_lo, rmf.energ_hi) return RMFMatrix(matrix, cgrid, egrid)
[docs] def rmf_to_image(rmf: Union[DataRMF, RMF1D]) -> DataIMG: """Convert a RMF to DataIMG. .. versionadded:: 4.16.0 Parameters ---------- rmf : DataRMF or RMF1D The RMF instance. Returns ------- image : DataIMG """ mat = rmf_to_matrix(rmf) nx = mat.channels.x_axis.size ny = mat.energies.x_axis.size x1, x0 = numpy.mgrid[1:ny + 1, 1:nx + 1] x0 = x0.flatten() x1 = x1.flatten() y = mat.matrix.flatten() out = DataIMG(rmf.name, x0, x1, y, shape=(ny, nx)) # Add some header keywords from the RMF # def copy(key): try: val = rmf.header[key] except KeyError: return if val is None: return out.header[key] = val copy('MISSION') copy('TELESCOP') copy('INSTRUME') copy('GRATING') copy('FILTER') copy('CHANTYPE') copy('ORDER') copy('OBJECT') copy('TITLE') out.header['DETCHANS'] = rmf.detchans if rmf.ethresh is not None: out.header['LO_THRES'] = rmf.ethresh out.header['NUMGRP'] = len(rmf.n_chan) out.header['NUMELT'] = len(rmf.matrix) return out
def has_pha_response(model: Model) -> bool: """Does the model contain a PHA response? Parameters ---------- model : Model instance The model expression to check. Returns ------- flag : bool True if there is a PHA response included anywhere in the expression. Examples -------- >>> rsp = Response1D(pha) >>> m1 = Gauss1D() >>> m2 = PowLaw1D() >>> has_pha_response(m1) False >>> has_pha_response(rsp(m1)) True >>> has_pha_response(m1 + m2) False >>> has_pha_response(rsp(m1 + m2)) True >>> has_pha_response(m1 + rsp(m2)) True """ # The following check should probably include ResponseNestedModel # but it's not obvious if this is currently used. # def wanted(c): return isinstance(c, (RSPModel, ARFModel, RMFModel)) if wanted(model): return True # This check relies on a composite class like the RSPModel is # included in the __iter__ method (see CompositeModel._get_parts) # otherwise the following would have had to be a recursive call # to has_pha_instance. # for cpt in model: if wanted(cpt): return True return False