Source code for sherpa.estmethods

#
#  Copyright (C) 2007, 2015, 2016, 2019 - 2021, 2023 - 2025
#  Smithsonian Astrophysical Observatory
#
#
#  This program is free software; you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation; either version 3 of the License, or
#  (at your option) any later version.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License along
#  with this program; if not, write to the Free Software Foundation, Inc.,
#  51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#

from __future__ import annotations

from collections.abc import Callable, Sequence
import logging
from typing import Any, Protocol, SupportsFloat
import warnings

import numpy as np
from numpy.linalg import LinAlgError

from sherpa.utils import NoNewAttributesAfterInit, \
    FuncCounter, OutOfBoundErr, Knuth_close, \
    print_fields, is_iterable, list_to_open_interval, quad_coef, \
    demuller, zeroin
from sherpa.utils.parallel import SupportsLock, SupportsProcess, \
    SupportsQueue, multi, ncpus, context, process_tasks
from sherpa.utils.types import ArrayType, FitFunc, StatFunc

from . import _est_funcs  # type: ignore


# TODO: this should not be set globally
_ = np.seterr(invalid='ignore')


__all__ = ('EstNewMin', 'Covariance', 'Confidence',
           'Projection', 'est_success', 'est_failure', 'est_hardmin',
           'est_hardmax', 'est_hardminmax', 'est_newmin', 'est_maxiter',
           'est_hitnan')


# The return type for the estimation routines.
#
# It looks like the limits can be numbers or None.
#
ArrayVal = Sequence[SupportsFloat | None] | np.ndarray
EstReturn = tuple[ArrayVal, ArrayVal,
                  Sequence[int | None] | np.ndarray,
                  int, np.ndarray | None]


est_success = 0
est_failure = 1
est_hardmin = 2
est_hardmax = 3
est_hardminmax = 4
est_newmin = 5
est_maxiter = 6
est_hitnan = 7


