#
# Copyright (C) 2007, 2015, 2016, 2019 - 2021, 2023, 2024
# Smithsonian Astrophysical Observatory
#
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
from itertools import chain
import logging
import numpy as np
from numpy.linalg import LinAlgError
from sherpa.utils import NoNewAttributesAfterInit, print_fields, Knuth_close, \
is_iterable, list_to_open_interval, quad_coef, \
demuller, zeroin, OutOfBoundErr, FuncCounter
from sherpa.utils.parallel import multi, ncpus, context, process_tasks
from sherpa.estmethods import _est_funcs
# TODO: this should not be set globally
_ = np.seterr(invalid='ignore')
__all__ = ('EstNewMin', 'Covariance', 'Confidence',
'Projection', 'est_success', 'est_failure', 'est_hardmin',
'est_hardmax', 'est_hardminmax', 'est_newmin', 'est_maxiter',
'est_hitnan')
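
# Status codes returned, per parameter, by the error-estimation routines.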
est_success = 0
est_failure = 1
est_hardmin = 2
est_hardmax = 3
est_hardminmax = 4
est_newmin = 5
est_maxiter = 6
est_hitnan = 7
# For every method listed here, we have the same goal: derive confidence
# limits for thawed parameters. Thawed parameters are allowed to vary
# during a fit; when a model has been fit to data, then the current
# parameter values are presumably the best-fit values. Think of the
# best-fit values as being at the lowest point in a valley in parameter
# space--any step away, in any direction, means a worse fit (i.e., the
# value of the fit statistic is greater). Confidence limits tell you
# how well constrained those best-fit values are; i.e., are we in a deep,
# narrow valley in parameter space? If so, we can be confident the limits
# are small. But if the valley is shallow and broad, then the confidence
# limits will also be very broad.
#
# Every method is passed the same information:
# the current values of all thawed parameters;
# the soft limits of all thawed parameters;
# the hard limits of all thawed parameters;
# the list of parameters for which we actually want confidence
# limits (this can be a subset of all thawed parameters);
# a reference to the statistic function;
# a reference to the fitting function.
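#
# A minimal usage sketch (assuming the high-level sherpa.fit.Fit
# interface; `data` and `model` stand in for objects created elsewhere):
#
#     from sherpa.fit import Fit
#     fit = Fit(data, model, estmethod=Confidence())
#     fit.fit()                 # locate the best-fit parameter values
#     res = fit.est_errors()    # derive the confidence limits
#     print(res.parmins, res.parmaxes)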
# class EstMethodError(SherpaError):
# "Reached an error while computing parameter confidence limits"
# pass
#
# class EstHardMin(EstMethodError):
# "Reached a parameter hard minimum"
# pass
#
# class EstHardMax(EstMethodError):
# "Reached a parameter hard maximum"
# pass
#
class EstNewMin(Exception):
"Reached a new minimum fit statistic"
pass
#
# class EstMaxIter(EstMethodError):
# "Reached maximum iterations in scaling function"
# pass
#
# class EstNaN(EstMethodError):
# "Reached a NaN during computation"
# pass
class EstMethod(NoNewAttributesAfterInit):
# defined pre-instantiation for pickling
config = {'sigma': 1,
'eps': 0.01,
'maxiters': 200,
'soft_limits': False}
def __init__(self, name, estfunc):
self._estfunc = estfunc
self.name = name
# config should be defined pre-instantiation for pickling
# however, for some unknown reason membership in self.__dict__
# requires declaration in __init__()
self.config = self.config.copy()
NoNewAttributesAfterInit.__init__(self)
def __getattr__(self, name):
if name in self.__dict__.get('config', ()):
return self.config[name]
raise AttributeError(f"'{type(self).__name__}' object has no attribute '{name}'")
def __setattr__(self, name, val):
if name in self.__dict__.get('config', ()):
self.config[name] = val
else:
NoNewAttributesAfterInit.__setattr__(self, name, val)
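
    # The config passthrough above means configuration values read and
    # write like attributes, while being stored in the config dict, e.g.:
    #
    #     >>> est = Covariance()
    #     >>> est.sigma
    #     1
    #     >>> est.sigma = 1.6    # updates est.config['sigma']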
def __repr__(self):
return f"<{type(self).__name__} error-estimation method instance '{self.name}'>"
def __str__(self):
# Put name first always
keylist = list(self.config.keys())
keylist = ['name'] + keylist
full_config = {'name': self.name}
full_config.update(self.config)
return print_fields(keylist, full_config)
def __setstate__(self, state):
self.__dict__.update(state)
# obtain config values from object class
# TODO: pylint points out there's no name or estfunc set for
# the initialization call
self.__dict__['config'] = getattr(self.__class__(), 'config', {})
# update new config dict with user defined from old
self.__dict__['config'].update(state.get('config', {}))
def compute(self, statfunc, fitfunc, pars,
parmins, parmaxes, parhardmins,
parhardmaxes, limit_parnums, freeze_par, thaw_par,
report_progress, get_par_name,
statargs=(), statkwargs={}):
def stat_cb(pars):
return statfunc(pars)[0]
def fit_cb(scb, pars, parmins, parmaxes, i):
            # the parameter index i is not used by this callback
return fitfunc(scb, pars, parmins, parmaxes)[2]
# remin means reminimize -- *generally* not done (only
# proj needs to reminimize).
#
# covar still needs to pass a reminimize flag to
# get_one_sided_interval; a value less than zero is
# interpreted as "never reminimize", so pass that value here.
remin = -1.0
tol = -1.0
return self._estfunc(pars, parmins, parmaxes, parhardmins,
parhardmaxes, self.sigma, self.eps,
tol,
self.maxiters, remin, limit_parnums,
stat_cb, fit_cb, report_progress)
class Covariance(EstMethod):
def __init__(self, name='covariance'):
EstMethod.__init__(self, name, covariance)
class Confidence(EstMethod):
# defined pre-instantiation for pickling
_added_config = {'remin': 0.01,
'fast': False,
'parallel': True,
'numcores': ncpus,
'maxfits': 5,
'max_rstat': 3,
'tol': 0.2,
'verbose': False,
'openinterval': False}
def __init__(self, name='confidence'):
EstMethod.__init__(self, name, confidence)
# Update EstMethod.config dict with Confidence specifics
self.config.update(self._added_config)
def compute(self, statfunc, fitfunc, pars,
parmins, parmaxes, parhardmins,
parhardmaxes, limit_parnums, freeze_par, thaw_par,
report_progress, get_par_name,
statargs=(), statkwargs={}):
def stat_cb(pars):
return statfunc(pars)[0]
def fit_cb(pars, parmins, parmaxes, i):
# freeze model parameter i
(current_pars,
current_parmins,
current_parmaxes) = freeze_par(pars, parmins, parmaxes, i)
fit_pars = fitfunc(statfunc, current_pars,
current_parmins,
current_parmaxes)[1]
# If stat is not chi-squared, and fit method is
# lmdif, need to recalculate stat at end, just
# like in sherpa/sherpa/fit.py:fit()
stat = statfunc(fit_pars)[0]
# stat = fitfunc(scb, pars, parmins, parmaxes)[2]
# thaw model parameter i
thaw_par(i)
return stat
#
        # convert the stat callback to have the same signature as the
        # fit callback
#
def stat_cb_extra_args(fcn):
def stat_cb_wrapper(x, *args):
return fcn(x)
return stat_cb_wrapper
statcb = stat_cb_extra_args(stat_cb)
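        # With only one thawed parameter there is nothing left to re-fit
        # once it is frozen, so the wrapped statistic callback stands in
        # for the fit callback.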
if 1 == len(pars):
fitcb = statcb
else:
fitcb = fit_cb
return self._estfunc(pars, parmins, parmaxes, parhardmins,
parhardmaxes, self.sigma, self.eps,
self.tol, self.maxiters, self.remin,
self.verbose, limit_parnums,
statcb, fitcb, report_progress, get_par_name,
self.parallel, self.numcores, self.openinterval)
class Projection(EstMethod):
# defined pre-instantiation for pickling
_added_config = {'remin': 0.01,
'fast': False,
'parallel': True,
'numcores': ncpus,
'maxfits': 5,
'max_rstat': 3,
'tol': 0.2}
def __init__(self, name='projection'):
EstMethod.__init__(self, name, projection)
# Update EstMethod.config dict with Projection specifics
self.config.update(self._added_config)
def compute(self, statfunc, fitfunc, pars,
parmins, parmaxes, parhardmins,
parhardmaxes, limit_parnums, freeze_par, thaw_par,
report_progress, get_par_name,
statargs=(), statkwargs={}):
if fitfunc is None:
            raise TypeError("fitfunc should not be None")
def stat_cb(pars):
return statfunc(pars)[0]
def fit_cb(pars, parmins, parmaxes, i):
# freeze model parameter i
(current_pars,
current_parmins,
current_parmaxes) = freeze_par(pars, parmins, parmaxes, i)
fit_pars = fitfunc(statfunc, current_pars,
current_parmins,
current_parmaxes)[1]
# If stat is not chi-squared, and fit method is
# lmdif, need to recalculate stat at end, just
# like in sherpa/sherpa/fit.py:fit()
stat = statfunc(fit_pars)[0]
# stat = fitfunc(scb, pars, parmins, parmaxes)[2]
# thaw model parameter i
thaw_par(i)
return stat
return self._estfunc(pars, parmins, parmaxes, parhardmins,
parhardmaxes, self.sigma, self.eps,
self.tol,
self.maxiters, self.remin, limit_parnums,
stat_cb, fit_cb, report_progress, get_par_name,
self.parallel, self.numcores)
def covariance(pars, parmins, parmaxes, parhardmins, parhardmaxes, sigma, eps,
tol, maxiters, remin, limit_parnums, stat_cb, fit_cb,
report_progress):
# Do nothing with tol
# Do nothing with report_progress (generally fast enough we don't
# need to report back per-parameter progress)
# Even though we only want limits on certain parameters, we have to
# compute the matrix for *all* thawed parameters. So we will do that,
# and then pick the parameters of interest out of the result.
try:
info = _est_funcs.info_matrix(pars, parmins, parmaxes, parhardmins,
parhardmaxes, sigma, eps, maxiters,
remin, stat_cb)
except EstNewMin as emin:
# catch the EstNewMin exception and attach the modified
# parameter values to the exception obj. These modified
# parvals determine the new lower statistic.
raise EstNewMin(pars) from emin
# Invert matrix, take its square root and multiply by sigma to get
# parameter uncertainties; parameter uncertainties are the
# diagonal elements of the matrix.
# Use simpler matrix inversion function from numpy. If that
# doesn't work, assume it's an ill-conditioned or singular matrix,
# and call pinv from numpy -- pinv will call the SVD function to
# invert the matrix. But call pinv *only* when inv is shown not
# to work in a particular case -- use inv by default!
# The reason for this is that pinv can give back very strange
# results, when you don't *need* to use pinv, and it *also*
# happens that the ratio between smallest and largest diagonal
# elements approaches the machine precision for the data type.
# The result is that errors that come from the largest diagonal
# element are ludicrously small; you can't have a parameter value
# of order 1.0, and an error of order 10^-30, for example. The
# simpler inv function for inverting matrices does not appear to
# have the same issue.
inv_info = None
try:
inv_info = np.linalg.inv(info)
except LinAlgError:
# catch the SVD exception and exit gracefully
inv_info = np.zeros_like(info)
inv_info[:] = np.nan
    except Exception:
try:
inv_info = np.linalg.pinv(info)
except LinAlgError:
# catch the SVD exception and exit gracefully
inv_info = np.zeros_like(info)
inv_info[:] = np.nan
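
    # Note: the square root below is applied element-wise before the
    # diagonal is extracted, so negative off-diagonal terms produce NaN
    # values (likely why invalid warnings are suppressed by the np.seterr
    # call at the top of the module); only the diagonal elements are used.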
diag = (sigma * np.sqrt(inv_info)).diagonal()
# limit_parnums lists the indices of the array pars, that
# correspond to the parameters of interest. We will pick out
# the diagonal elements corresponding to entries in limits_parnums,
# and return only those bounds to the user.
upper_bounds = []
lower_bounds = []
error_flags = []
for num in limit_parnums:
eflag = est_success
ubound = diag[num]
lbound = -diag[num]
# What happens when lbound or ubound is NaN? This is
# presumably why the code is written as it is below (e.g. a
# pass if the values can be added to pars[num]).
#
if pars[num] + ubound < parhardmaxes[num]:
pass
else:
ubound = np.nan
eflag = est_hardmax
if pars[num] + lbound > parhardmins[num]:
pass
else:
lbound = np.nan
if eflag == est_hardmax:
eflag = est_hardminmax
else:
eflag = est_hardmin
upper_bounds.append(ubound)
lower_bounds.append(lbound)
error_flags.append(eflag)
return (np.array(lower_bounds), np.array(upper_bounds),
np.array(error_flags), 0, inv_info)
def projection(pars, parmins, parmaxes, parhardmins, parhardmaxes, sigma, eps,
tol, maxiters, remin, limit_parnums, stat_cb, fit_cb,
report_progress, get_par_name, do_parallel, numcores):
# Number of parameters to be searched on (*not* number of thawed
# parameters, just number we are searching on)
numsearched = len(limit_parnums)
# _est_funcs.projection can be called on any subset of the thawed
# parameters. So we made a change here to call _est_funcs.projection
# once per parameter number listed in limit_parnums, instead of
# calling _est_funcs.projection once, with the original limit_parnums
    # array. This way, we can report back progress after the confidence
# limit search is completed for each parameter, without descending
# into the C++ code.
#
    # It does mean we have to take apart the tuple returned by each call
    # to _est_funcs.projection, collect the data we've pulled out, and,
    # upon exiting the loop, construct a new tuple to return.
# SMD 03/17/2009
proj_func = _est_funcs.projection
def func(i, singleparnum, lock=None):
try:
singlebounds = proj_func(pars, parmins, parmaxes,
parhardmins, parhardmaxes,
sigma, eps, tol, maxiters,
remin, [singleparnum], stat_cb,
fit_cb)
except EstNewMin as emin:
# catch the EstNewMin exception and attach the modified
# parameter values to the exception obj. These modified
# parvals determine the new lower statistic.
raise EstNewMin(pars) from emin
if lock is not None:
lock.acquire()
report_progress(singleparnum, singlebounds[0], singlebounds[1])
if lock is not None:
lock.release()
return (singlebounds[0][0], singlebounds[1][0], singlebounds[2][0],
singlebounds[3], None)
if numsearched < 2 or not multi or numcores < 2:
do_parallel = False
if not do_parallel:
lower_limits = np.zeros(numsearched)
upper_limits = np.zeros(numsearched)
eflags = np.zeros(numsearched, dtype=int)
nfits = 0
for i, pnum in enumerate(limit_parnums):
singlebounds = func(i, pnum)
lower_limits[i] = singlebounds[0]
upper_limits[i] = singlebounds[1]
eflags[i] = singlebounds[2]
nfits = nfits + singlebounds[3]
return (lower_limits, upper_limits, eflags, nfits, None)
return parallel_est(func, limit_parnums, pars, numcores)
#################################confidence###################################
class ConfArgs:
"""The class ConfArgs is responsible for the arguments to the fit
call back function."""
def __init__(self, xpars, smin, smax, hmin, hmax, target_stat):
self.ith_par = 0
self.xpars = np.array(xpars, copy=True)
self.slimit = (np.array(smin, copy=True),
np.array(smax, copy=True))
self.hlimit = (np.array(hmin, copy=True),
np.array(hmax, copy=True))
self.target_stat = target_stat
def __call__(self):
return (self.ith_par, self.xpars, self.slimit, self.hlimit,
self.target_stat)
def __str__(self):
a2s = np.array2string
msg = ''
msg += '# smin = ' + a2s(self.slimit[0], precision=6) + '\n'
msg += '# smax = ' + a2s(self.slimit[1], precision=6) + '\n'
msg += '# hmin = ' + a2s(self.hlimit[0], precision=6) + '\n'
msg += '# hmax = ' + a2s(self.hlimit[1], precision=6) + '\n#\n'
msg += '# Note: for the intermediate steps, the notation:\n'
msg += ' par.name -/+: f( x ) = stat\n'
msg += '# ==> `stat` is the statistic when parameter `par.name` is frozen at `x`\n'
msg += '# while searching for the `lower/upper` confidence level, respectively.\n#'
return msg
def get_par(self):
"""return the current (worked on) par"""
return self.xpars[self.ith_par]
    def get_hlimit(self, dir):
        """return the current (worked on) hard limit"""
        return self.hlimit[dir][self.ith_par]

    def get_slimit(self, dir):
        """return the current (worked on) soft limit"""
        return self.slimit[dir][self.ith_par]
class ConfBlog:
def __init__(self, blogger, prefix, verbose, lock, debug=False):
self.blogger = blogger
self.prefix = prefix
self.verbose = verbose
self.lock = lock
self.debug = debug
class ConfBracket:
"""The class ConfBracket is responsible for bracketing the root within
the interval (a,b) where f(a)*f(b) < 0.0"""
neg_pos = (-1, 1)
# Is there any benefit to making these local to ConfBracket?
class Limit:
def __init__(self, limit):
self.limit = limit
class LowerLimit(Limit):
def __str__(self):
return f'LowerLimit: limit={self.limit:e}'
def is_beyond_limit(self, x):
return x < self.limit
class UpperLimit(Limit):
def __str__(self):
return f'UpperLimit: limit={self.limit:e}'
def is_beyond_limit(self, x):
return x > self.limit
def __init__(self, myargs, trial_points):
self.myargs = myargs
self.trial_points = trial_points
self.fcn = None
# TODO: rename the dir and iter arguments
def __call__(self, dir, iter, step_size, open_interval, maxiters, tol,
bloginfo):
#
        # The result is either: 1) a root (ConfRootZero), 2) an interval
        # within which the root has been confined (ConfRootBracket), or
        # 3) no chance of a root (ConfRootNone), i.e. points up to/beyond
        # the hard limit were tried and no root was found.
#
find = trace_fcn(self.find, bloginfo)
return find(dir, iter, step_size, open_interval, maxiters, tol,
bloginfo)
# TODO: rename the dir and iter arguments
    # TODO: the iter argument gets overridden below, so do we need it? (and rename it)
def find(self, dir, iter, step_size, open_interval, maxiters, tol,
bloginfo, base=2.0):
assert self.fcn is not None, 'callback func has not been set'
hlimit = [ConfBracket.LowerLimit(self.myargs.get_hlimit(dir)),
ConfBracket.UpperLimit(self.myargs.get_hlimit(dir))]
slimit = [ConfBracket.LowerLimit(self.myargs.get_slimit(dir)),
ConfBracket.UpperLimit(self.myargs.get_slimit(dir))]
xxx = self.trial_points[0]
fff = self.trial_points[1]
conf_step = ConfStep(xxx, fff)
mymaxiters = min(maxiters, 16)
plateau = 0
max_plateau_iter = 5
try:
# TODO: rename iter argument
for iter in range(mymaxiters):
if 0 == iter:
x = conf_step.covar(dir, iter, step_size, base)
elif 1 == iter:
x = conf_step.secant(dir, iter, step_size, base)
# x = conf_step.covar( dir, iter, step_size, base )
else:
x = conf_step.quad(dir, iter, step_size, base, bloginfo)
if x is None or np.isnan(x):
return ConfRootNone()
# Make sure x is not beyond the **hard** limit
if hlimit[dir].is_beyond_limit(x):
x = hlimit[dir].limit
f = self.fcn(x, self.myargs())
# print 'find(): beyond hard limit: f(%.14e)=%.14e' % (x,f)
if abs(f) <= tol:
return ConfRootZero(x)
if f >= 0.0:
return ConfRootBracket(self.fcn, self.trial_points,
open_interval)
return ConfRootNone()
if slimit[dir].is_beyond_limit(x):
f = self.fcn(x, self.myargs())
# print 'find(): beyond soft limit: f(%.14e)=%.14e' % (x,f)
if abs(f) <= tol:
return ConfRootZero(x)
if f >= 0.0:
return ConfRootBracket(self.fcn, self.trial_points,
open_interval)
if f < fff[-2]:
                    # if the fit beyond the soft limit is a better fit,
                    # then the confidence limit for the parameter does
                    # not exist
return ConfRootNone()
else:
f = self.fcn(x, self.myargs())
# print 'find(): f(%.14e)=%.14e' % (x,f)
if abs(f) <= tol:
return ConfRootZero(x)
if f >= 0.0:
return ConfRootBracket(self.fcn, self.trial_points,
open_interval)
if Knuth_close(fff[-2], fff[-1], 1.0e-6):
plateau += 1
if plateau > max_plateau_iter:
# print 'find( %d ): plateau = %d', (iter,plateau)
return ConfRootNone(None)
# else:
# if plateau > 0:
# plateau -= 1
return ConfRootNone(None)
except OutOfBoundErr:
return ConfRootNone()
class ConfRootNone:
"""The base class for the root of the confidence interval"""
def __init__(self, root=None):
"""If self.root == None, then
1) points up to the hard limits were tried and it was not possible
to bracketed the solution.
2) a parameter beyond the soft limit has been tried and the new stat
was found to be **less** then the initial minimum"""
self.root = root
def __call__(self, tol, bloginfo):
return self.root
def __str__(self):
        return 'No possible root exists'
class ConfRootBracket(ConfRootNone):
"""The class contains the bracket where the confidence root has
been bracketed, ie where f(a)*f(b) < 0"""
def __init__(self, fcn, trial_points, open_interval):
ConfRootNone.__init__(self, None)
self.fcn = fcn
self.trial_points = trial_points
self.open_interval = open_interval
def __call__(self, tol, bloginfo):
def warn_user_about_open_interval(listval):
if bloginfo.lock is not None:
bloginfo.lock.acquire()
if 0 == bloginfo.verbose:
prefix = '%s ' % bloginfo.prefix.lstrip()
else:
prefix = '%s ' % bloginfo.prefix
interval = list_to_open_interval(listval)
bloginfo.blogger.info(prefix +
'WARNING: The confidence level lies within '
+ interval)
if bloginfo.lock is not None:
bloginfo.lock.release()
xxx = self.trial_points[0]
fff = self.trial_points[1]
if np.sign(fff[-2]) == np.sign(fff[-1]):
self.root = None
return None
myzeroin = trace_fcn(zeroin, bloginfo)
answer = myzeroin(self.fcn, xxx[-2], xxx[-1], fa=fff[-2],
fb=fff[-1], maxfev=32, tol=tol)
if abs(answer[0][1]) > tol:
xafa = answer[1][0]
xa = xafa[0]
fa = xafa[1]
xbfb = answer[1][1]
xb = xbfb[0]
fb = xbfb[1]
if np.sign(fa) != np.sign(fb):
if not self.open_interval:
warn_user_about_open_interval([xa, xb])
return (xa + xb) / 2.0
return min(xa, xb), max(xa, xb)
return None
self.root = answer[0][0]
return self.root
def __str__(self):
msg = 'root is within the interval ( f(%e)=%e, f(%e)=%e )' \
% (self.trial_points[0][-2], self.trial_points[1][-2],
self.trial_points[0][-1], self.trial_points[1][-1], )
return msg
class ConfRootZero(ConfRootNone):
"""The class with the root/zero of the confidence interval"""
def __str__(self):
return f'root = {self.root:e}'
class ConfStep:
def __init__(self, xtrial, ftrial):
self.xtrial = xtrial
self.ftrial = ftrial
def covar(self, dir, iter, stepsize, base):
return self.xtrial[-1] + \
ConfBracket.neg_pos[dir] * pow(base, iter) * stepsize
# return self.xtrial[ 0 ] + \
# ConfBracket.neg_pos[ dir ] * pow( base, iter ) * stepsize
def Halley(self, coeffs, x, maxfev=8, tol=1.0e-3):
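        # Halley's rational update for a root of f,
        #     x <- x - 2 f f' / (2 f'^2 - f f''),
        # applied to the quadratic f(x) = a x^2 + b x + c with
        # coeffs = [a, b, c].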
for nfev in range(maxfev):
ax = coeffs[0] * x
fval = (ax + coeffs[1]) * x + coeffs[2]
if abs(fval) <= tol:
return [x, fval]
fdif = 2.0 * ax + coeffs[1]
            # the second derivative of the quadratic is 2a (coeffs[0])
            fdif2 = 2.0 * coeffs[0]
numer = 2.0 * fval * fdif
denom = 2.0 * fdif * fdif - fval * fdif2
x -= numer / denom
return [x, fval]
def is_same_dir(self, dir, current_pos, proposed_pos):
delta = proposed_pos - current_pos
return np.sign(delta) == ConfBracket.neg_pos[dir]
def quad(self, dir, iter, step_size, base, bloginfo):
coeffs = quad_coef(self.xtrial[-3:], self.ftrial[-3:])
delta = ConfBracket.neg_pos[dir]
delta *= abs(self.xtrial[-1] - self.xtrial[-2])
lastx = self.xtrial[-1]
mroot = demuller(np.poly1d(coeffs), lastx + delta,
lastx + 2 * delta, lastx + 3 * delta,
tol=1.0e-2)
xroot = mroot[0][0]
if xroot is None or np.isnan(xroot):
return self.covar(dir, iter, step_size, base)
try:
Halley = trace_fcn(self.Halley, bloginfo)
[xroot, froot] = Halley(coeffs, xroot, tol=1.0e-3)
except ZeroDivisionError:
xroot = None
if xroot is not None and not np.isnan(xroot) and \
self.is_same_dir(dir, self.xtrial[-1], xroot):
return xroot
return self.covar(dir, iter, step_size, base)
def secant(self, dir, iter, step_size, base):
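        # Classic secant step from the two most recent trial points
        # (xa, fa) and (xb, fb),
        #     x = xb - fb * (xa - xb) / (fa - fb),
        # written below in a form that guards against fa == 0 and fa == fb.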
xb = self.xtrial[-2]
fb = self.ftrial[-2]
xa = self.xtrial[-1]
fa = self.ftrial[-1]
if abs(fb) > abs(fa) or 0.0 == fa:
return self.covar(dir, iter, step_size, base)
s = fb / fa
p = (xa - xb) * s
if 1.0 == s:
return self.covar(dir, iter, step_size, base)
q = 1.0 - s
x = xb - p / q
if self.is_same_dir(dir, xa, x):
return x
return self.covar(dir, iter, step_size, base)
def trace_fcn(fcn, bloginfo):
if not bloginfo.debug:
return fcn
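    # When debugging is enabled the wrapped function (e.g. zeroin or
    # ConfStep.Halley) has each call, and its return value, logged via
    # bloginfo.blogger.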
def echo(*args, **kwargs):
        '''compact, but more detail than debugger'''
name = fcn.__name__
msg = '%s%s(%s)' % (bloginfo.prefix, name, ", ".join(
list(map(repr, chain(args, list(kwargs.values()))))))
bloginfo.blogger.info(msg)
return fcn(*args, **kwargs)
def debugger(*args, **kwargs):
msg = '%s%s( ' % (bloginfo.prefix, fcn.__name__)
if len(args) > 1:
msg += str(args[0])
for arg in args[1:]:
msg = '%s, %s' % (msg, arg)
for key in kwargs.keys():
value = kwargs[key]
msg = '%s, %s=%s' % (msg, key, value)
val = fcn(*args, **kwargs)
msg += ' ) %s ' % val
bloginfo.blogger.info(msg)
return val
return debugger
def confidence(pars, parmins, parmaxes, parhardmins, parhardmaxes, sigma, eps,
tol, maxiters, remin, verbose, limit_parnums, stat_cb,
fit_cb, report_progress, get_par_name, do_parallel, numcores,
open_interval):
def get_prefix(index, name, minus_plus):
'''To print the prefix/indent when verbose is on'''
blank = 3 * index * ' '
return [f"{blank}{name} {mtext}:" for mtext in minus_plus]
def get_delta_root(arg, dir, par_at_min):
my_neg_pos = ConfBracket.neg_pos[dir]
if is_iterable(arg):
# return map( lambda x: my_neg_pos * abs( x - par_at_min ), arg )
return arg
if arg is not None:
arg -= par_at_min
return my_neg_pos * abs(arg)
return arg
def get_step_size(error_scales, upper_scales, index, par):
if 0 != error_scales[index]:
            # if the covar error is unusable (the error flag is non-zero),
            # fall back to a fraction of the parameter value
            ith_covar_err = 0.0625 * abs(par)
else:
ith_covar_err = abs(upper_scales[index])
if 0.0 == ith_covar_err:
# just in case covar and/or par is 0
ith_covar_err = 1.0e-6
return ith_covar_err
def monitor_func(fcn, history):
def myfunc(x, *args):
fval = fcn(x, *args)
history[0].append(x)
history[1].append(fval)
return fval
return myfunc
def print_status(myblog, verbose, prefix, answer, lock):
if lock is not None:
lock.acquire()
if 0 == verbose:
msg = '%s\t' % prefix.lstrip()
else:
msg = '%s\t' % prefix
if is_iterable(answer):
msg += list_to_open_interval(answer)
elif answer is None:
msg += '-----'
else:
msg += '%g' % answer
myblog(msg)
if lock is not None:
lock.release()
#
    # Work in the translated coordinate system: the statistic is shifted
    # by target_stat, so the 'errors/confidence' limits become the
    # zeros/roots of the translated function.
#
def translated_fit_cb(fcn, myargs):
def translated_fit_cb_wrapper(x, *args):
hlimit = myargs.hlimit
slimit = myargs.slimit
hmin = hlimit[0]
hmax = hlimit[1]
xpars = myargs.xpars
ith_par = myargs.ith_par
# The parameter must be within the hard limits
if x < hmin[ith_par] or x > hmax[ith_par]:
raise OutOfBoundErr
smin = slimit[0]
smax = slimit[1]
orig_ith_xpar = xpars[ith_par]
xpars[ith_par] = x
translated_stat = fcn(
xpars, smin, smax, ith_par) - myargs.target_stat
xpars[ith_par] = orig_ith_xpar
return translated_stat
return translated_fit_cb_wrapper
def verbose_fitcb(fcn, bloginfo):
if 0 == bloginfo.verbose:
return fcn
def verbose_fcn(x, *args):
fval = fcn(x, *args)
msg = '%s f( %e ) =' % (bloginfo.prefix, x)
if fval is None:
msg = '%s None' % msg
else:
msg = '%s %e' % (msg, fval)
bloginfo.blogger.info(msg)
return fval
return verbose_fcn
sherpablog = logging.getLogger('sherpa') # where to print progress report
# Get minimum fit statistic, and calculate target statistic value
orig_min_stat = stat_cb(pars)
delta_stat = sigma * sigma
target_stat = orig_min_stat + delta_stat
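    # e.g. with sigma = 1 this is the classic "best statistic + 1"
    # criterion which, for a chi-square statistic, gives the 68.3%
    # (1-sigma) interval for a single interesting parameter.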
lower_scales = None
upper_scales = None
error_scales = None
nfits = 0
results = None
try:
(lower_scales, upper_scales, error_scales, nfits,
results) = covariance(pars, parmins, parmaxes, parhardmins,
parhardmaxes, 1.0, eps, tol, maxiters,
remin, limit_parnums, stat_cb,
fit_cb, report_progress)
    except EstNewMin:
        # a new minimum was found; propagate it to the caller
        raise
    except Exception:
        error_scales = np.full(len(pars), est_hardminmax)
debug = False # for internal use only
myargs = ConfArgs(pars, parmins, parmaxes, parhardmins, parhardmaxes,
target_stat)
if 0 != verbose:
msg = '#\n# f' + np.array2string(np.asarray(pars), precision=6)
msg += ' = %e\n' % orig_min_stat
msg += '# sigma = %e\n' % sigma
msg += '# target_stat = %e\n' % target_stat
msg += '# tol = %e\n' % eps
msg += '%s' % myargs
sherpablog.info(msg)
# TODO: this dictionary is used to store a value, but the value is
# never used. Do we need it?
store = {}
def func(counter, singleparnum, lock=None):
counter_cb = FuncCounter(fit_cb)
#
# These are the bounds to be returned by this method
#
conf_int = [[], []]
error_flags = []
#
# If the user has requested a specific parameter to be
# calculated then 'ith_par' represents the index of the
# free parameter to deal with.
#
myargs.ith_par = singleparnum
fitcb = translated_fit_cb(counter_cb, myargs)
par_name = get_par_name(myargs.ith_par)
ith_covar_err = get_step_size(error_scales, upper_scales, counter,
pars[myargs.ith_par])
trial_points = [[], []]
fitcb = monitor_func(fitcb, trial_points)
bracket = ConfBracket(myargs, trial_points)
# the parameter name is set, may as well get the prefix
prefix = get_prefix(counter, par_name, ['-', '+'])
myfitcb = [verbose_fitcb(fitcb,
ConfBlog(sherpablog, prefix[0], verbose, lock)),
verbose_fitcb(fitcb,
ConfBlog(sherpablog, prefix[1], verbose, lock))]
for dirn in range(2):
#
# trial_points stores the history of the points for the
# parameter which has been evaluated in order to locate
# the root. Note the first point is 'given' since the info
# of the minimum is crucial to the search.
#
bracket.trial_points[0].append(pars[myargs.ith_par])
bracket.trial_points[1].append(- delta_stat)
myblog = ConfBlog(sherpablog, prefix[dirn], verbose, lock,
debug)
            # the callback func must be set before the bracket object is
            # called, otherwise find() fails its assertion
bracket.fcn = myfitcb[dirn]
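            # note: the second argument below is the Python builtin iter;
            # ConfBracket.find rebinds it immediately in its loop, so the
            # value passed here is never used (see the TODOs in ConfBracket)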
root = bracket(dirn, iter, ith_covar_err, open_interval, maxiters,
eps, myblog)
myzero = root(eps, myblog)
delta_zero = get_delta_root(myzero, dirn, pars[myargs.ith_par])
conf_int[dirn].append(delta_zero)
status_prefix = get_prefix(counter, par_name, ['lower bound',
'upper bound'])
print_status(myblog.blogger.info, verbose, status_prefix[dirn],
delta_zero, lock)
# This should really set the error flag appropriately.
error_flags.append(est_success)
#
# include the minimum point to separate the -/+ interval
#
store[par_name] = trial_points
return (conf_int[0][0], conf_int[1][0], error_flags[0],
counter_cb.nfev, None)
if len(limit_parnums) < 2 or not multi or numcores < 2:
do_parallel = False
if not do_parallel:
lower_limits = []
upper_limits = []
eflags = []
nfits = 0
for i, lpar in enumerate(limit_parnums):
lower_limit, upper_limit, flags, nfit, extra = func(
i, lpar)
lower_limits.append(lower_limit)
upper_limits.append(upper_limit)
eflags.append(flags)
nfits += nfit
return (lower_limits, upper_limits, eflags, nfits, None)
return parallel_est(func, limit_parnums, pars, numcores)
#################################confidence###################################
def parallel_est(estfunc, limit_parnums, pars, numcores=ncpus):
"""Run a function on a sequence of inputs in parallel.
A specialized version of sherpa.utils.parallel.parallel_map.
Parameters
----------
estfunc : function
This function accepts three arguments and returns a value.
limit_parnums : sequence
pars : sequence
The current parameter values
numcores : int, optional
The number of calls to ``function`` to run in parallel. When
set to ``None``, all the available CPUs on the machine - as
set either by the 'numcores' setting of the 'parallel' section
of Sherpa's preferences or by `multiprocessing.cpu_count` - are
used.
Returns
-------
ans : array
"""
# See sherpa.utils.parallel for a discussion of how multiprocessing is
# being used to run code in parallel.
#
manager = context.Manager()
out_q = manager.Queue()
err_q = manager.Queue()
# Unlike sherpa.utils.parallel.parallel_map, these routines do use a lock
# for serializing screen output.
#
lock = manager.Lock()
size = len(limit_parnums)
parids = np.arange(size)
    # if len(limit_parnums) is less than numcores, only use that many
    # processes
if size < numcores:
numcores = size
# group limit_parnums into numcores-worth of chunks
limit_parnums = np.array_split(limit_parnums, numcores)
parids = np.array_split(parids, numcores)
def worker(parids, parnums):
results = []
for parid, singleparnum in zip(parids, parnums):
try:
result = estfunc(parid, singleparnum, lock)
results.append((parid, result))
except EstNewMin:
                # catch the EstNewMin exception and put the exception
                # class, with the modified parameter values attached, into
                # the error queue. These modified parvals determine the
                # new lower statistic. The exception class will be
                # re-raised with the parameter values attached. C++
                # Python exceptions are not picklable for use in the queue.
err_q.put(EstNewMin(pars))
return
except Exception as e:
err_q.put(e)
return
out_q.put(results)
tasks = [context.Process(target=worker, args=(parid, parnum))
for parid, parnum in zip(parids, limit_parnums)]
return run_tasks(tasks, out_q, err_q, size)
def run_tasks(tasks, out_q, err_q, size):
"""Run the processes, exiting early if necessary, and return the results.
A specialized version of sherpa.utils.parallel.run_tasks (note the
different order of the queues).
Parameters
----------
procs : list of multiprocessing.Process tasks
The processes to run.
out_q, err_q : manager.Queue
The success and error and channels used by the processes.
size : int
The number of results being returned (can be larger than len(procs)).
Returns
-------
result : tuple
"""
process_tasks(tasks, err_q)
lower_limits = size * [None]
upper_limits = size * [None]
eflags = size * [None]
nfits = 0
while not out_q.empty():
for parid, singlebounds in out_q.get():
# Have to guarantee that the tuple returned by projection
# is always (array, array, array, int) for this to work.
lower_limits[parid] = singlebounds[0]
upper_limits[parid] = singlebounds[1]
eflags[parid] = singlebounds[2]
nfits += singlebounds[3]
return (lower_limits, upper_limits, eflags, nfits, None)