Source code for sherpa.instrument

#
#  Copyright (C) 2008, 2016, 2018  Smithsonian Astrophysical Observatory
#
#
#  This program is free software; you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation; either version 3 of the License, or
#  (at your option) any later version.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License along
#  with this program; if not, write to the Free Software Foundation, Inc.,
#  51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
import warnings

import numpy
import logging

from six import string_types

from sherpa.data import Data, Data1D, Data2D
from sherpa.models import ArithmeticModel, ArithmeticConstantModel, \
    ArithmeticFunctionModel, CompositeModel, Model
from sherpa.models.parameter import Parameter
from sherpa.utils import bool_cast, NoNewAttributesAfterInit
from sherpa.utils.err import PSFErr
from sherpa.utils._psf import extract_kernel, get_padsize, normalize, \
    pad_data, set_origin, tcdData, unpad_data

import sherpa
info = logging.getLogger(__name__).info


__all__ = ('Kernel', 'PSFKernel', 'RadialProfileKernel', 'PSFModel',
           'ConvolutionModel')


[docs]class ConvolutionModel(CompositeModel, ArithmeticModel):
[docs] @staticmethod def wrapobj(obj): if isinstance(obj, ArithmeticModel): return obj return ArithmeticFunctionModel(obj)
[docs] @staticmethod def wrapkern(obj): if isinstance(obj, ArithmeticModel): return obj elif callable(obj): return ArithmeticFunctionModel(obj) return ArithmeticConstantModel(obj, 'kernel')
def __init__(self, lhs, rhs, psf): self.lhs = self.wrapkern(lhs) self.rhs = self.wrapobj(rhs) self.psf = psf CompositeModel.__init__(self, ('%s(%s)' % (self.psf.name, self.rhs.name)), (self.psf, self.lhs, self.rhs))
[docs] def calc(self, p, *args, **kwargs): nlhs = len(self.lhs.pars) return self.psf.calc(p[:nlhs], p[nlhs:], self.lhs.calc, self.rhs.calc, *args, **kwargs)
[docs]class Kernel(NoNewAttributesAfterInit): "Base class for convolution kernels" def __init__(self, dshape, kshape, norm=False, frozen=True, center=None, args=[], kwargs={}, do_pad=False, pad_mask=None, origin=None): if origin is None: origin = numpy.zeros(len(kshape)) self.dshape = dshape self.kshape = kshape self.kernel = None self.skshape = None self.norm = norm self.origin = origin self.frozen = frozen self.center = center self.args = args self.kwargs = kwargs self.renorm_shape = None self.renorm = None self.do_pad = do_pad self.pad_mask = pad_mask self.frac = None self._tcd = tcdData() NoNewAttributesAfterInit.__init__(self) def __setstate__(self, state): state['_tcd'] = tcdData() self.__dict__.update(state) def __getstate__(self): state = self.__dict__.copy() state.pop('_tcd') return state def __repr__(self): return "<%s kernel instance>" % type(self).__name__ def __str__(self): ss = [ 'dshape = %s' % str(self.dshape), 'kshape = %s' % str(self.kshape), # 'kernel = %s' % type(self.kernel).__name__, 'skshape = %s' % str(self.skshape), 'norm = %s' % str(self.norm), 'origin = %s' % str(self.origin), 'frozen = %s' % str(self.frozen), 'center = %s' % str(self.center), 'args = %s' % str(self.args), 'kwargs = %s' % str(self.kwargs), 'renorm_shape = %s' % str(self.renorm_shape), 'renorm = %s' % str(self.renorm), 'do_pad = %s' % str(self.do_pad), 'pad_mask = %s' % str(self.pad_mask), 'frac = %s' % str(self.frac) ] return '\n'.join(ss)
[docs] def init_kernel(self, kernel): if not self.frozen: self._tcd.clear_kernel_fft() renorm_shape = [] for axis in self.dshape: renorm_shape.append(get_padsize(2 * axis)) self.renorm_shape = tuple(renorm_shape) kernpad = pad_data(kernel, self.dshape, self.renorm_shape) self.renorm = self._tcd.convolve(numpy.ones(len(kernel)), kernpad, self.dshape, renorm_shape, self.origin) self.renorm = unpad_data(self.renorm, renorm_shape, self.dshape) return (kernel, self.dshape)
[docs] def init_data(self, data): if self.renorm_shape is None: renorm_shape = [] for axis in self.dshape: renorm_shape.append(get_padsize(2 * axis)) self.renorm_shape = tuple(renorm_shape) # pad the data and convolve with unpadded kernel datapad = pad_data(data, self.dshape, self.renorm_shape) return (datapad, self.renorm_shape)
[docs] def deinit(self, vals): if self.renorm is not None: vals = unpad_data(vals, self.renorm_shape, self.dshape) vals = vals / self.renorm if self.do_pad: vals = vals[self.pad_mask] return vals
[docs] def convolve(self, data, dshape, kernel, kshape): return self._tcd.convolve(data, kernel, dshape, kshape, self.origin)
[docs] def calc(self, pl, pr, lhs, rhs, *args, **kwargs): if self.do_pad and len(args[0]) == numpy.prod(self.dshape): self.do_pad = False data = rhs(pr, *self.args, **self.kwargs) (data, dshape) = self.init_data(data) if self.kernel is None or not self.frozen: kernel = lhs(pl, *self.args, **self.kwargs) (self.kernel, self.skshape) = self.init_kernel(kernel) vals = self.convolve(data, dshape, self.kernel, self.skshape) return self.deinit(vals)
[docs]class ConvolutionKernel(Model): def __init__(self, kernel, name='conv'): self.kernel = kernel self.name = name self._tcd = tcdData() Model.__init__(self, name) def __setstate__(self, state): state['_tcd'] = tcdData() self.__dict__.update(state) def __getstate__(self): state = self.__dict__.copy() state.pop('_tcd') return state def __repr__(self): return "<%s kernel instance>" % type(self).__name__ def __str__(self): if self.kernel is None: raise PSFErr('notset') return "Convolution Kernel:\n" + self.kernel.__str__() def __call__(self, model, session=None): if self.kernel is None: raise PSFErr('notset') kernel = self.kernel if isinstance(kernel, Data): kernel = numpy.asarray(kernel.get_dep()) if isinstance(model, string_types): if session is None: model = sherpa.astro.ui._session._eval_model_expression(model) else: model = session._eval_model_expression(model) return ConvolutionModel(kernel, model, self)
[docs] def set_kernel(self, kernel): self.kernel = kernel
[docs] def calc(self, pl, pr, lhs, rhs, *args, **kwargs): self._tcd.clear_kernel_fft() data = numpy.asarray(rhs(pr, *args, **kwargs)) kern = numpy.asarray(lhs(pl, *args, **kwargs)) size = data.size return self._tcd.convolve(data, kern, size, kern.size, int(size / 2))[:size]
[docs]class PSFKernel(Kernel): "class for PSF convolution kernels" def __init__(self, dshape, kshape, is_model=False, norm=True, frozen=True, center=None, size=None, lo=None, hi=None, width=None, args=[], kwargs={}, pad_mask=None, do_pad=False, origin=None): self.is_model = is_model self.size = size self.lo = lo self.hi = hi self.width = width self.radial = 0 Kernel.__init__(self, dshape, kshape, norm, frozen, center, args, kwargs, do_pad, pad_mask, origin) self.origin = origin def __str__(self): ss = [ 'is_model = %s' % str(self.is_model), 'size = %s' % str(self.size), 'lo = %s' % str(self.lo), 'hi = %s' % str(self.hi), 'width = %s' % str(self.width), 'radial = %s' % str(self.radial) ] return Kernel.__str__(self) + '\n' + '\n'.join(ss)
[docs] def init_kernel(self, kernel): # If PSF dataset, normalize before kernel extraction # if not self.is_model and self.norm: if self.norm: kernel = normalize(kernel) (kernel, kshape, self.frac, lo, hi) = extract_kernel(kernel, self.kshape, self.size, self.center, self.lo, self.hi, self.width, self.radial) # If PSF model, then normalize integrated volume to 1, after # kernel extraction # if self.is_model and self.norm: # self.frac = 1.0 # kernel = normalize(kernel) # Find brightest pixel of PSF--assume that is the origin # Just assuming that the origin is half of szs1 can lead to # unwanted pixel shifts--but this assumes that origin should # be centered on brightest pixel. brightPixel = list(numpy.where(kernel == kernel.max())).pop() origin = None # if more than one pixel qualifies as brightest, such as const2D # use the middle of subkernel -- assumes the user provided center at # time of kernel extraction, so that should be middle of subkernel. if (not numpy.isscalar(brightPixel)) and len(brightPixel) != 1: origin = set_origin(kshape) else: origin = set_origin(kshape, brightPixel) if self.origin is None: self.origin = origin if self.is_model and not self.frozen: # if the kernel model has thawed parameters, clear the old FFT and # recompute the kernel FFT at each model evaluation self._tcd.clear_kernel_fft() return (kernel, kshape)
[docs] def init_data(self, data): return (data, self.dshape)
[docs]class RadialProfileKernel(PSFKernel): "class for 1D radial profile PSF convolution kernels" def __init__(self, dshape, kshape, is_model=False, norm=True, frozen=True, center=None, size=None, lo=None, hi=None, width=None, args=[], kwargs={}, pad_mask=None, do_pad=False, origin=None): self.radialsize = None PSFKernel.__init__(self, dshape, kshape, is_model, norm, frozen, center, size, lo, hi, width, args, kwargs, pad_mask, do_pad, origin) self.radial = 1 def __str__(self): return (PSFKernel.__str__(self) + '\n' + 'radialsize = %s' % str(self.radialsize))
[docs] def init_data(self, data): data, dshape = PSFKernel.init_data(self, data) # NOTICE: radial profile is 1D only! if self.radialsize is None: self.radialsize = self.dshape[0] return data, dshape
[docs] def deinit(self, vals): # NOTICE: radial profile is 1D only! vals = vals[:self.radialsize] return PSFKernel.deinit(self, vals)
[docs] def convolve(self, data, dshape, kernel, kshape): origin = self.origin if self.radialsize is not None: origin = self.origin + (numpy.asarray(dshape) - numpy.asarray(self.radialsize)) return self._tcd.convolve(data, kernel, dshape, kshape, origin)
[docs] def calc(self, pl, pr, lhs, rhs, *args, **kwargs): if self.do_pad and len(args[0]) == numpy.prod(self.dshape): self.do_pad = False data = rhs(pr, *self.args, **self.kwargs) (data, dshape) = self.init_data(data) # NOTICE: radial profile is 1D only! # old sherpa source model grid extension to zero # (radial profile add core) tail_grid = _create_tail_grid(self.args) if tail_grid is not None: tail = rhs(pr, *tail_grid, **self.kwargs) data = numpy.concatenate([tail, data]) dshape = (len(data),) if self.kernel is None or not self.frozen: kernel = lhs(pl, *self.args, **self.kwargs) (self.kernel, self.skshape) = self.init_kernel(kernel) vals = self.convolve(data, dshape, self.kernel, self.skshape) return self.deinit(vals)
def _create_tail_grid(axis_list): if len(axis_list) == 1: # non-binned axis grid = axis_list[0] # origsize = len(grid) width = grid[1] - grid[0] tail = numpy.arange(grid[0] - width, 0., -width)[::-1] return (tail,) elif len(axis_list) == 2: # binned axis gridlo, gridhi = axis_list # origsize = len(gridlo) width = (gridhi[0] - gridlo[0]) mid = (gridlo[0] + gridhi[0]) / 2. mids = numpy.arange(mid, 0., -width)[::-1] taillo = mids - width / 2. tailhi = mids + width / 2. return (taillo, tailhi) return None def _get_axis_info(axis_list, dims): if len(dims) == 1 and len(axis_list) == 1: xlo = axis_list[0] lo = xlo.min() hi = xlo.max() width = xlo[1] - xlo[0] return ((lo,), (hi,), (width,)) elif len(dims) == 1 and len(axis_list) == 2: # 1D integrated grid # bin_width = data.get_xerr()[0] bin_width = axis_list[1] - axis_list[0] return ((axis_list[0].min(),), (axis_list[1].max(),), (bin_width[0],)) elif len(dims) == 2 and len(axis_list) == 2: # use the unfiltered data grid to obtain lo, hi, width x0, x1 = axis_list lo = (x0.min(), x1.min()) hi = (x0.max(), x1.max()) # If 2D, and we are on the 2nd axis, then the next pertinent # value of x is not 1 bin away, but (original size of axis # 1) away # we are using get_dims() for shape width = ((x0[1] - x0[0]), (x1[dims[0]] - x1[0])) return (lo, hi, width) return None, None, None
[docs]class PSFModel(Model): def _get_center(self): if self._center is not None: if len(self._center) == 1: return self._center[0] return self._center def _set_center(self, vals): par = vals if type(vals) in (str, numpy.string_): raise PSFErr('nostr') elif type(vals) not in (list, tuple, numpy.ndarray): par = [vals] self._center = tuple(par) if par is None: self._center = None center = property(_get_center, _set_center, doc='array of size parameters') def _get_size(self): if self._size is not None: if len(self._size) == 1: return self._size[0] return self._size def _set_size(self, vals): par = vals if type(vals) in (str, numpy.string_): raise PSFErr('notstr') elif type(vals) not in (list, tuple, numpy.ndarray): par = [vals] self._size = tuple(par) if par is None: self._size = None size = property(_get_size, _set_size, doc='array of size parameters') def _get_origin(self): if self._origin is not None: if len(self._origin) == 1: return self._origin[0] return self._origin def _set_origin(self, vals): par = vals if type(vals) in (str, numpy.string_): raise PSFErr('notstr') elif type(vals) not in (list, tuple, numpy.ndarray): par = [vals] self._origin = tuple(par) if par is None: self._origin = None origin = property(_get_origin, _set_origin, doc='FFT origin') def __init__(self, name='psfmodel', kernel=None): self._name = name self._size = None self._origin = None self._center = None self.radial = Parameter(name, 'radial', 0, 0, 1, hard_min=0, hard_max=1, alwaysfrozen=True) self.norm = Parameter(name, 'norm', 1, 0, 1, hard_min=0, hard_max=1, alwaysfrozen=True) self.kernel = kernel self.model = None Model.__init__(self, name) def _get_str(self): s = '' if self.kernel is not None: s += ('\n %-12s %-6s %12s' % ('%s.kernel' % self._name, 'frozen', self.kernel.name)) if self.size is not None: s += ('\n %-12s %-6s %12s %12s %12s' % ('%s.size' % self._name, 'frozen', self.size, self.size, self.size)) if self.center is not None: s += ('\n %-12s %-6s %12s %12s %12s' % ('%s.center' % self._name, 'frozen', self.center, self.center, self.center)) if self.origin is not None: s += ('\n %-12s %-6s %12s %12s %12s' % ('%s.origin' % self._name, 'frozen', self.origin, self.origin, self.origin)) for p in [self.radial, self.norm]: s += ('\n %-12s %-6s %12g %12g %12g %10s' % (p.fullname, 'frozen', p.val, p.min, p.max, p.units)) return s def __str__(self): s = self.name hfmt = '\n %-12s %-6s %12s %12s %12s %10s' s += hfmt % ('Param', 'Type', 'Value', 'Min', 'Max', 'Units') s += hfmt % ('-' * 5, '-' * 4, '-' * 5, '-' * 3, '-' * 3, '-' * 5) s += self._get_str() return s def __call__(self, model, session=None): if self.kernel is None: raise PSFErr('notset') kernel = self.kernel if isinstance(kernel, Data): kernel = numpy.asarray(kernel.get_dep()) if isinstance(model, string_types): if session is None: model = sherpa.astro.ui._session._eval_model_expression(model) else: model = session._eval_model_expression(model) return ConvolutionModel(kernel, model, self)
[docs] def calc(self, *args, **kwargs): if self.model is None: raise PSFErr('nofold') return self.model.calc(*args, **kwargs)
[docs] def fold(self, data): # FIXME how will we know the native dimensionality of the # raveled model without the values? self._check_pixel_size(data) kargs = {} kshape = None dshape = data.get_dims() (size, center, origin, kargs['norm'], radial) = (self.size, self.center, self.origin, bool_cast(self.norm.val), int(self.radial.val)) kargs['size'] = size kargs['center'] = center kargs['origin'] = origin kargs['is_model'] = False kargs['do_pad'] = False kargs['args'] = data.get_indep() if isinstance(self.kernel, Data): kshape = self.kernel.get_dims() # (kargs['lo'], kargs['hi'], # kargs['width']) = _get_axis_info(self.kernel.get_indep(), kshape) kargs['lo'] = [1] * len(kshape) kargs['hi'] = kshape kargs['width'] = [1] * len(kshape) if center is None: kargs['center'] = [int(dim / 2.) for dim in kshape] # update center param to default self.center = kargs['center'] if size is None: kargs['size'] = kshape # update size param to default self.size = kargs['size'] else: if (self.kernel is None) or (not callable(self.kernel)): raise PSFErr('nopsf', self._name) kshape = data.get_dims() # (kargs['lo'], kargs['hi'], # kargs['width']) = _get_axis_info(kargs['args'], dshape) kargs['lo'] = [1] * len(kshape) kargs['hi'] = kshape kargs['width'] = [1] * len(kshape) if center is None: kargs['center'] = [int(dim / 2.) for dim in dshape] # update center param to default self.center = kargs['center'] if size is None: kargs['size'] = dshape # update size param to default self.size = kargs['size'] kargs['is_model'] = True if hasattr(self.kernel, 'pars'): # freeze all PSF model parameters if not already. for par in self.kernel.pars: par.freeze() if hasattr(self.kernel, 'thawedpars'): kargs['frozen'] = (len(self.kernel.thawedpars) == 0) # check size of self.size to ensure <= dshape for 2D # if len(dshape) > 1: # dsize = numpy.asarray(dshape) # ksize = numpy.asarray(self.size) # if True in (ksize>dsize): # raise PSFErr('badsize', ksize, dsize) is_kernel = (kargs['is_model'] and not kargs['norm'] and len(kshape) == 1) # Handle noticed regions for convolution if numpy.iterable(data.mask): kargs['do_pad'] = True kargs['pad_mask'] = data.mask if is_kernel: for id in ['is_model', 'lo', 'hi', 'width', 'size']: kargs.pop(id) self.model = Kernel(dshape, kshape, **kargs) return if radial: self.model = RadialProfileKernel(dshape, kshape, **kargs) return self.model = PSFKernel(dshape, kshape, **kargs) return
def _get_kernel_data(self, data, subkernel=True): self.fold(data) if self.kernel is None: raise PSFErr('notset') kernel = self.kernel dep = None indep = None lo = None hi = None if isinstance(kernel, Data): dep = numpy.asarray(kernel.get_dep()) indep = kernel.get_indep() elif callable(kernel): dep = kernel(*self.model.args, **self.model.kwargs) indep = self.model.args kshape = self.model.kshape if subkernel: (dep, newshape) = self.model.init_kernel(dep) if (numpy.array(kshape) != numpy.array(newshape)).any(): newindep = [] for axis in indep: args = extract_kernel(axis, self.model.kshape, self.model.size, self.model.center, self.model.lo, self.model.hi, self.model.width, self.model.radial) newaxis = args[0] lo = args[3] # subkernel offsets (lower bound) hi = args[4] # subkernel offsets (upper bound) newindep.append(newaxis) indep = newindep kshape = newshape if self.model.frac is not None: info('PSF frac: %s' % self.model.frac) if numpy.isscalar(kshape): kshape = [kshape] return (indep, dep, kshape, lo, hi)
[docs] def get_kernel(self, data, subkernel=True): indep, dep, kshape, lo, hi = self._get_kernel_data(data, subkernel) ndim = len(kshape) if ndim == 1: dataset = Data1D('kernel', indep[0], dep) elif ndim == 2: dataset = Data2D('kernel', indep[0], indep[1], dep, kshape[::-1]) else: raise PSFErr('ndim') return dataset
def _check_pixel_size(self, data): if hasattr(self.kernel, "sky"): # This corresponds to the case when the kernel is actually a psf image, not just a model. psf_pixel_size = self.kernel.sky.cdelt data_pixel_size = data.sky.cdelt if not numpy.allclose(psf_pixel_size, data_pixel_size): warnings.warn("NOTE: The PSF pixel size ({}) does not correspond to the Image Pixel Size ({})".format( psf_pixel_size, data_pixel_size ))