Source code for sherpa.instrument

#
#  Copyright (C) 2008, 2016, 2018, 2019, 2020, 2021, 2022
#  Smithsonian Astrophysical Observatory
#
#
#  This program is free software; you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation; either version 3 of the License, or
#  (at your option) any later version.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License along
#  with this program; if not, write to the Free Software Foundation, Inc.,
#  51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#

import logging
import warnings

import numpy

from sherpa.data import Data, Data1D, Data2D
from sherpa.models import ArithmeticModel, ArithmeticConstantModel, \
    ArithmeticFunctionModel, CompositeModel, Model
from sherpa.models.parameter import Parameter
from sherpa.models.regrid import EvaluationSpace1D, EvaluationSpace2D, rebin_2d
from sherpa.utils import bool_cast, NoNewAttributesAfterInit
from sherpa.utils.err import PSFErr
from sherpa.utils._psf import extract_kernel, get_padsize, normalize, \
    pad_data, set_origin, tcdData, unpad_data

import sherpa
info = logging.getLogger(__name__).info

string_types = (str, )


__all__ = ('Kernel', 'PSFKernel', 'RadialProfileKernel', 'PSFModel',
           'ConvolutionModel', 'PSFSpace2D')


def make_renorm_shape(shape):
    """Given a shape, calculate the appropriate renorm_shape."""

    out = []
    for axis in shape:
        out.append(get_padsize(2 * axis))

    return out


