Source code for sherpa.instrument

#
#  Copyright (C) 2008, 2016, 2018, 2019, 2020, 2021
#  Smithsonian Astrophysical Observatory
#
#
#  This program is free software; you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation; either version 3 of the License, or
#  (at your option) any later version.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License along
#  with this program; if not, write to the Free Software Foundation, Inc.,
#  51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#

import logging
import warnings

import numpy

from sherpa.data import Data, Data1D, Data2D
from sherpa.models import ArithmeticModel, ArithmeticConstantModel, \
    ArithmeticFunctionModel, CompositeModel, Model
from sherpa.models.parameter import Parameter
from sherpa.models.regrid import EvaluationSpace2D, rebin_2d
from sherpa.utils import bool_cast, NoNewAttributesAfterInit
from sherpa.utils.err import PSFErr
from sherpa.utils._psf import extract_kernel, get_padsize, normalize, \
    pad_data, set_origin, tcdData, unpad_data

import sherpa
info = logging.getLogger(__name__).info

string_types = (str, )


__all__ = ('Kernel', 'PSFKernel', 'RadialProfileKernel', 'PSFModel',
           'ConvolutionModel', 'PSFSpace2D')


[docs]class ConvolutionModel(CompositeModel, ArithmeticModel):
[docs] @staticmethod def wrapobj(obj): if isinstance(obj, ArithmeticModel): return obj return ArithmeticFunctionModel(obj)
[docs] @staticmethod def wrapkern(obj): if isinstance(obj, ArithmeticModel): return obj elif callable(obj): return ArithmeticFunctionModel(obj) return ArithmeticConstantModel(obj, 'kernel')
def __init__(self, lhs, rhs, psf): self.lhs = self.wrapkern(lhs) self.rhs = self.wrapobj(rhs) self.psf = psf CompositeModel.__init__(self, ('%s(%s)' % (self.psf.name, self.rhs.name)), (self.psf, self.lhs, self.rhs))
[docs] def calc(self, p, *args, **kwargs): nlhs = len(self.lhs.pars) return self.psf.calc(p[:nlhs], p[nlhs:], self.lhs.calc, self.rhs.calc, *args, **kwargs)
[docs]class Kernel(NoNewAttributesAfterInit): "Base class for convolution kernels" def __init__(self, dshape, kshape, norm=False, frozen=True, center=None, args=[], kwargs={}, do_pad=False, pad_mask=None, origin=None): if origin is None: origin = numpy.zeros(len(kshape)) self.dshape = dshape self.kshape = kshape self.kernel = None self.skshape = None self.norm = norm self.origin = origin self.frozen = frozen self.center = center self.args = args self.kwargs = kwargs self.renorm_shape = None self.renorm = None self.do_pad = do_pad self.pad_mask = pad_mask self.frac = None self._tcd = tcdData() NoNewAttributesAfterInit.__init__(self) def __setstate__(self, state): state['_tcd'] = tcdData() self.__dict__.update(state) def __getstate__(self): state = self.__dict__.copy() state.pop('_tcd') return state def __repr__(self): return "<%s kernel instance>" % type(self).__name__ def __str__(self): ss = [ 'dshape = %s' % str(self.dshape), 'kshape = %s' % str(self.kshape), # 'kernel = %s' % type(self.kernel).__name__, 'skshape = %s' % str(self.skshape), 'norm = %s' % str(self.norm), 'origin = %s' % str(self.origin), 'frozen = %s' % str(self.frozen), 'center = %s' % str(self.center), 'args = %s' % str(self.args), 'kwargs = %s' % str(self.kwargs), 'renorm_shape = %s' % str(self.renorm_shape), 'renorm = %s' % str(self.renorm), 'do_pad = %s' % str(self.do_pad), 'pad_mask = %s' % str(self.pad_mask), 'frac = %s' % str(self.frac) ] return '\n'.join(ss)
[docs] def init_kernel(self, kernel): if not self.frozen: self._tcd.clear_kernel_fft() renorm_shape = [] for axis in self.dshape: renorm_shape.append(get_padsize(2 * axis)) self.renorm_shape = tuple(renorm_shape) kernpad = pad_data(kernel, self.dshape, self.renorm_shape) self.renorm = self._tcd.convolve(numpy.ones(len(kernel)), kernpad, self.dshape, renorm_shape, self.origin) self.renorm = unpad_data(self.renorm, renorm_shape, self.dshape) return (kernel, self.dshape)
[docs] def init_data(self, data): if self.renorm_shape is None: renorm_shape = [] for axis in self.dshape: renorm_shape.append(get_padsize(2 * axis)) self.renorm_shape = tuple(renorm_shape) # pad the data and convolve with unpadded kernel datapad = pad_data(data, self.dshape, self.renorm_shape) return (datapad, self.renorm_shape)
[docs] def deinit(self, vals): if self.renorm is not None: vals = unpad_data(vals, self.renorm_shape, self.dshape) vals = vals / self.renorm if self.do_pad: vals = vals[self.pad_mask] return vals
[docs] def convolve(self, data, dshape, kernel, kshape): return self._tcd.convolve(data, kernel, dshape, kshape, self.origin)
[docs] def calc(self, pl, pr, lhs, rhs, *args, **kwargs): if self.do_pad and len(args[0]) == numpy.prod(self.dshape): self.do_pad = False data = rhs(pr, *self.args, **self.kwargs) (data, dshape) = self.init_data(data) if self.kernel is None or not self.frozen: kernel = lhs(pl, *self.args, **self.kwargs) (self.kernel, self.skshape) = self.init_kernel(kernel) vals = self.convolve(data, dshape, self.kernel, self.skshape) return self.deinit(vals)
[docs]class ConvolutionKernel(Model): def __init__(self, kernel, name='conv'): self.kernel = kernel self.name = name self._tcd = tcdData() Model.__init__(self, name) def __setstate__(self, state): state['_tcd'] = tcdData() self.__dict__.update(state) def __getstate__(self): state = self.__dict__.copy() state.pop('_tcd') return state def __repr__(self): return "<%s kernel instance>" % type(self).__name__ def __str__(self): if self.kernel is None: raise PSFErr('notset') return "Convolution Kernel:\n" + self.kernel.__str__() def __call__(self, model, session=None): if self.kernel is None: raise PSFErr('notset') kernel = self.kernel if isinstance(kernel, Data): kernel = numpy.asarray(kernel.get_dep()) if isinstance(model, string_types): if session is None: model = sherpa.astro.ui._session._eval_model_expression(model) else: model = session._eval_model_expression(model) return ConvolutionModel(kernel, model, self)
[docs] def set_kernel(self, kernel): self.kernel = kernel
[docs] def calc(self, pl, pr, lhs, rhs, *args, **kwargs): self._tcd.clear_kernel_fft() data = numpy.asarray(rhs(pr, *args, **kwargs)) kern = numpy.asarray(lhs(pl, *args, **kwargs)) size = data.size return self._tcd.convolve(data, kern, size, kern.size, int(size / 2))[:size]
[docs]class PSFKernel(Kernel): "class for PSF convolution kernels" def __init__(self, dshape, kshape, is_model=False, norm=True, frozen=True, center=None, size=None, lo=None, hi=None, width=None, args=[], kwargs={}, pad_mask=None, do_pad=False, origin=None): self.is_model = is_model self.size = size self.lo = lo self.hi = hi self.width = width self.radial = 0 Kernel.__init__(self, dshape, kshape, norm, frozen, center, args, kwargs, do_pad, pad_mask, origin) self.origin = origin def __str__(self): ss = [ 'is_model = %s' % str(self.is_model), 'size = %s' % str(self.size), 'lo = %s' % str(self.lo), 'hi = %s' % str(self.hi), 'width = %s' % str(self.width), 'radial = %s' % str(self.radial) ] return Kernel.__str__(self) + '\n' + '\n'.join(ss)
[docs] def init_kernel(self, kernel): # If PSF dataset, normalize before kernel extraction # if not self.is_model and self.norm: if self.norm: kernel = normalize(kernel) (kernel, kshape, self.frac, lo, hi) = extract_kernel(kernel, self.kshape, self.size, self.center, self.lo, self.hi, self.width, self.radial) # If PSF model, then normalize integrated volume to 1, after # kernel extraction # if self.is_model and self.norm: # self.frac = 1.0 # kernel = normalize(kernel) # Find brightest pixel of PSF--assume that is the origin # Just assuming that the origin is half of szs1 can lead to # unwanted pixel shifts--but this assumes that origin should # be centered on brightest pixel. brightPixel = list(numpy.where(kernel == kernel.max())).pop() # if more than one pixel qualifies as brightest, such as const2D # use the middle of subkernel -- assumes the user provided center at # time of kernel extraction, so that should be middle of subkernel. origin = None if (not numpy.isscalar(brightPixel)) and len(brightPixel) != 1: origin = set_origin(kshape) else: # brightPixel is a NumPy index (int64) which - as of NumPy 1.18 # and Python 3.8 - causes a TypeError with the message # "only integer scalar arrays can be converted to a scalar index" # to be thrown here if sent directly to set_origin. So # we convert to a Python integer type. # origin = set_origin(kshape, int(brightPixel)) if self.origin is None: self.origin = origin if self.is_model and not self.frozen: # if the kernel model has thawed parameters, clear the old FFT and # recompute the kernel FFT at each model evaluation self._tcd.clear_kernel_fft() return (kernel, kshape)
[docs] def init_data(self, data): return (data, self.dshape)
[docs]class RadialProfileKernel(PSFKernel): "class for 1D radial profile PSF convolution kernels" def __init__(self, dshape, kshape, is_model=False, norm=True, frozen=True, center=None, size=None, lo=None, hi=None, width=None, args=[], kwargs={}, pad_mask=None, do_pad=False, origin=None): self.radialsize = None PSFKernel.__init__(self, dshape, kshape, is_model, norm, frozen, center, size, lo, hi, width, args, kwargs, pad_mask, do_pad, origin) self.radial = 1 def __str__(self): return (PSFKernel.__str__(self) + '\n' + 'radialsize = %s' % str(self.radialsize))
[docs] def init_data(self, data): data, dshape = PSFKernel.init_data(self, data) # NOTICE: radial profile is 1D only! if self.radialsize is None: self.radialsize = self.dshape[0] return data, dshape
[docs] def deinit(self, vals): # NOTICE: radial profile is 1D only! vals = vals[:self.radialsize] return PSFKernel.deinit(self, vals)
[docs] def convolve(self, data, dshape, kernel, kshape): origin = self.origin if self.radialsize is not None: origin = self.origin + (numpy.asarray(dshape) - numpy.asarray(self.radialsize)) return self._tcd.convolve(data, kernel, dshape, kshape, origin)
[docs] def calc(self, pl, pr, lhs, rhs, *args, **kwargs): if self.do_pad and len(args[0]) == numpy.prod(self.dshape): self.do_pad = False data = rhs(pr, *self.args, **self.kwargs) (data, dshape) = self.init_data(data) # NOTICE: radial profile is 1D only! # old sherpa source model grid extension to zero # (radial profile add core) tail_grid = _create_tail_grid(self.args) if tail_grid is not None: tail = rhs(pr, *tail_grid, **self.kwargs) data = numpy.concatenate([tail, data]) dshape = (len(data),) if self.kernel is None or not self.frozen: kernel = lhs(pl, *self.args, **self.kwargs) (self.kernel, self.skshape) = self.init_kernel(kernel) vals = self.convolve(data, dshape, self.kernel, self.skshape) return self.deinit(vals)
def _create_tail_grid(axis_list): if len(axis_list) == 1: # non-binned axis grid = axis_list[0] # origsize = len(grid) width = grid[1] - grid[0] tail = numpy.arange(grid[0] - width, 0., -width)[::-1] return (tail,) elif len(axis_list) == 2: # binned axis gridlo, gridhi = axis_list # origsize = len(gridlo) width = (gridhi[0] - gridlo[0]) mid = (gridlo[0] + gridhi[0]) / 2. mids = numpy.arange(mid, 0., -width)[::-1] taillo = mids - width / 2. tailhi = mids + width / 2. return (taillo, tailhi) return None
[docs]class PSFModel(Model): """Convolve a model by another model or data set. Notes ----- A number of attributes are displayed as parameters, if set, but are not handled as parameters. The attributes are: kernel, size, centre, and origin. """ def __init__(self, name='psfmodel', kernel=None): self._name = name self._size = None self._origin = None self._center = None self._must_rebin = False self.radial = Parameter(name, 'radial', 0, 0, 1, hard_min=0, hard_max=1, alwaysfrozen=True) self.norm = Parameter(name, 'norm', 1, 0, 1, hard_min=0, hard_max=1, alwaysfrozen=True) self.kernel = kernel self.model = None self.data_space = None self.psf_space = None Model.__init__(self, name) def _get_center(self): if self._center is not None: if len(self._center) == 1: return self._center[0] return self._center def _set_center(self, vals): par = vals if type(vals) in (str, numpy.string_): raise PSFErr('nostr') elif type(vals) not in (list, tuple, numpy.ndarray): par = [vals] self._center = tuple(par) if par is None: self._center = None center = property(_get_center, _set_center, doc='array of size parameters') def _get_size(self): if self._size is not None: if len(self._size) == 1: return self._size[0] return self._size def _set_size(self, vals): par = vals if type(vals) in (str, numpy.string_): raise PSFErr('notstr') elif type(vals) not in (list, tuple, numpy.ndarray): par = [vals] self._size = tuple(par) if par is None: self._size = None size = property(_get_size, _set_size, doc='array of size parameters') def _get_origin(self): if self._origin is not None: if len(self._origin) == 1: return self._origin[0] return self._origin def _set_origin(self, vals): par = vals if type(vals) in (str, numpy.string_): raise PSFErr('notstr') elif type(vals) not in (list, tuple, numpy.ndarray): par = [vals] self._origin = tuple(par) if par is None: self._origin = None origin = property(_get_origin, _set_origin, doc='FFT origin') def _get_str(self): s = '' if self.kernel is not None: s += ('\n %-12s %-6s %12s' % ('%s.kernel' % self._name, 'frozen', self.kernel.name)) if self.size is not None: s += ('\n %-12s %-6s %12s %12s %12s' % ('%s.size' % self._name, 'frozen', self.size, self.size, self.size)) if self.center is not None: s += ('\n %-12s %-6s %12s %12s %12s' % ('%s.center' % self._name, 'frozen', self.center, self.center, self.center)) if self.origin is not None: s += ('\n %-12s %-6s %12s %12s %12s' % ('%s.origin' % self._name, 'frozen', self.origin, self.origin, self.origin)) for p in [self.radial, self.norm]: s += ('\n %-12s %-6s %12g %12g %12g %10s' % (p.fullname, 'frozen', p.val, p.min, p.max, p.units)) return s def __str__(self): s = self.name hfmt = '\n %-12s %-6s %12s %12s %12s %10s' s += hfmt % ('Param', 'Type', 'Value', 'Min', 'Max', 'Units') s += hfmt % ('-' * 5, '-' * 4, '-' * 5, '-' * 3, '-' * 3, '-' * 5) s += self._get_str() return s def __call__(self, model, session=None): if self.kernel is None: raise PSFErr('notset') kernel = self.kernel if isinstance(kernel, Data): kernel = numpy.asarray(kernel.get_dep()) if isinstance(model, string_types): if session is None: model = sherpa.astro.ui._session._eval_model_expression(model) else: model = session._eval_model_expression(model) return ConvolutionModel(kernel, model, self)
[docs] def calc(self, *args, **kwargs): if self.model is None: raise PSFErr('nofold') psf_space_evaluation = self.model.calc(*args, **kwargs) if self._must_rebin: return rebin_2d(psf_space_evaluation, self.psf_space, self.data_space).ravel() else: return psf_space_evaluation
[docs] def fold(self, data): # FIXME how will we know the native dimensionality of the # raveled model without the values? kargs = {} kshape = None dshape = data.get_dims() (size, center, origin, kargs['norm'], radial) = (self.size, self.center, self.origin, bool_cast(self.norm.val), int(self.radial.val)) kargs['size'] = size kargs['center'] = center kargs['origin'] = origin kargs['is_model'] = False kargs['do_pad'] = False kargs['args'] = data.get_indep() pixel_size_comparison = self._check_pixel_size(data) if pixel_size_comparison == self.SAME_RESOLUTION: # Don't do anything special self.data_space = EvaluationSpace2D(*data.get_indep()) self._must_rebin = False elif pixel_size_comparison == self.BETTER_RESOLUTION: # Evaluate model in PSF space self.data_space = EvaluationSpace2D(*data.get_indep()) self.psf_space = PSFSpace2D(self.data_space, self, data.sky.cdelt) kargs['args'] = self.psf_space.grid dshape = self.psf_space.shape self._must_rebin = True else: # PSF has worse resolution, error out raise AttributeError("The PSF has a worse resolution than the data.") if isinstance(self.kernel, Data): kshape = self.kernel.get_dims() # (kargs['lo'], kargs['hi'], # kargs['width']) = _get_axis_info(self.kernel.get_indep(), kshape) kargs['lo'] = [1] * len(kshape) kargs['hi'] = kshape kargs['width'] = [1] * len(kshape) if center is None: kargs['center'] = [int(dim / 2.) for dim in kshape] # update center param to default self.center = kargs['center'] if size is None: kargs['size'] = kshape # update size param to default self.size = kargs['size'] else: if (self.kernel is None) or (not callable(self.kernel)): raise PSFErr('nopsf', self._name) kshape = data.get_dims() # (kargs['lo'], kargs['hi'], # kargs['width']) = _get_axis_info(kargs['args'], dshape) kargs['lo'] = [1] * len(kshape) kargs['hi'] = kshape kargs['width'] = [1] * len(kshape) if center is None: kargs['center'] = [int(dim / 2.) for dim in dshape] # update center param to default self.center = kargs['center'] if size is None: kargs['size'] = dshape # update size param to default self.size = kargs['size'] kargs['is_model'] = True if hasattr(self.kernel, 'pars'): # freeze all PSF model parameters if not already. for par in self.kernel.pars: par.freeze() if hasattr(self.kernel, 'thawedpars'): kargs['frozen'] = (len(self.kernel.thawedpars) == 0) is_kernel = (kargs['is_model'] and not kargs['norm'] and len(kshape) == 1) # Handle noticed regions for convolution if numpy.iterable(data.mask): kargs['do_pad'] = True kargs['pad_mask'] = data.mask if is_kernel: for id in ['is_model', 'lo', 'hi', 'width', 'size']: kargs.pop(id) self.model = Kernel(dshape, kshape, **kargs) return if radial: self.model = RadialProfileKernel(dshape, kshape, **kargs) return self.model = PSFKernel(dshape, kshape, **kargs) return
def _get_kernel_data(self, data, subkernel=True): self.fold(data) if self.kernel is None: raise PSFErr('notset') kernel = self.kernel dep = None indep = None lo = None hi = None if isinstance(kernel, Data): dep = numpy.asarray(kernel.get_dep()) indep = kernel.get_indep() elif callable(kernel): dep = kernel(*self.model.args, **self.model.kwargs) indep = self.model.args kshape = self.model.kshape if subkernel: (dep, newshape) = self.model.init_kernel(dep) if (numpy.array(kshape) != numpy.array(newshape)).any(): newindep = [] for axis in indep: args = extract_kernel(axis, self.model.kshape, self.model.size, self.model.center, self.model.lo, self.model.hi, self.model.width, self.model.radial) newaxis = args[0] lo = args[3] # subkernel offsets (lower bound) hi = args[4] # subkernel offsets (upper bound) newindep.append(newaxis) indep = newindep kshape = newshape if self.model.frac is not None: info('PSF frac: %s' % self.model.frac) if numpy.isscalar(kshape): kshape = [kshape] return (indep, dep, kshape, lo, hi)
[docs] def get_kernel(self, data, subkernel=True): indep, dep, kshape, lo, hi = self._get_kernel_data(data, subkernel) ndim = len(kshape) if ndim == 1: dataset = Data1D('kernel', indep[0], dep) elif ndim == 2: dataset = Data2D('kernel', indep[0], indep[1], dep, kshape[::-1]) else: raise PSFErr('ndim') return dataset
def _check_pixel_size(self, data): """ If the data and PSF dot not have WCS information, assume the PSF has the same resolution as the image. Otherwise check the WCS information to determine the relative resolutions. We only check the resolution in one dimention and assume they are the same """ if hasattr(self.kernel, "sky"): # This corresponds to the case when the kernel is actually a psf image, not just a model. try: psf_pixel_size = self.kernel.sky.cdelt except AttributeError: # If the kernel does not have a pixel size, issue a warning and keep going warnings.warn("PSF Image does not have a pixel size. Sherpa will assume " "the pixel size is the same as the data") return self.SAME_RESOLUTION try: data_pixel_size = data.sky.cdelt except AttributeError: warnings.warn("Data Image does not have a pixel size. Sherpa will assume " "the pixel size is the same as the PSF") return self.SAME_RESOLUTION if numpy.allclose(psf_pixel_size, data_pixel_size): return self.SAME_RESOLUTION if psf_pixel_size[0] < data_pixel_size[0]: return self.BETTER_RESOLUTION if psf_pixel_size[0] > data_pixel_size[0]: return self.WORSE_RESOLUTION return self.SAME_RESOLUTION SAME_RESOLUTION = 0 BETTER_RESOLUTION = 1 WORSE_RESOLUTION = -1
class PSFSpace2D(EvaluationSpace2D): """ This class defines a special evaluation space that has the same boundaries as the data space but a number of pixels consistent with the pixel size of the PSF. This space is used when the data image and the PSF have a different pixel size so the model is evaluated on the "PSF space" and then rebinned back to the data space for the calculation of the statistic during a fit. """ def __init__(self, data_space, psf_model, data_pixel_size=None): if data_pixel_size is None: data_pixel_size = [1, 1] x_start, y_start = data_space.start x_end, y_end = data_space.end psf_pixel_size_axis0 = psf_model.kernel.sky.cdelt[0] psf_pixel_size_axis1 = psf_model.kernel.sky.cdelt[1] data_pixel_size_axis0 = data_pixel_size[0] data_pixel_size_axis1 = data_pixel_size[1] step_x = psf_pixel_size_axis0 / data_pixel_size_axis0 step_y = psf_pixel_size_axis1 / data_pixel_size_axis1 x_range_end, y_range_end = x_end + 1, y_end + 1 x = numpy.arange(x_start, x_range_end, step_x) y = numpy.arange(y_start, y_range_end, step_y) self.data_2_psf_pixel_size_ratio = (step_x, step_y) super(PSFSpace2D, self).__init__(x, y)