# This class is used to send around the "new parameter values" in
# sherpa.fit.Fit.est_errors.
#
[docs] class EstNewMin(Exception): "Reached a new minimum fit statistic" pass
[docs] class EstMethod(NoNewAttributesAfterInit): """Estimate errors on a set of parameters. .. versionchanged:: 4.17.1 The estfunc argument is now unused and will be removed in a later release. """ # defined pre-instantiation for pickling config = {'sigma': 1, 'eps': 0.01, 'maxiters': 200, 'soft_limits': False} def __init__(self, name: str, estfunc: Any = None ) -> None: self.name = name if estfunc is not None: warnings.warn("EstMethod: the estfunc argument is deprecated", DeprecationWarning) # config should be defined pre-instantiation for pickling # however, for some unknown reason membership in self.__dict__ # requires declaration in __init__() self.config = self.config.copy() super().__init__() def __getattr__(self, name): if name in self.__dict__.get('config', ()): return self.config[name] raise AttributeError(f"'{type(self).__name__}' object has no attribute '{name}'") def __setattr__(self, name, val): if name in self.__dict__.get('config', ()): self.config[name] = val else: NoNewAttributesAfterInit.__setattr__(self, name, val) def __repr__(self) -> str: return f"<{type(self).__name__} error-estimation method instance '{self.name}'>" def __str__(self) -> str: # Put name first always keylist = list(self.config.keys()) keylist = ['name'] + keylist full_config = {'name': self.name} full_config.update(self.config) return print_fields(keylist, full_config) def __setstate__(self, state): self.__dict__.update(state) # obtain config values from object class # TODO: pylint points out there's no name set for # the initialization call self.__dict__['config'] = getattr(self.__class__(), 'config', {}) # update new config dict with user defined from old self.__dict__['config'].update(state.get('config', {}))
[docs] def compute(self, statfunc: StatFunc, fitfunc: FitFunc, *, pars: np.ndarray, parmins: np.ndarray, parmaxes: np.ndarray, parhardmins: np.ndarray, parhardmaxes: np.ndarray, limit_parnums: np.ndarray, # integers freeze_par: Callable, thaw_par: Callable, report_progress: Callable, get_par_name: Callable, statargs=(), statkwargs={} ) -> EstReturn: """Estimate the error range. .. versionchanged:: 4.17.1 All arguments other than statfunc and fitfunc are now keyword-only. Notes ----- The statargs and statkwargs arguments are currently unused. """ # This does not use abc.abstractmethod as inheriting from # NoNewAttributesAfterInit. # raise NotImplementedError()
[docs] class Covariance(EstMethod): """The covariance method for estimating errors.""" def __init__(self, name: str = 'covariance') -> None: super().__init__(name)
[docs] def compute(self, statfunc: StatFunc, fitfunc: FitFunc, *, pars: np.ndarray, parmins: np.ndarray, parmaxes: np.ndarray, parhardmins: np.ndarray, parhardmaxes: np.ndarray, limit_parnums: np.ndarray, freeze_par: Callable, thaw_par: Callable, report_progress: Callable, get_par_name: Callable, statargs=(), statkwargs={} ) -> EstReturn: """Estimate the error range. .. versionchanged:: 4.17.1 All arguments other than statfunc and fitfunc are now keyword-only. Notes ----- The statargs and statkwargs arguments are currently unused. """ def stat_cb(pars): return statfunc(pars)[0] def fit_cb(scb, pars, parmins, parmaxes, i): # parameter i is a no-op usually return fitfunc(scb, pars, parmins, parmaxes)[2] # remin means reminimize -- *generally* not done (only # proj needs to reminimize). # # covar still needs to pass a reminimize flag to # get_one_sided_interval; a value less than zero is # interpreted as "never reminimize", so pass that value here. remin = -1.0 tol = -1.0 return covariance(pars, parmins=parmins, parmaxes=parmaxes, parhardmins=parhardmins, parhardmaxes=parhardmaxes, sigma=self.sigma, eps=self.eps, tol=tol, maxiters=self.maxiters, remin=remin, limit_parnums=limit_parnums, stat_cb=stat_cb, fit_cb=fit_cb, report_progress=report_progress)
[docs] class Confidence(EstMethod): """The confidence method for estimating errors.""" # defined pre-instantiation for pickling _added_config = {'remin': 0.01, 'fast': False, 'parallel': True, 'numcores': ncpus, 'maxfits': 5, 'max_rstat': 3, 'tol': 0.2, 'verbose': False, 'openinterval': False} def __init__(self, name: str = 'confidence') -> None: super().__init__(name) # Update EstMethod.config dict with Confidence specifics self.config.update(self._added_config)
[docs] def compute(self, statfunc: StatFunc, fitfunc: FitFunc, *, pars: np.ndarray, parmins: np.ndarray, parmaxes: np.ndarray, parhardmins: np.ndarray, parhardmaxes: np.ndarray, limit_parnums: np.ndarray, # integers freeze_par: Callable, thaw_par: Callable, report_progress: Callable, get_par_name: Callable, statargs=(), statkwargs={} ) -> EstReturn: """Estimate the error range. .. versionchanged:: 4.17.1 All arguments other than statfunc and fitfunc are now keyword-only. Notes ----- The statargs and statkwargs arguments are currently unused. """ def stat_cb(pars): return statfunc(pars)[0] def fit_cb(pars, parmins, parmaxes, i): # freeze model parameter i (current_pars, current_parmins, current_parmaxes) = freeze_par(pars, parmins, parmaxes, i) fit_pars = fitfunc(statfunc, current_pars, current_parmins, current_parmaxes)[1] # If stat is not chi-squared, and fit method is # lmdif, need to recalculate stat at end, just # like in sherpa/sherpa/fit.py:fit() stat = statfunc(fit_pars)[0] # stat = fitfunc(scb, pars, parmins, parmaxes)[2] # thaw model parameter i thaw_par(i) return stat # # convert stat call back to have the same signature as fit call back # def stat_cb_extra_args(fcn): def stat_cb_wrapper(x, *args): return fcn(x) return stat_cb_wrapper statcb = stat_cb_extra_args(stat_cb) if 1 == len(pars): fitcb = statcb else: fitcb = fit_cb return confidence(pars, parmins=parmins, parmaxes=parmaxes, parhardmins=parhardmins, parhardmaxes=parhardmaxes, sigma=self.sigma, eps=self.eps, tol=self.tol, maxiters=self.maxiters, remin=self.remin, verbose=self.verbose, limit_parnums=limit_parnums, stat_cb=statcb, fit_cb=fitcb, report_progress=report_progress, get_par_name=get_par_name, do_parallel=self.parallel, numcores=self.numcores, open_interval=self.openinterval)
[docs] class Projection(EstMethod): """The projection method for estimating errors.""" # defined pre-instantiation for pickling _added_config = {'remin': 0.01, 'fast': False, 'parallel': True, 'numcores': ncpus, 'maxfits': 5, 'max_rstat': 3, 'tol': 0.2} def __init__(self, name: str = 'projection') -> None: super().__init__(name) # Update EstMethod.config dict with Projection specifics self.config.update(self._added_config)
[docs] def compute(self, statfunc: StatFunc, fitfunc: FitFunc, *, pars: np.ndarray, parmins: np.ndarray, parmaxes: np.ndarray, parhardmins: np.ndarray, parhardmaxes: np.ndarray, limit_parnums: np.ndarray, # integers freeze_par: Callable, thaw_par: Callable, report_progress: Callable, get_par_name: Callable, statargs=(), statkwargs={} ) -> EstReturn: """Estimate the error range. .. versionchanged:: 4.17.1 All arguments other than statfunc and fitfunc are now keyword-only. Notes ----- The statargs and statkwargs arguments are currently unused. """ if fitfunc is None: raise TypeError("fitfunc should not be none") def stat_cb(pars): return statfunc(pars)[0] def fit_cb(pars, parmins, parmaxes, i): # freeze model parameter i (current_pars, current_parmins, current_parmaxes) = freeze_par(pars, parmins, parmaxes, i) fit_pars = fitfunc(statfunc, current_pars, current_parmins, current_parmaxes)[1] # If stat is not chi-squared, and fit method is # lmdif, need to recalculate stat at end, just # like in sherpa/sherpa/fit.py:fit() stat = statfunc(fit_pars)[0] # stat = fitfunc(scb, pars, parmins, parmaxes)[2] # thaw model parameter i thaw_par(i) return stat return projection(pars, parmins=parmins, parmaxes=parmaxes, parhardmins=parhardmins, parhardmaxes=parhardmaxes, sigma=self.sigma, eps=self.eps, tol=self.tol, maxiters=self.maxiters, remin=self.remin, limit_parnums=limit_parnums, stat_cb=stat_cb, fit_cb=fit_cb, report_progress=report_progress, get_par_name=get_par_name, do_parallel=self.parallel, numcores=self.numcores)
def covariance(pars: np.ndarray, parmins: np.ndarray, parmaxes: np.ndarray, parhardmins: np.ndarray, parhardmaxes: np.ndarray, sigma: float, eps: float, tol: float, maxiters: int, remin: float, limit_parnums: np.ndarray, # integers stat_cb: Callable, fit_cb: Callable, report_progress: Callable ) -> EstReturn: """Estimate errors using the covariance method.""" # Do nothing with tol # Do nothing with report_progress (generally fast enough we don't # need to report back per-parameter progress) # Even though we only want limits on certain parameters, we have to # compute the matrix for *all* thawed parameters. So we will do that, # and then pick the parameters of interest out of the result. try: info = _est_funcs.info_matrix(pars, parmins, parmaxes, parhardmins, parhardmaxes, sigma, eps, maxiters, remin, stat_cb) except EstNewMin as emin: # catch the EstNewMin exception and attach the modified # parameter values to the exception obj. These modified # parvals determine the new lower statistic. raise EstNewMin(pars) from emin # Invert matrix, take its square root and multiply by sigma to get # parameter uncertainties; parameter uncertainties are the # diagonal elements of the matrix. # Use simpler matrix inversion function from numpy. If that # doesn't work, assume it's an ill-conditioned or singular matrix, # and call pinv from numpy -- pinv will call the SVD function to # invert the matrix. But call pinv *only* when inv is shown not # to work in a particular case -- use inv by default! # The reason for this is that pinv can give back very strange # results, when you don't *need* to use pinv, and it *also* # happens that the ratio between smallest and largest diagonal # elements approaches the machine precision for the data type. # The result is that errors that come from the largest diagonal # element are ludicrously small; you can't have a parameter value # of order 1.0, and an error of order 10^-30, for example. The # simpler inv function for inverting matrices does not appear to # have the same issue. try: inv_info = np.linalg.inv(info) except LinAlgError: # catch the SVD exception and exit gracefully inv_info = np.zeros_like(info) inv_info[:] = np.nan except: try: inv_info = np.linalg.pinv(info) except LinAlgError: # catch the SVD exception and exit gracefully inv_info = np.zeros_like(info) inv_info[:] = np.nan diag = (sigma * np.sqrt(inv_info)).diagonal() # limit_parnums lists the indices of the array pars, that # correspond to the parameters of interest. We will pick out # the diagonal elements corresponding to entries in limits_parnums, # and return only those bounds to the user. upper_bounds = [] lower_bounds = [] error_flags = [] for num in limit_parnums: eflag = est_success ubound = diag[num] lbound = -diag[num] # What happens when lbound or ubound is NaN? This is # presumably why the code is written as it is below (e.g. a # pass if the values can be added to pars[num]). # if pars[num] + ubound < parhardmaxes[num]: pass else: ubound = np.nan eflag = est_hardmax if pars[num] + lbound > parhardmins[num]: pass else: lbound = np.nan if eflag == est_hardmax: eflag = est_hardminmax else: eflag = est_hardmin upper_bounds.append(ubound) lower_bounds.append(lbound) error_flags.append(eflag) return (np.array(lower_bounds), np.array(upper_bounds), np.array(error_flags), 0, inv_info) def projection(pars: np.ndarray, parmins: np.ndarray, parmaxes: np.ndarray, parhardmins: np.ndarray, parhardmaxes: np.ndarray, sigma: float, eps: float, tol: float, maxiters: int, remin: float, limit_parnums: np.ndarray, # integers stat_cb: Callable, fit_cb: Callable, report_progress: Callable, get_par_name: Callable, do_parallel: bool, numcores: int ) -> EstReturn: """Estimate errors using the projection method.""" # Number of parameters to be searched on (*not* number of thawed # parameters, just number we are searching on) numsearched = len(limit_parnums) # _est_funcs.projection can be called on any subset of the thawed # parameters. So we made a change here to call _est_funcs.projection # once per parameter number listed in limit_parnums, instead of # calling _est_funcs.projection once, with the original limit_parnums # array. This way, we can report pack progress after the confidence # limit search is completed for each parameter, without descending # into the C++ code. # # It does mean we have to take apart the tuple returned by each call # to _est_funcs.projection; take the data we've pulled out, and # upon exiting the while loop, constructing a new tuple to return. # SMD 03/17/2009 proj_func = _est_funcs.projection # LocalEstFunction def func(counter: int, # unused singleparnum: int, lock: SupportsLock | None = None ) -> tuple[SupportsFloat, SupportsFloat, int, int, None]: try: singlebounds = proj_func(pars, parmins, parmaxes, parhardmins, parhardmaxes, sigma, eps, tol, maxiters, remin, [singleparnum], stat_cb, fit_cb) except EstNewMin as emin: # catch the EstNewMin exception and attach the modified # parameter values to the exception obj. These modified # parvals determine the new lower statistic. raise EstNewMin(pars) from emin if lock is not None: lock.acquire() report_progress(singleparnum, singlebounds[0], singlebounds[1]) if lock is not None: lock.release() return (singlebounds[0][0], singlebounds[1][0], singlebounds[2][0], singlebounds[3], None) if numsearched < 2 or not multi or numcores < 2: do_parallel = False if not do_parallel: lower_limits = np.zeros(numsearched) upper_limits = np.zeros(numsearched) eflags = np.zeros(numsearched, dtype=int) nfits = 0 for i, pnum in enumerate(limit_parnums): singlebounds = func(i, pnum) lower_limits[i] = singlebounds[0] upper_limits[i] = singlebounds[1] eflags[i] = singlebounds[2] nfits = nfits + singlebounds[3] return (lower_limits, upper_limits, eflags, nfits, None) return parallel_est(func, limit_parnums, pars, numcores) #################################confidence################################### class ConfArgs: """The class ConfArgs is responsible for the arguments to the fit call back function.""" def __init__(self, xpars: ArrayType, smin: ArrayType, smax: ArrayType, hmin: ArrayType, hmax: ArrayType, target_stat: SupportsFloat ) -> None: self.ith_par = 0 self.xpars = np.array(xpars, copy=True) self.slimit = (np.array(smin, copy=True), np.array(smax, copy=True)) self.hlimit = (np.array(hmin, copy=True), np.array(hmax, copy=True)) self.target_stat = target_stat def __call__(self ) -> tuple[int, np.ndarray, np.ndarray, np.ndarray, SupportsFloat]: return (self.ith_par, self.xpars, self.slimit, self.hlimit, self.target_stat) def __str__(self) -> str: a2s = np.array2string msg = '' msg += '# smin = ' + a2s(self.slimit[0], precision=6) + '\n' msg += '# smax = ' + a2s(self.slimit[1], precision=6) + '\n' msg += '# hmin = ' + a2s(self.hlimit[0], precision=6) + '\n' msg += '# hmax = ' + a2s(self.hlimit[1], precision=6) + '\n#\n' msg += '# Note: for the intermediate steps, the notation:\n' msg += ' par.name -/+: f( x ) = stat\n' msg += '# ==> `stat` is the statistic when parameter `par.name` is frozen at `x`\n' msg += '# while searching for the `lower/upper` confidence level, respectively.\n#' return msg def get_par(self) -> SupportsFloat: """return the current (worked on) par""" return self.xpars[self.ith_par] def get_hlimit(self, dir: int) -> SupportsFloat: """ return the current (worked on) hard limit""" return self.hlimit[dir][self.ith_par] def get_slimit(self, dir: int) -> SupportsFloat: """ return the current (worked on) soft limit""" return self.slimit[dir][self.ith_par] class ConfBlog: def __init__(self, blogger, prefix, verbose, lock) -> None: self.blogger = blogger self.prefix = prefix self.verbose = verbose self.lock = lock class Limit: """Represent a limit. This should not be used directly: use `LowerLimit` or `UpperLimit` instead. """ def __init__(self, limit: SupportsFloat) -> None: self.limit = limit class LowerLimit(Limit): "A lower limit" def __str__(self) -> str: return f'LowerLimit: limit={self.limit:e}' def is_beyond_limit(self, x) -> bool: return x < self.limit class UpperLimit(Limit): "An upper limit" def __str__(self) -> str: return f'UpperLimit: limit={self.limit:e}' def is_beyond_limit(self, x: SupportsFloat) -> bool: # The float calls are for typing checks. return float(x) > float(self.limit) class ConfBracket: """The class ConfBracket is responsible for bracketing the root within the interval (a,b) where f(a)*f(b) < 0.0""" neg_pos = (-1, 1) def __init__(self, myargs, trial_points) -> None: self.myargs = myargs self.trial_points = trial_points self.fcn: Callable | None = None # TODO: rename the dir and iter arguments def __call__(self, dir: int, iter, step_size, open_interval, maxiters, tol, bloginfo: ConfBlog ) -> ConfRootNone: # # Either 1) a root has been found (ConfRootZero), 2) an interval # where the root has been confined (ConfRootBracket) or 3) No # possible chance for a root (ConfRootNone), ie by trying points # upto/beyond the hard limit and no chance for a root has been found. # return self.find(dir, iter, step_size, open_interval, maxiters, tol, bloginfo) # TODO: rename the dir and iter arguments # TODO: the iter argument gets over-ridden below, so do we need it? (and rename it) def find(self, dir: int, iter, step_size, open_interval, maxiters, tol, bloginfo: ConfBlog, base=2.0 ) -> ConfRootNone: assert self.fcn is not None, 'callback func has not been set' hlimit = [LowerLimit(self.myargs.get_hlimit(dir)), UpperLimit(self.myargs.get_hlimit(dir))] slimit = [LowerLimit(self.myargs.get_slimit(dir)), UpperLimit(self.myargs.get_slimit(dir))] xxx = self.trial_points[0] fff = self.trial_points[1] conf_step = ConfStep(xxx, fff) mymaxiters = min(maxiters, 16) plateau = 0 max_plateau_iter = 5 try: # TODO: rename iter argument for iter in range(mymaxiters): if 0 == iter: x = conf_step.covar(dir, iter, step_size, base) elif 1 == iter: x = conf_step.secant(dir, iter, step_size, base) # x = conf_step.covar( dir, iter, step_size, base ) else: x = conf_step.quad(dir, iter, step_size, base, bloginfo) if x is None or np.isnan(x): return ConfRootNone() # Make sure x is not beyond the **hard** limit if hlimit[dir].is_beyond_limit(x): x = hlimit[dir].limit f = self.fcn(x, self.myargs()) # print 'find(): beyond hard limit: f(%.14e)=%.14e' % (x,f) if abs(f) <= tol: return ConfRootZero(x) if f >= 0.0: return ConfRootBracket(self.fcn, self.trial_points, open_interval) return ConfRootNone() if slimit[dir].is_beyond_limit(x): f = self.fcn(x, self.myargs()) # print 'find(): beyond soft limit: f(%.14e)=%.14e' % (x,f) if abs(f) <= tol: return ConfRootZero(x) if f >= 0.0: return ConfRootBracket(self.fcn, self.trial_points, open_interval) if f < fff[-2]: # if the fit beyond the soft limit is a better fit # then the confidence for the parameter does not exist return ConfRootNone() else: f = self.fcn(x, self.myargs()) # print 'find(): f(%.14e)=%.14e' % (x,f) if abs(f) <= tol: return ConfRootZero(x) if f >= 0.0: return ConfRootBracket(self.fcn, self.trial_points, open_interval) if Knuth_close(fff[-2], fff[-1], 1.0e-6): plateau += 1 if plateau > max_plateau_iter: # print 'find( %d ): plateau = %d', (iter,plateau) return ConfRootNone(None) # else: # if plateau > 0: # plateau -= 1 return ConfRootNone(None) except OutOfBoundErr: return ConfRootNone() class ConfRootNone: """The base class for the root of the confidence interval""" def __init__(self, root: SupportsFloat | None = None ) -> None: """If self.root == None, then 1) points up to the hard limits were tried and it was not possible to bracketed the solution. 2) a parameter beyond the soft limit has been tried and the new stat was found to be **less** then the initial minimum""" self.root = root def __call__(self, tol, bloginfo: ConfBlog ) -> SupportsFloat | None: return self.root def __str__(self) -> str: return 'No possible root exist' class ConfRootBracket(ConfRootNone): """The class contains the bracket where the confidence root has been bracketed, ie where f(a)*f(b) < 0""" def __init__(self, fcn, trial_points, open_interval ) -> None: super().__init__(root=None) self.fcn = fcn self.trial_points = trial_points self.open_interval = open_interval def __call__(self, tol, bloginfo: ConfBlog ) -> SupportsFloat | tuple[SupportsFloat, SupportsFloat] | None: def warn_user_about_open_interval(listval): if bloginfo.lock is not None: bloginfo.lock.acquire() if 0 == bloginfo.verbose: prefix = '%s ' % bloginfo.prefix.lstrip() else: prefix = '%s ' % bloginfo.prefix interval = list_to_open_interval(listval) bloginfo.blogger.info(prefix + 'WARNING: The confidence level lies within ' + interval) if bloginfo.lock is not None: bloginfo.lock.release() xxx = self.trial_points[0] fff = self.trial_points[1] if np.sign(fff[-2]) == np.sign(fff[-1]): self.root = None return None answer = zeroin(self.fcn, xxx[-2], xxx[-1], fa=fff[-2], fb=fff[-1], maxfev=32, tol=tol) if abs(answer[0][1]) > tol: xafa = answer[1][0] xa = xafa[0] fa = xafa[1] xbfb = answer[1][1] xb = xbfb[0] fb = xbfb[1] if np.sign(fa) != np.sign(fb): if not self.open_interval: warn_user_about_open_interval([xa, xb]) return (xa + xb) / 2.0 return min(xa, xb), max(xa, xb) return None self.root = answer[0][0] return self.root def __str__(self) -> str: msg = 'root is within the interval ( f(%e)=%e, f(%e)=%e )' \ % (self.trial_points[0][-2], self.trial_points[1][-2], self.trial_points[0][-1], self.trial_points[1][-1], ) return msg class ConfRootZero(ConfRootNone): """The class with the root/zero of the confidence interval""" def __str__(self) -> str: return f'root = {self.root:e}' class ConfStep: def __init__(self, xtrial, ftrial) -> None: self.xtrial = xtrial self.ftrial = ftrial def covar(self, dir, iter, stepsize, base): return self.xtrial[-1] + \ ConfBracket.neg_pos[dir] * pow(base, iter) * stepsize # return self.xtrial[ 0 ] + \ # ConfBracket.neg_pos[ dir ] * pow( base, iter ) * stepsize def Halley(self, coeffs, x, maxfev=8, tol=1.0e-3): for nfev in range(maxfev): ax = coeffs[0] * x fval = (ax + coeffs[1]) * x + coeffs[2] if abs(fval) <= tol: return [x, fval] fdif = 2.0 * ax + coeffs[1] fdif2 = 2.0 numer = 2.0 * fval * fdif denom = 2.0 * fdif * fdif - fval * fdif2 x -= numer / denom nfev += 1 return [x, fval] def is_same_dir(self, dir, current_pos, proposed_pos) -> bool: delta = proposed_pos - current_pos return np.sign(delta) == ConfBracket.neg_pos[dir] def quad(self, dir, iter, step_size, base, bloginfo): coeffs = quad_coef(self.xtrial[-3:], self.ftrial[-3:]) delta = ConfBracket.neg_pos[dir] delta *= abs(self.xtrial[-1] - self.xtrial[-2]) lastx = self.xtrial[-1] mroot = demuller(np.poly1d(coeffs), lastx + delta, lastx + 2 * delta, lastx + 3 * delta, tol=1.0e-2) xroot = mroot[0][0] if xroot is None or np.isnan(xroot): return self.covar(dir, iter, step_size, base) try: [xroot, froot] = self.Halley(coeffs, xroot, tol=1.0e-3) except ZeroDivisionError: xroot = None if xroot is not None and not np.isnan(xroot) and \ self.is_same_dir(dir, self.xtrial[-1], xroot): return xroot return self.covar(dir, iter, step_size, base) def secant(self, dir, iter, step_size, base): xb = self.xtrial[-2] fb = self.ftrial[-2] xa = self.xtrial[-1] fa = self.ftrial[-1] if abs(fb) > abs(fa) or 0.0 == fa: return self.covar(dir, iter, step_size, base) s = fb / fa p = (xa - xb) * s if 1.0 == s: return self.covar(dir, iter, step_size, base) q = 1.0 - s x = xb - p / q if self.is_same_dir(dir, xa, x): return x return self.covar(dir, iter, step_size, base) def confidence(pars: np.ndarray, parmins: np.ndarray, parmaxes: np.ndarray, parhardmins: np.ndarray, parhardmaxes: np.ndarray, sigma: float, eps: float, tol: float, maxiters: int, remin: float, verbose: bool, # different to covariance/projection limit_parnums: np.ndarray, # integers stat_cb: Callable, fit_cb: Callable, report_progress: Callable, get_par_name: Callable, do_parallel: bool, numcores: int, open_interval ) -> EstReturn: def get_prefix(index, name, minus_plus): '''To print the prefix/indent when verbose is on''' blank = 3 * index * ' ' return [f"{blank}{name} {mtext}:" for mtext in minus_plus] def get_delta_root(arg, dir, par_at_min): my_neg_pos = ConfBracket.neg_pos[dir] if is_iterable(arg): # return map( lambda x: my_neg_pos * abs( x - par_at_min ), arg ) return arg if arg is not None: arg -= par_at_min return my_neg_pos * abs(arg) return arg def get_step_size(error_scales, upper_scales, index, par): if 0 != error_scales[index]: # if covar error is NaN then set it to fraction of the par value. ith_covar_err = 0.0625 * abs(par) else: ith_covar_err = abs(upper_scales[index]) if 0.0 == ith_covar_err: # just in case covar and/or par is 0 ith_covar_err = 1.0e-6 return ith_covar_err def monitor_func(fcn, history): def myfunc(x, *args): fval = fcn(x, *args) history[0].append(x) history[1].append(fval) return fval return myfunc def print_status(myblog, verbose, prefix, answer, lock): if lock is not None: lock.acquire() if 0 == verbose: msg = '%s\t' % prefix.lstrip() else: msg = '%s\t' % prefix if is_iterable(answer): msg += list_to_open_interval(answer) elif answer is None: msg += '-----' else: msg += '%g' % answer myblog(msg) if lock is not None: lock.release() # # Work in the translated coordinate. Hence the 'errors/confidence' # are the zeros/roots in the translated coordinate system. # def translated_fit_cb(fcn, myargs): def translated_fit_cb_wrapper(x, *args): hlimit = myargs.hlimit slimit = myargs.slimit hmin = hlimit[0] hmax = hlimit[1] xpars = myargs.xpars ith_par = myargs.ith_par # The parameter must be within the hard limits if x < hmin[ith_par] or x > hmax[ith_par]: raise OutOfBoundErr smin = slimit[0] smax = slimit[1] orig_ith_xpar = xpars[ith_par] xpars[ith_par] = x translated_stat = fcn( xpars, smin, smax, ith_par) - myargs.target_stat xpars[ith_par] = orig_ith_xpar return translated_stat return translated_fit_cb_wrapper def verbose_fitcb(fcn, bloginfo): if 0 == bloginfo.verbose: return fcn def verbose_fcn(x, *args): fval = fcn(x, *args) msg = '%s f( %e ) =' % (bloginfo.prefix, x) if fval is None: msg = '%s None' % msg else: msg = '%s %e' % (msg, fval) bloginfo.blogger.info(msg) return fval return verbose_fcn sherpablog = logging.getLogger('sherpa') # where to print progress report # Get minimum fit statistic, and calculate target statistic value orig_min_stat = stat_cb(pars) delta_stat = sigma * sigma target_stat = orig_min_stat + delta_stat lower_scales = None upper_scales = None error_scales = None nfits = 0 results = None try: (lower_scales, upper_scales, error_scales, nfits, results) = covariance(pars, parmins, parmaxes, parhardmins, parhardmaxes, 1.0, eps, tol, maxiters, remin, limit_parnums, stat_cb, fit_cb, report_progress) except EstNewMin as e: raise e except: error_scales = np.full(len(pars), est_hardminmax) myargs = ConfArgs(pars, parmins, parmaxes, parhardmins, parhardmaxes, target_stat) if 0 != verbose: msg = '#\n# f' + np.array2string(np.asarray(pars), precision=6) msg += ' = %e\n' % orig_min_stat msg += '# sigma = %e\n' % sigma msg += '# target_stat = %e\n' % target_stat msg += '# tol = %e\n' % eps msg += '%s' % myargs sherpablog.info(msg) # TODO: this dictionary is used to store a value, but the value is # never used. Do we need it? store = {} # LocalEstFunc def func(counter: int, singleparnum: int, lock: SupportsLock | None = None ) -> tuple[SupportsFloat, SupportsFloat, int, int, None]: counter_cb = FuncCounter(fit_cb) # # These are the bounds to be returned by this method # conf_int = [[], []] error_flags = [] # # If the user has requested a specific parameter to be # calculated then 'ith_par' represents the index of the # free parameter to deal with. # myargs.ith_par = singleparnum fitcb = translated_fit_cb(counter_cb, myargs) par_name = get_par_name(myargs.ith_par) ith_covar_err = get_step_size(error_scales, upper_scales, counter, pars[myargs.ith_par]) trial_points = [[], []] fitcb = monitor_func(fitcb, trial_points) bracket = ConfBracket(myargs, trial_points) # the parameter name is set, may as well get the prefix prefix = get_prefix(counter, par_name, ['-', '+']) myfitcb = [verbose_fitcb(fitcb, ConfBlog(sherpablog, prefix[0], verbose, lock)), verbose_fitcb(fitcb, ConfBlog(sherpablog, prefix[1], verbose, lock))] for dirn in range(2): # # trial_points stores the history of the points for the # parameter which has been evaluated in order to locate # the root. Note the first point is 'given' since the info # of the minimum is crucial to the search. # bracket.trial_points[0].append(pars[myargs.ith_par]) bracket.trial_points[1].append(- delta_stat) myblog = ConfBlog(sherpablog, prefix[dirn], verbose, lock) # have to set the callback func otherwise disaster. bracket.fcn = myfitcb[dirn] root = bracket(dirn, iter, ith_covar_err, open_interval, maxiters, eps, myblog) myzero = root(eps, myblog) delta_zero = get_delta_root(myzero, dirn, pars[myargs.ith_par]) conf_int[dirn].append(delta_zero) status_prefix = get_prefix(counter, par_name, ['lower bound', 'upper bound']) print_status(myblog.blogger.info, verbose, status_prefix[dirn], delta_zero, lock) # This should really set the error flag appropriately. error_flags.append(est_success) # # include the minimum point to separate the -/+ interval # store[par_name] = trial_points return (conf_int[0][0], conf_int[1][0], error_flags[0], counter_cb.nfev, None) if len(limit_parnums) < 2 or not multi or numcores < 2: do_parallel = False if not do_parallel: lower_limits = [] upper_limits = [] eflags = [] nfits = 0 for i, lpar in enumerate(limit_parnums): lower_limit, upper_limit, flags, nfit, extra = func( i, lpar) lower_limits.append(lower_limit) upper_limits.append(upper_limit) eflags.append(flags) nfits += nfit return (lower_limits, upper_limits, eflags, nfits, None) return parallel_est(func, limit_parnums, pars, numcores) #################################confidence################################### class LocalEstFunc(Protocol): """Process a single parameter.""" def __call__(self, counter: int, singleparnum: int, lock: SupportsLock | None = None ) -> tuple[SupportsFloat, SupportsFloat, int, int, None]: ... def parallel_est(estfunc: LocalEstFunc, limit_parnums: np.ndarray, # integers pars: np.ndarray, numcores: int = ncpus ) -> EstReturn: """Run a function on a sequence of inputs in parallel. A specialized version of sherpa.utils.parallel.parallel_map. Parameters ---------- estfunc : function This function accepts three arguments and returns a value. limit_parnums : sequence pars : sequence The current parameter values numcores : int, optional The number of calls to ``function`` to run in parallel. When set to ``None``, all the available CPUs on the machine - as set either by the 'numcores' setting of the 'parallel' section of Sherpa's preferences or by `multiprocessing.cpu_count` - are used. Returns ------- ans : tuple """ # See sherpa.utils.parallel for a discussion of how multiprocessing is # being used to run code in parallel. # manager = context.Manager() out_q = manager.Queue() err_q = manager.Queue() # Unlike sherpa.utils.parallel.parallel_map, these routines do use a lock # for serializing screen output. # lock = manager.Lock() size = len(limit_parnums) parids = np.arange(size) # if len(limit_parnums) is less than numcores, only use length number of # processes if size < numcores: numcores = size # group limit_parnums into numcores-worth of chunks limit_parnums = np.array_split(limit_parnums, numcores) parids = np.array_split(parids, numcores) def worker(parids, parnums) -> None: results = [] for parid, singleparnum in zip(parids, parnums): try: result = estfunc(parid, singleparnum, lock) results.append((parid, result)) except EstNewMin: # catch the EstNewMin exception and include the exception # class and the modified parameter values to the error queue. # These modified parvals determine the new lower statistic. # The exception class will be re-raised with the # parameter values attached. C++ Python exceptions are not # picklable for use in the queue. err_q.put(EstNewMin(pars)) return except Exception as e: err_q.put(e) return out_q.put(results) tasks = [context.Process(target=worker, args=(parid, parnum)) for parid, parnum in zip(parids, limit_parnums)] return run_tasks(tasks, out_q, err_q, size) def run_tasks(tasks: Sequence[SupportsProcess], out_q: SupportsQueue, err_q: SupportsQueue, size: int ) -> EstReturn: """Run the processes, exiting early if necessary, and return the results. A specialized version of sherpa.utils.parallel.run_tasks (note the different order of the queues). Parameters ---------- procs : list of multiprocessing.Process tasks The processes to run. out_q, err_q : manager.Queue The success and error and channels used by the processes. size : int The number of results being returned (can be larger than len(procs)). Returns ------- result : tuple """ process_tasks(tasks, err_q) lower_limits = size * [None] upper_limits = size * [None] eflags = size * [None] nfits = 0 while not out_q.empty(): for parid, singlebounds in out_q.get(): # Have to guarantee that the tuple returned by projection # is always (array, array, array, int) for this to work. lower_limits[parid] = singlebounds[0] upper_limits[parid] = singlebounds[1] eflags[parid] = singlebounds[2] nfits += singlebounds[3] return (lower_limits, upper_limits, eflags, nfits, None)