[docs]class ConvolutionModel(CompositeModel, ArithmeticModel):
[docs] @staticmethod def wrapobj(obj): if isinstance(obj, ArithmeticModel): return obj return ArithmeticFunctionModel(obj)
[docs] @staticmethod def wrapkern(obj): if isinstance(obj, ArithmeticModel): return obj if callable(obj): return ArithmeticFunctionModel(obj) return ArithmeticConstantModel(obj, 'kernel')
def __init__(self, lhs, rhs, psf): self.lhs = self.wrapkern(lhs) self.rhs = self.wrapobj(rhs) self.psf = psf CompositeModel.__init__(self, f"{self.psf.name}({self.rhs.name})", (self.psf, self.lhs, self.rhs))
[docs] def calc(self, p, *args, **kwargs): nlhs = len(self.lhs.pars) return self.psf.calc(p[:nlhs], p[nlhs:], self.lhs.calc, self.rhs.calc, *args, **kwargs)
[docs]class Kernel(NoNewAttributesAfterInit): """Base class for convolution kernels There are some validation checks made when the object is created but not when fields are changed. The assumption is that concepts like the dimensionality of the kernel are not going to be changed. """ def __init__(self, dshape, kshape, norm=False, frozen=True, center=None, args=[], kwargs={}, do_pad=False, pad_mask=None, origin=None): # As these are low-level routines use Python exceptions # rather than the Sherpa-specific ones. # try: nd = len(dshape) except TypeError: raise TypeError("dshape must be a sequence") try: nk = len(kshape) except TypeError: raise TypeError("kshape must be a sequence") if nd != nk: raise ValueError(f"dshape and kshape must be the same size, not {nd} and {nk}") if nd == 0: raise ValueError("0D kernel is not supported") # There is a specific PSFErr here so use it. if nd > 2: raise PSFErr("ndim") self.ndim = nd if origin is None: origin = numpy.zeros(self.ndim) self.dshape = dshape self.kshape = kshape self.kernel = None self.skshape = None self.norm = norm self.origin = origin self.frozen = frozen self.center = center self.args = args self.kwargs = kwargs self.renorm_shape = None self.renorm = None self.do_pad = do_pad self.pad_mask = pad_mask self.frac = None self._tcd = tcdData() super().__init__() def __setstate__(self, state): state['_tcd'] = tcdData() self.__dict__.update(state) def __getstate__(self): state = self.__dict__.copy() state.pop('_tcd') return state def __repr__(self): return f"<{type(self).__name__} kernel instance>" def __str__(self): ss = [ f"dshape = {self.dshape}", f"kshape = {self.kshape}", # f"kernel = {type(self.kernel).__name__}", f"skshape = {self.skshape}", f"norm = {self.norm}", f"origin = {self.origin}", f"frozen = {self.frozen}", f"center = {self.center}", f"args = {self.args}", f"kwargs = {self.kwargs}", f"renorm_shape = {self.renorm_shape}", f"renorm = {self.renorm}", f"do_pad = {self.do_pad}", f"pad_mask = {self.pad_mask}", f"frac = {self.frac}" ] return "\n".join(ss) # The kernel is a 1D array and does not know it's original # dimensions. #
[docs] def init_kernel(self, kernel): if not self.frozen: self._tcd.clear_kernel_fft() renorm_shape = make_renorm_shape(self.dshape) self.renorm_shape = tuple(renorm_shape) kernpad = pad_data(kernel, self.dshape, self.renorm_shape) renorm = self._tcd.convolve(numpy.ones(len(kernel)), kernpad, self.dshape, renorm_shape, self.origin) self.renorm = unpad_data(renorm, renorm_shape, self.dshape) return (kernel, self.dshape)
[docs] def init_data(self, data): if self.renorm_shape is None: renorm_shape = make_renorm_shape(self.dshape) self.renorm_shape = tuple(renorm_shape) # pad the data and convolve with unpadded kernel datapad = pad_data(data, self.dshape, self.renorm_shape) return (datapad, self.renorm_shape)
[docs] def deinit(self, vals): if self.renorm is not None: vals = unpad_data(vals, self.renorm_shape, self.dshape) vals = vals / self.renorm if self.do_pad: vals = vals[self.pad_mask] return vals
[docs] def convolve(self, data, dshape, kernel, kshape): return self._tcd.convolve(data, kernel, dshape, kshape, self.origin)
[docs] def calc(self, pl, pr, lhs, rhs, *args, **kwargs): if self.do_pad and len(args[0]) == numpy.prod(self.dshape): self.do_pad = False data = rhs(pr, *self.args, **self.kwargs) (data, dshape) = self.init_data(data) if self.kernel is None or not self.frozen: kernel = lhs(pl, *self.args, **self.kwargs) (self.kernel, self.skshape) = self.init_kernel(kernel) vals = self.convolve(data, dshape, self.kernel, self.skshape) return self.deinit(vals)
[docs]class ConvolutionKernel(Model): def __init__(self, kernel, name='conv'): self.kernel = kernel self.name = name self._tcd = tcdData() super().__init__(name) def __setstate__(self, state): state['_tcd'] = tcdData() self.__dict__.update(state) def __getstate__(self): state = self.__dict__.copy() state.pop('_tcd') return state def __repr__(self): return f"<{type(self).__name__} kernel instance>" def __str__(self): if self.kernel is None: raise PSFErr('notset') return f"Convolution Kernel:\n{self.kernel}" def __call__(self, model, session=None): if self.kernel is None: raise PSFErr('notset') kernel = self.kernel if isinstance(kernel, Data): kernel = numpy.asarray(kernel.get_dep()) if isinstance(model, string_types): if session is None: model = sherpa.astro.ui._session._eval_model_expression(model) else: model = session._eval_model_expression(model) return ConvolutionModel(kernel, model, self)
[docs] def set_kernel(self, kernel): self.kernel = kernel
[docs] def calc(self, pl, pr, lhs, rhs, *args, **kwargs): self._tcd.clear_kernel_fft() data = numpy.asarray(rhs(pr, *args, **kwargs)) kern = numpy.asarray(lhs(pl, *args, **kwargs)) size = data.size return self._tcd.convolve(data, kern, size, kern.size, int(size / 2))[:size]
[docs]class PSFKernel(Kernel): "class for PSF convolution kernels" def __init__(self, dshape, kshape, is_model=False, norm=True, frozen=True, center=None, size=None, lo=None, hi=None, width=None, args=[], kwargs={}, pad_mask=None, do_pad=False, origin=None): self.is_model = is_model self.size = size self.lo = lo self.hi = hi self.width = width self.radial = 0 super().__init__(dshape, kshape, norm, frozen, center, args, kwargs, do_pad, pad_mask, origin) # The super-class handles a missing origin in a different # manner to this class. # if origin is None: self.origin = origin def __str__(self): ss = [ f"is_model = {self.is_model}", f"size = {self.size}", f"lo = {self.lo}", f"hi = {self.hi}", f"width = {self.width}", f"radial = {self.radial}" ] return Kernel.__str__(self) + "\n" + "\n".join(ss)
[docs] def init_kernel(self, kernel): # If PSF dataset, normalize before kernel extraction # if not self.is_model and self.norm: if self.norm: kernel = normalize(kernel) (kernel, kshape, self.frac, lo, hi) = extract_kernel(kernel, self.kshape, self.size, self.center, self.lo, self.hi, self.width, self.radial) # If PSF model, then normalize integrated volume to 1, after # kernel extraction # if self.is_model and self.norm: # self.frac = 1.0 # kernel = normalize(kernel) # Find brightest pixel of PSF--assume that is the origin # Just assuming that the origin is half of szs1 can lead to # unwanted pixel shifts--but this assumes that origin should # be centered on brightest pixel. brightPixel = list(numpy.where(kernel == kernel.max())).pop() # if more than one pixel qualifies as brightest, such as const2D # use the middle of subkernel -- assumes the user provided center at # time of kernel extraction, so that should be middle of subkernel. origin = None if (not numpy.isscalar(brightPixel)) and len(brightPixel) != 1: origin = set_origin(kshape) else: # brightPixel is a NumPy index (int64) which - as of NumPy 1.18 # and Python 3.8 - causes a TypeError with the message # "only integer scalar arrays can be converted to a scalar index" # to be thrown here if sent directly to set_origin. So # we convert to a Python integer type. # origin = set_origin(kshape, int(brightPixel)) if self.origin is None: self.origin = origin if self.is_model and not self.frozen: # if the kernel model has thawed parameters, clear the old FFT and # recompute the kernel FFT at each model evaluation self._tcd.clear_kernel_fft() return (kernel, kshape)
[docs] def init_data(self, data): return (data, self.dshape)
[docs]class RadialProfileKernel(PSFKernel): "class for 1D radial profile PSF convolution kernels" def __init__(self, dshape, kshape, is_model=False, norm=True, frozen=True, center=None, size=None, lo=None, hi=None, width=None, args=[], kwargs={}, pad_mask=None, do_pad=False, origin=None): self.radialsize = None super().__init__(dshape, kshape, is_model, norm, frozen, center, size, lo, hi, width, args, kwargs, pad_mask, do_pad, origin) self.radial = 1 # over-ride super-class if self.ndim != 1: raise PSFErr(f"Radial profile requires 1D data, not {self.ndim}D") def __str__(self): return (PSFKernel.__str__(self) + "\n" + f"radialsize = {self.radialsize}")
[docs] def init_data(self, data): data, dshape = super().init_data(data) # NOTICE: radial profile is 1D only! if self.radialsize is None: self.radialsize = self.dshape[0] return data, dshape
[docs] def deinit(self, vals): # NOTICE: radial profile is 1D only! vals = vals[:self.radialsize] return super().deinit(vals)
[docs] def convolve(self, data, dshape, kernel, kshape): origin = self.origin if self.radialsize is not None: origin = self.origin + (numpy.asarray(dshape) - numpy.asarray(self.radialsize)) return self._tcd.convolve(data, kernel, dshape, kshape, origin)
[docs] def calc(self, pl, pr, lhs, rhs, *args, **kwargs): if self.do_pad and len(args[0]) == numpy.prod(self.dshape): self.do_pad = False data = rhs(pr, *self.args, **self.kwargs) (data, dshape) = self.init_data(data) # NOTICE: radial profile is 1D only! # old sherpa source model grid extension to zero # (radial profile add core) tail_grid = _create_tail_grid(self.args) if tail_grid is not None: tail = rhs(pr, *tail_grid, **self.kwargs) data = numpy.concatenate([tail, data]) dshape = (len(data),) if self.kernel is None or not self.frozen: kernel = lhs(pl, *self.args, **self.kwargs) (self.kernel, self.skshape) = self.init_kernel(kernel) vals = self.convolve(data, dshape, self.kernel, self.skshape) return self.deinit(vals)
def _create_tail_grid(axis_list): if len(axis_list) == 1: # non-binned axis grid = axis_list[0] # origsize = len(grid) width = grid[1] - grid[0] tail = numpy.arange(grid[0] - width, 0., -width)[::-1] return (tail,) if len(axis_list) == 2: # binned axis gridlo, gridhi = axis_list # origsize = len(gridlo) width = (gridhi[0] - gridlo[0]) mid = (gridlo[0] + gridhi[0]) / 2. mids = numpy.arange(mid, 0., -width)[::-1] taillo = mids - width / 2. tailhi = mids + width / 2. return (taillo, tailhi) return None
[docs]class PSFModel(Model): """Convolve a model by another model or data set. At the moment the code does not distinguish between 1D and 2D data and models. Parameters ---------- name : str The name for the model. kernel : sherpa.data.Data instance, callable, or None, optional The kernel used to convolve models. This can be changed. Notes ----- A number of attributes are displayed as parameters, if set, but are not handled as parameters. The attributes are: kernel, size, origin, and center. The size, center, and origin values are only displayed when set (and will be set by the `fold` method if needed). There is an attempt to ensure that the size, origin, and center fields have the correct size - that is they match the dimensionality of the kernel - but it is possible for an invalid combination to be set. """ def __init__(self, name='psfmodel', kernel=None): # store the name without the leading "psfmodel." term that Model adds. self._name = name self._size = None self._origin = None self._center = None self._must_rebin = False self._model = None self._kernel = None self.radial = Parameter(name, 'radial', 0, 0, 1, hard_min=0, hard_max=1, alwaysfrozen=True) self.norm = Parameter(name, 'norm', 1, 0, 1, hard_min=0, hard_max=1, alwaysfrozen=True) self.kernel = kernel self.data_space = None self.psf_space = None super().__init__(name) def _get_kernel(self): return self._kernel def _set_kernel(self, kernel): # Always clear the model self._model = None odim = self.ndim if odim is None: # avid having to check for None in code below odim = 0 def clear_fields(): "Clear the array-like fields" if self.ndim is not None and self.ndim == odim: return self.size = None self.origin = None self.center = None if kernel is None: self._kernel = None self.ndim = None clear_fields() return if isinstance(kernel, Data): self._kernel = kernel self.ndim = kernel.ndim clear_fields() return if not callable(kernel): raise PSFErr('nopsf', self._name) # We could only allow a sherpa.models.model.Model instance here, # but allowable any callable, but that means the dimensionality # may not be set. # self._kernel = kernel try: self.ndim = getattr(kernel, "ndim") except AttributeError: # It's hard to trigger this case so we have no test coverage. self.ndim = None clear_fields() kernel = property(_get_kernel, _set_kernel, doc="""The kernel (sherpa.data.Data or sherpa.models.model.Model instance, callable, or None). The kernel determines the dimensionality of the model (the `ndim` attribute), although it can remain as `None` for callable arguments. The size, origin, and center fields, if set, must match the `ndim` field, and will be cleared if they do not match. """) def _get_field(self, name): """Return the field value""" return getattr(self, name) def _set_field(self, name, vals): """Set the field value The value is checked for the correct size when possible. However we have to support self.ndim being None because a callable (or some specialised model) has been used, which makes it hard to ensure that everything matches. Setting the kernel will clear these fields if ndim changes (or is None) but it is still possible to get into cases where the fields do not match. These fields are only really meaningfull after the fold method has been called. Parameters ---------- name : str The name of the field (the attribute name). vals The value to set the field. It can not be a string, but does not need to be a sequence (so a scalar can be used). Notes ----- The stored value is converted to a tuple if is not a tuple, list, or ndarray. This is an attempt to make sure that the field can be accessed as if it is a sequence (it is unclear what the original design intended for these fields, but existing code seems to require a tuple-like interface). """ if vals is None: setattr(self, name, None) return if type(vals) in (str, numpy.string_): raise PSFErr('nostr') if type(vals) not in (list, tuple, numpy.ndarray): vals = [vals] nvals = len(vals) if self.ndim is not None and nvals != self.ndim: # remove leading underscore from the name when reporting an error raise PSFErr("mismatch_dims", self.name, name[1:], self.ndim, nvals) if self.ndim == 1: vals = vals[0] else: vals = tuple(vals) setattr(self, name, vals) def _get_center(self): return self._get_field("_center") def _set_center(self, vals): self._set_field("_center", vals) center = property(_get_center, _set_center, doc='array of size parameters') def _get_size(self): return self._get_field("_size") def _set_size(self, vals): self._set_field("_size", vals) size = property(_get_size, _set_size, doc='array of size parameters') def _get_origin(self): return self._get_field("_origin") def _set_origin(self, vals): self._set_field("_origin", vals) origin = property(_get_origin, _set_origin, doc='FFT origin') @property def model(self): """The model that applies the convolution. This is set by the `fold` method and can not be changed directly. """ return self._model def _set_model(self, model): """Set the model, after checking the dimensions This is not made into the model.setter property as the idea is that external users do not change this setting. """ # This is tricky to trigger (in fact, the only existing uses # of this code does not set the model to None), so we do not # add a test. # if model is None: self._model = None return # Is this worthwhile, e.g.: # It's hard to trigger this case so we have no test coverage. if self.ndim is not None and self.ndim != model.ndim: raise PSFErr(f"Dimension of model do not match the kernel: {model.ndim}D and {self.ndim}D") self._model = model def _get_array_str(self, name, value): """Display the 'array-like' fields""" name = f"{self._name}.{name}" flag = "frozen" # We could be a bit-more clever about this conversion but does # not seem worth it. value = str(value) # Do we need to return the min/max values? return f"\n {name:12s} {flag:6s} {value:>12s} {value:>12s} {value:>12s}" def _get_str(self): s = '' if self.kernel is not None: s += ('\n %-12s %-6s %12s' % ('%s.kernel' % self._name, 'frozen', self.kernel.name)) if self.size is not None: s += self._get_array_str("size", self.size) if self.center is not None: s += self._get_array_str("center", self.center) if self.origin is not None: s += self._get_array_str("origin", self.origin) for p in [self.radial, self.norm]: s += ('\n %-12s %-6s %12g %12g %12g %10s' % (p.fullname, 'frozen', p.val, p.min, p.max, p.units)) return s def __str__(self): s = self.name hfmt = '\n %-12s %-6s %12s %12s %12s %10s' s += hfmt % ('Param', 'Type', 'Value', 'Min', 'Max', 'Units') s += hfmt % ('-' * 5, '-' * 4, '-' * 5, '-' * 3, '-' * 3, '-' * 5) s += self._get_str() return s def __call__(self, model, session=None): if self.kernel is None: raise PSFErr('notset') kernel = self.kernel if isinstance(kernel, Data): kernel = numpy.asarray(kernel.get_dep()) if isinstance(model, string_types): if session is None: model = sherpa.astro.ui._session._eval_model_expression(model) else: model = session._eval_model_expression(model) return ConvolutionModel(kernel, model, self)
[docs] def calc(self, *args, **kwargs): if self.model is None: raise PSFErr('nofold') psf_space_evaluation = self.model.calc(*args, **kwargs) if self._must_rebin: return rebin_2d(psf_space_evaluation, self.psf_space, self.data_space).ravel() return psf_space_evaluation
[docs] def fold(self, data): """The data to be convolved by the PSF. Parameters ---------- data : sherpa.data.Data or sherpa.models.model.Model instance or a callable It must match the dimensionality of the kernel. Raises ------ sherpa.utils.err.PSFErr The kernel has not been set. """ if self.kernel is None: raise PSFErr('nopsf', self._name) # TODO: Should we treat origin as we do center and size? kwargs = {"norm": bool_cast(self.norm.val), "origin": self.origin } # This validates the dimensionality of data. We can probably # remove the ndim checks below because of this, but I am not # convinced yet, so leave them in. This means that several # error conditions do not have test coverage. # (args, dshape) = self._create_spaces(data) kwargs['args'] = args if isinstance(self.kernel, Data): kwargs['is_model'] = False kshape = self.kernel.get_dims() nkernel = self.ndim if nkernel != data.ndim: raise PSFErr("mismatch_dims", self.kernel.name, data.name, nkernel, data.ndim) if self.center is None: self.center = [int(dim / 2.) for dim in kshape] if self.size is None: self.size = kshape else: kwargs['is_model'] = True kshape = data.get_dims() nkernel = len(kshape) # To support using any callable, not just a model, we need # to allow the kernel dimensions to be unknown. The # alternative is to require the kernel to have a ndim # attribute, but there are a number of places the code # allows the kernel to not be a model instance (e.g. the # checks for pars/thawedpars). # if self.ndim is not None and nkernel != self.ndim: raise PSFErr("mismatch_dims", self.kernel.name, data.name, self.ndim, nkernel) if self.center is None: self.center = [int(dim / 2.) for dim in dshape] if self.size is None: self.size = dshape if hasattr(self.kernel, 'pars'): # freeze all PSF model parameters if not already. for par in self.kernel.pars: par.freeze() # TODO: shouldn't thawedpars always be True with the above? # if hasattr(self.kernel, 'thawedpars'): kwargs['frozen'] = (len(self.kernel.thawedpars) == 0) kwargs['center'] = self.center kwargs['size'] = self.size # TODO: Why is this restricted to 1D? is_kernel = (kwargs['is_model'] and not kwargs['norm'] and nkernel == 1) # Handle noticed regions for convolution if numpy.iterable(data.mask): kwargs['do_pad'] = True kwargs['pad_mask'] = data.mask else: kwargs['do_pad'] = False if is_kernel: for kwarg in ['is_model', 'size']: kwargs.pop(kwarg) self._set_model(Kernel(dshape, kshape, **kwargs)) return # TODO: # If these are not set some tests seem to go into an infinite loop # eg calling a convolved model in # sherpa/models/tests/test_regrid_unit.py::test_regrid1d_works_with_convolution_style # Does this indicate that there should be better argument checking # or defaults? # kwargs['lo'] = numpy.ones(nkernel) kwargs['hi'] = kshape kwargs['width'] = numpy.ones(nkernel) # TODO: why is this not just checking 'self.radial.val > 0' # instead of 'int(self.radial.val)'? Aren't they the same, and # the former is clearer? # if int(self.radial.val): self._set_model(RadialProfileKernel(dshape, kshape, **kwargs)) return self._set_model(PSFKernel(dshape, kshape, **kwargs))
def _get_kernel_data(self, data, subkernel=True): if self.kernel is None: raise PSFErr('notset') self.fold(data) kernel = self.kernel if isinstance(kernel, Data): dep = numpy.asarray(kernel.get_dep()) indep = kernel.get_indep() else: dep = kernel(*self.model.args, **self.model.kwargs) indep = self.model.args kshape = self.model.kshape lo = None hi = None if subkernel: (dep, newshape) = self.model.init_kernel(dep) if (numpy.array(kshape) != numpy.array(newshape)).any(): newindep = [] for axis in indep: args = extract_kernel(axis, self.model.kshape, self.model.size, self.model.center, self.model.lo, self.model.hi, self.model.width, self.model.radial) newindep.append(args[0]) # TODO: shouldn't we store these values like we do # newindep? This looks to be a bug but we do not # have tests to check the expected behavior (it # requires DataIMG + WCS data). # lo = args[3] hi = args[4] indep = newindep kshape = newshape if self.model.frac is not None: info('PSF frac: %s' % self.model.frac) if numpy.isscalar(kshape): kshape = [kshape] return (indep, dep, kshape, lo, hi)
[docs] def get_kernel(self, data, subkernel=True): """Return a data object representing the kernel. Parameters ---------- data : sherpa.data.Data or sherpa.models.model.Model instance The data to apply the kernel to. This routine will pass `data` to the `fold` method. subkernel : bool, optional Returns ------- data : sherpa.data.Data1D or sherpa.data.Data2D instance """ kdata = self._get_kernel_data(data, subkernel) indep = kdata[0] dep = kdata[1] kshape = kdata[2] # ndim should be the same as self.ndim ndim = len(kshape) # TODO: what happens with integrated datasets? This currently # returns the low edge of each bin. Should it use the center of # the bin or use the full ranges? Is this even an issue? # if ndim == 1: return Data1D('kernel', indep[0], dep) if ndim == 2: # Note that the shape order is reversed. return Data2D('kernel', indep[0], indep[1], dep, kshape[::-1]) raise PSFErr('ndim')
def _create_spaces(self, data): """Setup the data space based on the pixel size.""" # This has been pulled out of fold so is currently lacking in documentation. # # To support using any callable, not just a model, we need # to allow the "kernel dimensionality" to be unknown. The # alternative is to require the kernel to nave a ndim # attribute, but there are a number of places the code # allows the kernel to not be a model instance (e.g. the # checks for pars/thawedpars). # if self.ndim is not None and hasattr(data, "ndim") and data.ndim != self.ndim: raise PSFErr("mismatch_dims", self.kernel.name, data.name, self.ndim, data.ndim) pixel_size_comparison = self._check_pixel_size(data) indep = data.get_indep() # Don't do anything special if pixel_size_comparison == self.SAME_RESOLUTION: if self.ndim == 1: self.data_space = EvaluationSpace1D(*indep) elif self.ndim == 2: self.data_space = EvaluationSpace2D(*indep) else: # leave in in case we support higher dimensions or one # of the rare dimensionless models is in use. raise PSFErr("ndim") self._must_rebin = False return (indep, data.get_dims()) # Evaluate model in PSF space. Note that if we get here then # we have to have 2D data. # if pixel_size_comparison == self.BETTER_RESOLUTION: self.data_space = EvaluationSpace2D(*indep) self.psf_space = PSFSpace2D(self.data_space, self, data.sky.cdelt) self._must_rebin = True return (self.psf_space.grid, self.psf_space.shape) # PSF has worse resolution, error out raise AttributeError("The PSF has a worse resolution than the data.") def _check_pixel_size(self, data): """ If the data and PSF dot not have WCS information, assume the PSF has the same resolution as the image. Otherwise check the WCS information to determine the relative resolutions. We only check the resolution in one dimention and assume they are the same. """ if not hasattr(self.kernel, "sky"): return self.SAME_RESOLUTION # This corresponds to the case when the kernel is actually a psf image, not just a model. try: psf_pixel_size = self.kernel.sky.cdelt except AttributeError: # If the kernel does not have a pixel size, issue a warning and keep going warnings.warn("PSF Image does not have a pixel size. Sherpa will assume " "the pixel size is the same as the data") return self.SAME_RESOLUTION try: data_pixel_size = data.sky.cdelt except AttributeError: warnings.warn("Data Image does not have a pixel size. Sherpa will assume " "the pixel size is the same as the PSF") return self.SAME_RESOLUTION if numpy.allclose(psf_pixel_size, data_pixel_size): return self.SAME_RESOLUTION if psf_pixel_size[0] < data_pixel_size[0]: return self.BETTER_RESOLUTION if psf_pixel_size[0] > data_pixel_size[0]: return self.WORSE_RESOLUTION return self.SAME_RESOLUTION SAME_RESOLUTION = 0 BETTER_RESOLUTION = 1 WORSE_RESOLUTION = -1
class PSFSpace2D(EvaluationSpace2D): """ This class defines a special evaluation space that has the same boundaries as the data space but a number of pixels consistent with the pixel size of the PSF. This space is used when the data image and the PSF have a different pixel size so the model is evaluated on the "PSF space" and then rebinned back to the data space for the calculation of the statistic during a fit. """ def __init__(self, data_space, psf_model, data_pixel_size=None): if data_pixel_size is None: data_pixel_size = [1, 1] x_start, y_start = data_space.start x_end, y_end = data_space.end psf_pixel_size_axis0 = psf_model.kernel.sky.cdelt[0] psf_pixel_size_axis1 = psf_model.kernel.sky.cdelt[1] data_pixel_size_axis0 = data_pixel_size[0] data_pixel_size_axis1 = data_pixel_size[1] step_x = psf_pixel_size_axis0 / data_pixel_size_axis0 step_y = psf_pixel_size_axis1 / data_pixel_size_axis1 x_range_end, y_range_end = x_end + 1, y_end + 1 x = numpy.arange(x_start, x_range_end, step_x) y = numpy.arange(y_start, y_range_end, step_y) self.data_2_psf_pixel_size_ratio = (step_x, step_y) super().__init__(x, y)