#
# Copyright (C) 2008, 2015, 2016, 2017 Smithsonian Astrophysical Observatory
#
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
"""
Tools for creating, storing, inspecting, and manipulating data sets
"""
import warnings
from six.moves import zip as izip
import sys
import inspect
import numpy
from sherpa.models.regrid import EvaluationSpace1D
from sherpa.utils.err import DataErr, NotImplementedErr
from sherpa.utils import SherpaFloat, NoNewAttributesAfterInit, \
print_fields, create_expr, calc_total_error, bool_cast, \
filter_bins
__all__ = ('Data', 'DataSimulFit', 'Data1D', 'Data1DInt', 'Data2D',
'Data2DInt')
[docs]class BaseData(NoNewAttributesAfterInit):
"Base class for all data set types"
def _get_filter(self):
return self._filter
def _set_filter(self, val):
self._filter = val
self._mask = True
filter = property(_get_filter, _set_filter,
doc='Filter for dependent variable')
def _get_mask(self):
return self._mask
def _set_mask(self, val):
if (val is True) or (val is False):
self._mask = val
elif (val is None) or numpy.isscalar(val):
raise DataErr('ismask')
else:
self._mask = numpy.asarray(val, numpy.bool_)
self._filter = None
mask = property(_get_mask, _set_mask,
doc='Mask array for dependent variable')
def __init__(self):
"""Initialize a data object. This method can only be called from
a derived class constructor. Attempts to create a BaseData
instance will raise NotImplementedErr.
Derived class constructors must call this method directly (and
not indirectly through a superclass constructor). When thus
invoked, this method will extract the argument names and
values from the derived class constructor invocation and set
corresponding attributes on the instance (thereby eliminating
the need for the derived class constructor to do its own
attribute setting). If the name of an argument matches the
name of a DataProperty of the derived class, then the
corresponding attribute name will have an underscore prepended
(meaning the property will use the value directly instead of
relying on _get_*/_set_* methods).
"""
if type(self) is BaseData:
raise NotImplementedErr('noinstanceallowed', 'BaseData')
frame = sys._getframe().f_back
cond = (frame.f_code is self.__init__.__func__.__code__)
assert cond, (('%s constructor must call BaseData constructor ' +
'directly') % type(self).__name__)
args = inspect.getargvalues(frame)
self._fields = tuple(args[0][1:])
for f in self._fields:
cond = (f not in vars(self))
assert cond, (("'%s' object already has attribute '%s'") %
(type(self).__name__, f))
setattr(self, f, args[3][f])
self.filter = None
self.mask = True
NoNewAttributesAfterInit.__init__(self)
def __str__(self):
"""
Return a listing of the attributes listed in self._fields and,
if present, self._extra_fields.
"""
fields = self._fields + getattr(self, '_extra_fields', ())
fdict = dict(izip(fields, [getattr(self, f) for f in fields]))
return print_fields(fields, fdict)
[docs] def apply_filter(self, data):
if data is not None:
if self.filter is not None:
if callable(self.filter):
data = self.filter(data)
else:
data = data[self.filter]
elif self.mask is not True:
if self.mask is False:
raise DataErr('notmask')
data = numpy.asarray(data)
if data.shape != self.mask.shape:
raise DataErr('mismatch', 'mask', 'data array')
data = data[self.mask]
return data
[docs] def ignore(self, *args, **kwargs):
kwargs['ignore'] = True
self.notice(*args, **kwargs)
[docs] def notice(self, mins, maxes, axislist, ignore=False):
ignore = bool_cast(ignore)
if str in [type(min) for min in mins]:
raise DataErr('typecheck', 'lower bound')
elif str in [type(max) for max in maxes]:
raise DataErr('typecheck', 'upper bound')
elif str in [type(axis) for axis in axislist]:
raise DataErr('typecheck', 'grid')
mask = filter_bins(mins, maxes, axislist)
if mask is None:
self.mask = not ignore
elif not ignore:
if self.mask is True:
self.mask = mask
else:
self.mask |= mask
else:
mask = ~mask
if self.mask is False:
self.mask = mask
else:
self.mask &= mask
[docs]class Data(BaseData):
"Generic data set"
def __init__(self, name, indep, dep, staterror=None, syserror=None):
"""
Initialize a Data instance. indep should be a tuple of
independent axis arrays, dep should be an array of dependent
variable values, and staterror and syserror should be arrays
of statistical and systematic errors, respectively, in the
dependent variable (or None).
"""
BaseData.__init__(self)
def __repr__(self):
r = '<%s data set instance' % type(self).__name__
if hasattr(self, 'name'):
r += " '%s'" % self.name
r += '>'
return r
[docs] def eval_model(self, modelfunc):
return modelfunc(*self.get_indep())
[docs] def eval_model_to_fit(self, modelfunc):
return modelfunc(*self.get_indep(filter=True, model=modelfunc))
#
# Primary properties. These can depend only on normal attributes (and not
# other properties).
#
[docs] def get_indep(self, filter=False, model=None):
"""Return the independent axes of a data set.
Parameters
----------
filter : bool, optional
Should the filter attached to the data set be applied to
the return value or not. The default is `False`.
Returns
-------
axis: tuple of arrays
The independent axis values for the data set. This gives
the coordinates of each point in the data set.
See Also
--------
get_dep : Return the dependent axis of a data set.
"""
indep = getattr(self, 'indep', None)
filter = bool_cast(filter)
if filter:
indep = tuple([self.apply_filter(x) for x in indep])
return indep
[docs] def get_dep(self, filter=False):
"""Return the dependent axis of a data set.
Parameters
----------
filter : bool, optional
Should the filter attached to the data set be applied to
the return value or not. The default is `False`.
Returns
-------
axis: array
The dependent axis values for the data set. This gives
the value of each point in the data set.
See Also
--------
get_indep : Return the independent axis of a data set.
get_error : Return the errors on the dependent axis of a data set.
get_staterror : Return the statistical errors on the dependent axis of a data set.
get_syserror : Return the systematic errors on the dependent axis of a data set.
"""
dep = getattr(self, 'dep', None)
filter = bool_cast(filter)
if filter:
dep = self.apply_filter(dep)
return dep
[docs] def get_staterror(self, filter=False, staterrfunc=None):
"""Return the statistical error on the dependent axis of a data set.
Parameters
----------
filter : bool, optional
Should the filter attached to the data set be applied to
the return value or not. The default is `False`.
staterrfunc : function
If no statistical error has been set, the errors will
be calculated by applying this function to the
dependent axis of the data set.
Returns
-------
axis : array or `None`
The statistical error for each data point. A value of
`None` is returned if the data set has no statistical error
array and `staterrfunc` is `None`.
See Also
--------
get_error : Return the errors on the dependent axis of a data set.
get_indep : Return the independent axis of a data set.
get_syserror : Return the systematic errors on the dependent axis of a data set.
"""
staterror = getattr(self, 'staterror', None)
filter = bool_cast(filter)
if filter:
staterror = self.apply_filter(staterror)
if (staterror is None) and (staterrfunc is not None):
dep = self.get_dep()
if filter:
dep = self.apply_filter(dep)
staterror = staterrfunc(dep)
return staterror
[docs] def get_syserror(self, filter=False):
"""Return the statistical error on the dependent axis of a data set.
Parameters
----------
filter : bool, optional
Should the filter attached to the data set be applied to
the return value or not. The default is `False`.
Returns
-------
axis : array or `None`
The systematic error for each data point. A value of
`None` is returned if the data set has no systematic
errors.
See Also
--------
get_error : Return the errors on the dependent axis of a data set.
get_indep : Return the independent axis of a data set.
get_staterror : Return the statistical errors on the dependent axis of a data set.
"""
syserr = getattr(self, 'syserror', None)
filter = bool_cast(filter)
if filter:
syserr = self.apply_filter(syserr)
return syserr
#
# Utility methods
#
def _wrong_dim_error(self, baddim):
raise DataErr('wrongdim', self.name, baddim)
def _no_image_error(self):
raise DataErr('notimage', self.name)
def _no_dim_error(self):
raise DataErr('nodim', self.name)
#
# Secondary properties. To best support subclasses, these should depend
# only on the primary properties whenever possible, though there may be
# instances when they depend on normal attributes.
#
[docs] def get_dims(self):
self._no_dim_error()
[docs] def get_error(self, filter=False, staterrfunc=None):
"""Return the total error on the dependent variable.
Parameters
----------
filter : bool, optional
Should the filter attached to the data set be applied to
the return value or not. The default is `False`.
staterrfunc : function
If no statistical error has been set, the errors will
be calculated by applying this function to the
dependent axis of the data set.
Returns
-------
axis : array or `None`
The error for each data point, formed by adding the
statistical and systematic errors in quadrature.
See Also
--------
get_dep : Return the independent axis of a data set.
get_staterror : Return the statistical errors on the dependent axis of a data set.
get_syserror : Return the systematic errors on the dependent axis of a data set.
"""
return calc_total_error(self.get_staterror(filter, staterrfunc),
self.get_syserror(filter))
[docs] def get_x(self, filter=False, yfunc=None, use_evaluation_space=False):
"Return linear view of independent axis/axes"
self._wrong_dim_error(1)
[docs] def get_xerr(self, filter=False, yfunc=None):
"Return linear view of bin size in independent axis/axes"
return None
[docs] def get_xlabel(self):
"Return label for linear view of independent axis/axes"
return 'x'
[docs] def get_y(self, filter=False, yfunc=None, use_evaluation_space=False):
"Return dependent axis in N-D view of dependent variable"
y = self.get_dep(filter)
if yfunc is not None:
if filter:
yfunc = self.eval_model_to_fit(yfunc)
else:
yfunc = self.eval_model(yfunc)
y = (y, yfunc)
return y
[docs] def get_yerr(self, filter=False, staterrfunc=None):
"Return errors in dependent axis in N-D view of dependent variable"
return self.get_error(filter, staterrfunc)
[docs] def get_ylabel(self, yfunc=None):
"Return label for dependent axis in N-D view of dependent variable"
return 'y'
[docs] def get_x0(self, filter=False):
"Return first dimension in 2-D view of independent axis/axes"
self._wrong_dim_error(2)
[docs] def get_x0label(self):
"Return label for first dimension in 2-D view of independent axis/axes"
return 'x0'
[docs] def get_x1(self, filter=False):
"Return second dimension in 2-D view of independent axis/axes"
self._wrong_dim_error(2)
[docs] def get_x1label(self):
"""
Return label for second dimension in 2-D view of independent axis/axes
"""
return 'x1'
# For images, only need y-array
# Also, we do not filter, as imager needs M x N (or
# L x M x N) array
[docs] def get_img(self, yfunc=None):
"Return dependent variable as an image"
self._no_image_error()
[docs] def get_imgerr(self, yfunc=None):
"Return total error in dependent variable as an image"
self._no_image_error()
[docs] def to_guess(self):
arrays = [self.get_y(True)]
arrays.extend(self.get_indep(True))
return tuple(arrays)
[docs] def to_fit(self, staterrfunc=None):
return (self.get_dep(True),
self.get_staterror(True, staterrfunc),
self.get_syserror(True))
[docs] def to_plot(self, yfunc=None, staterrfunc=None):
# As we introduced models defined on arbitrary grids, the x array can also depend on the
# model function, at least in principle.
return (self.get_x(True, yfunc),
self.get_y(True, yfunc),
self.get_yerr(True, staterrfunc),
self.get_xerr(True, yfunc),
self.get_xlabel(),
self.get_ylabel())
[docs] def to_component_plot(self, yfunc=None, staterrfunc=None):
# As we introduced models defined on arbitrary grids, the x array can also depend on the
# model function, at least in principle.
return (self.get_x(True, yfunc, use_evaluation_space=True),
self.get_y(True, yfunc, use_evaluation_space=True),
self.get_yerr(True, staterrfunc),
self.get_xerr(True, yfunc),
self.get_xlabel(),
self.get_ylabel())
[docs] def to_contour(self, yfunc=None):
return (self.get_x0(True),
self.get_x1(True),
self.get_y(True, yfunc),
self.get_x0label(),
self.get_x1label())
[docs]class DataSimulFit(Data):
"""Store multiple data sets.
This class lets multiple data sets be treated as a single
dataset by concatenation. That is, if two data sets have lengths
n1 and n2 then they can be considered as a single data set of
length n1 + n2.
Parameters
----------
name : str
The name for the collection of data.
datasets : sequence of Data objects
The datasets to be stored; there must be at least one. They are
assumed to behave as sherpa.data.Data objects, but there is no
check for this condition.
Attributes
----------
datasets : sequence of Data
See Also
--------
sherpa.models.model.SimulFitModel
Examples
--------
>>> d1 = Data1D('d1', [1, 2, 3], [10, 12, 15])
>>> d2 = Data1D('d2', [1, 2, 5, 7], [4, 15, 9, 24])
>>> dall = DataSimulFit('comb', (d1, d2))
>>> yvals, _, _ = dall.to_fit()
>>> print(yvals)
[10 12 15 4 15 9 24]
"""
def __init__(self, name, datasets):
if len(datasets) == 0:
raise DataErr('zerodatasimulfit', type(self).__name__)
datasets = tuple(datasets)
BaseData.__init__(self)
[docs] def eval_model_to_fit(self, modelfuncs):
total_model = []
for func, data in izip(modelfuncs, self.datasets):
total_model.append(data.eval_model_to_fit(func))
return numpy.concatenate(total_model)
[docs] def to_fit(self, staterrfunc=None):
total_dep = []
total_staterror = []
total_syserror = []
no_staterror = True
no_syserror = True
for data in self.datasets:
dep, staterror, syserror = data.to_fit(staterrfunc)
total_dep.append(dep)
if staterror is not None:
no_staterror = False
total_staterror.append(staterror)
if syserror is not None:
no_syserror = False
else:
syserror = numpy.zeros_like(dep)
total_syserror.append(syserror)
total_dep = numpy.concatenate(total_dep)
if no_staterror:
total_staterror = None
elif numpy.any([numpy.equal(array, None).any()
for array in total_staterror]):
raise DataErr('staterrsimulfit')
else:
total_staterror = numpy.concatenate(total_staterror)
if no_syserror:
total_syserror = None
else:
total_syserror = numpy.concatenate(total_syserror)
return (total_dep, total_staterror, total_syserror)
[docs] def to_plot(self, yfunc=None, staterrfunc=None):
return self.datasets[0].to_plot(yfunc.parts[0], staterrfunc)
[docs]class DataND(Data):
"Base class for Data1D, Data2D, etc."
[docs] def get_dep(self, filter=False):
y = self.y
filter = bool_cast(filter)
if filter:
y = self.apply_filter(y)
return y
[docs] def set_dep(self, val):
"Set the dependent variable values"
dep = None
if numpy.iterable(val):
dep = numpy.asarray(val, SherpaFloat)
else:
val = SherpaFloat(val)
dep = numpy.array([val] * len(self.get_indep()[0]))
setattr(self, 'y', dep)
[docs]class Data1D(DataND):
"1-D data set"
def _set_mask(self, val):
DataND._set_mask(self, val)
try:
self._x = self.apply_filter(self.x)
except DataErr:
self._x = self.x
mask = property(DataND._get_mask, _set_mask,
doc='Mask array for dependent variable')
def __init__(self, name, x, y, staterror=None, syserror=None):
self._x = x
BaseData.__init__(self)
[docs] def get_indep(self, filter=False, model=None):
data_space = self._get_data_space(filter)
return self._get_indep_grid(data_space)
[docs] def get_x(self, filter=False, model=None, use_evaluation_space=False):
return self._get_x_space(filter, model, use_evaluation_space)[0]
def _get_x_space(self, filter=False, model=None, use_evaluation_space=False):
data_space = self._get_data_space(filter)
if use_evaluation_space:
evaluation_space = self._get_evaluation_space(data_space, model)
else:
evaluation_space = None
return self._get_indep_grid(data_space, evaluation_space)
@staticmethod
def _get_indep_grid(data_space, evaluation_space=None):
# The current implementation requires that we add some logic to always
# return a tuple, sometimes of one element, sometimes of two
if evaluation_space is None:
if data_space.is_integrated:
return data_space.grid
else:
return data_space.grid,
if evaluation_space.is_integrated:
return evaluation_space.grid
else:
return evaluation_space.grid,
def _get_data_space(self, filter):
filter = bool_cast(filter)
if filter:
data_x = self._x
else:
data_x = self.x
return EvaluationSpace1D(data_x)
@staticmethod
def _get_evaluation_space(data_space, model):
evaluation_space = None
if model is not None and hasattr(model, "evaluation_space"):
evaluation_space = model.evaluation_space
if not data_space in evaluation_space:
warnings.warn("evaluation space does not contain the requested space. Sherpa will join the two spaces.")
evaluation_space = evaluation_space.join(data_space)
return evaluation_space
[docs] def get_y(self, filter=False, yfunc=None, use_evaluation_space=False):
"Return dependent axis in N-D view of dependent variable"
y = self.get_dep(filter)
if yfunc is not None:
model_evaluation = yfunc(*self._get_x_space(filter, yfunc, use_evaluation_space))
y = (y, model_evaluation)
return y
[docs] def get_dims(self, filter=False):
return (len(self.get_x(filter)),)
[docs] def get_filter(self, format='%.4f', delim=':'):
# for derived intergrated classes, this will return values in center of
# bin.
x = self.get_x(filter=True)
mask = numpy.ones(len(x), dtype=bool)
if numpy.iterable(self.mask):
mask = self.mask
return create_expr(x, mask, format, delim)
[docs] def get_filter_expr(self):
return (self.get_filter(delim='-') + ' ' + self.get_xlabel())
[docs] def get_bounding_mask(self):
mask = self.mask
size = None
if numpy.iterable(self.mask):
# create bounding box around noticed image regions
mask = numpy.array(self.mask)
# xi = numpy.where(mask == True)[0]
# xlo = xi.min()
# xhi = xi.max()
# size = (mask[xlo:xhi+1].size,)
# mask = mask[xlo:xhi+1]
size = (mask.size,)
return mask, size
[docs] def get_img(self, yfunc=None):
"Return 1D dependent variable as a 1 x N image"
y_img = self.get_y(False, yfunc)
if yfunc is not None:
y_img = (y_img[0].reshape(1, y_img[0].size),
y_img[1].reshape(1, y_img[1].size))
else:
y_img = y_img.reshape(1, y_img.size)
return y_img
[docs] def get_imgerr(self):
err = self.get_error()
if err is not None:
err = err.reshape(1, err.size)
return err
[docs] def notice(self, xlo=None, xhi=None, ignore=False):
BaseData.notice(self, (xlo,), (xhi,), self.get_indep(), ignore)
[docs]class Data1DInt(Data1D):
"1-D integrated data set"
def _set_mask(self, val):
DataND._set_mask(self, val)
try:
self._lo = self.apply_filter(self.xlo)
self._hi = self.apply_filter(self.xhi)
except DataErr:
self._lo = self.xlo
self._hi = self.xhi
mask = property(DataND._get_mask, _set_mask,
doc='Mask array for dependent variable')
def __init__(self, name, xlo, xhi, y, staterror=None, syserror=None):
self._lo = xlo
self._hi = xhi
BaseData.__init__(self)
def _get_data_space(self, filter=False):
filter = bool_cast(filter)
if filter:
return EvaluationSpace1D(self._lo, self._hi)
return EvaluationSpace1D(self.xlo, self.xhi)
[docs] def get_x(self, filter=False, model=None, use_evaluation_space=False):
indep = self._get_x_space(filter, model, use_evaluation_space)
return (indep[0] + indep[1]) / 2.0
[docs] def get_xerr(self, filter=False, model=None):
xlo, xhi = self.get_indep(filter, model)
return xhi - xlo
[docs] def notice(self, xlo=None, xhi=None, ignore=False):
BaseData.notice(self, (None, xlo), (xhi, None), self.get_indep(),
ignore)
[docs]class Data2D(DataND):
"2-D data set"
def _set_mask(self, val):
DataND._set_mask(self, val)
try:
self._x0 = self.apply_filter(self.x0)
self._x1 = self.apply_filter(self.x1)
except DataErr:
self._x0 = self.x0
self._x1 = self.x1
mask = property(DataND._get_mask, _set_mask,
doc='Mask array for dependent variable')
def __init__(self, name, x0, x1, y, shape=None, staterror=None,
syserror=None):
self._x0 = x0
self._x1 = x1
BaseData.__init__(self)
[docs] def get_indep(self, filter=False, model=None ):
filter = bool_cast(filter)
if filter:
return (self._x0, self._x1)
return (self.x0, self.x1)
[docs] def get_x0(self, filter=False):
return self.get_indep(filter)[0]
[docs] def get_x1(self, filter=False):
return self.get_indep(filter)[1]
[docs] def get_axes(self):
self._check_shape()
# FIXME: how to filter an axis when self.mask is size of self.y?
return (numpy.arange(self.shape[1]) + 1,
numpy.arange(self.shape[0]) + 1)
[docs] def get_dims(self, filter=False):
# self._check_shape()
if self.shape is not None:
return self.shape[::-1]
return (len(self.get_x0(filter)), len(self.get_x1(filter)))
[docs] def get_filter_expr(self):
return ''
get_filter = get_filter_expr
def _check_shape(self):
if self.shape is None:
raise DataErr('shape', self.name)
[docs] def get_max_pos(self, dep=None):
if dep is None:
dep = self.get_dep(True)
x0 = self.get_x0(True)
x1 = self.get_x1(True)
pos = numpy.asarray(numpy.where(dep == dep.max())).squeeze()
if pos.ndim == 0:
pos = int(pos)
return (x0[pos], x1[pos])
return [(x0[index], x1[index]) for index in pos]
[docs] def get_img(self, yfunc=None):
self._check_shape()
y_img = self.get_y(False, yfunc)
if yfunc is not None:
y_img = (y_img[0].reshape(*self.shape),
y_img[1].reshape(*self.shape))
else:
y_img = y_img.reshape(*self.shape)
return y_img
[docs] def get_imgerr(self):
self._check_shape()
err = self.get_error()
if err is not None:
err = err.reshape(*self.shape)
return err
[docs] def notice(self, x0lo=None, x0hi=None, x1lo=None, x1hi=None, ignore=False):
BaseData.notice(self, (x0lo, x1lo), (x0hi, x1hi), self.get_indep(),
ignore)
[docs]class Data2DInt(Data2D):
"2-D integrated data set"
def _set_mask(self, val):
DataND._set_mask(self, val)
try:
self._x0lo = self.apply_filter(self.x0lo)
self._x0hi = self.apply_filter(self.x0hi)
self._x1lo = self.apply_filter(self.x1lo)
self._x1hi = self.apply_filter(self.x1hi)
except DataErr:
self._x0lo = self.x0lo
self._x1lo = self.x1lo
self._x0hi = self.x0hi
self._x1hi = self.x1hi
mask = property(DataND._get_mask, _set_mask,
doc='Mask array for dependent variable')
def __init__(self, name, x0lo, x1lo, x0hi, x1hi, y, shape=None,
staterror=None, syserror=None):
self._x0lo = x0lo
self._x1lo = x1lo
self._x0hi = x0hi
self._x1hi = x1hi
BaseData.__init__(self)
[docs] def get_indep(self, filter=False):
filter = bool_cast(filter)
if filter:
return (self._x0lo, self._x1lo, self._x0hi, self._x1hi)
return (self.x0lo, self.x1lo, self.x0hi, self.x1hi)
[docs] def get_x0(self, filter=False):
indep = self.get_indep(filter)
return (indep[0] + indep[2]) / 2.0
[docs] def get_x1(self, filter=False):
indep = self.get_indep(filter)
return (indep[1] + indep[3]) / 2.0
[docs] def notice(self, x0lo=None, x0hi=None, x1lo=None, x1hi=None, ignore=False):
BaseData.notice(self, (None, None, x0lo, x1lo),
(x0hi, x1hi, None, None), self.get_indep(), ignore)