GBM-data-tools/spectra/fitting.py


# fitting: Classes and functions for MLE of count spectra
#
# Authors: William Cleveland (USRA),
# Adam Goldstein (USRA) and
# Daniel Kocevski (NASA)
#
# Portions of the code are Copyright 2020 William Cleveland and
# Adam Goldstein, Universities Space Research Association
# All rights reserved.
#
# Written for the Fermi Gamma-ray Burst Monitor (Fermi-GBM)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
import warnings
from datetime import datetime
import numpy as np
from numpy.random import multivariate_normal
from scipy.linalg import inv
from scipy.misc import derivative
from scipy.optimize import minimize, brentq
from scipy.stats import chi2
from ..background import BackgroundRates, BackgroundSpectrum
from ..data.primitives import EnergyBins
class SpectralFitter:
"""Base class for jointly-fitting spectra from multiple detectors.
This is a (large) wrapper around `scipy.optimize.minimize
<https://docs.scipy.org/doc/scipy/reference/generated/scipy.optimize.minimize.html>`_.
This class should not be instantiated by the user, but instead a child class
for a specific fit statistic should be instantiated.
Child classes should be designed with the following requirements:
1. An ``__init__`` method that takes in the inputs in the Parent ``__init__``
method. Can have additional arguments/keywords.
    2. A ``_fold_model`` method that accepts the model function and the list
       of parameters. It must return a single number: the fit statistic
       summed over all detectors/datasets, as in the sketch below.
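    For reference, a minimal child-class sketch (``mystat`` is a hypothetical
    fit-statistic function following the call pattern of :func:`chisq`)::

        class SpectralFitterMyStat(SpectralFitter):
            def __init__(self, pha_list, bkgd_list, rsp_list, **kwargs):
                super().__init__(pha_list, bkgd_list, rsp_list, mystat,
                                 **kwargs)

            def _fold_model(self, function, params):
                stat = np.zeros(self.num_sets)
                for i in range(self.num_sets):
                    # fold the photon model through each detector response
                    model = self._rsp[i].fold_spectrum(
                        function, params, channel_mask=self._chan_masks[i])
                    # fit statistic for one dataset
                    stat[i] = self._stat(self._data[i], self._back_rates[i],
                                         self._back_var[i], model,
                                         self._exposure[i])
                return stat.sum()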
Parameters:
pha_list (list of :class:`~gbm.data.PHA`):
            The PHA objects containing the count spectrum for each detector
bkgd_list (list of :class:`~gbm.background.BackgroundRates` or \
list of :class:`~gbm.background.BackgroundSpectrum`):
The background rates object for each detector or background spectrum
for each detector. If given the background rates object, the times
in the corresponding PHA object will be used for the limits of
integration.
rsp_list (list of :class:`~gbm.data.RSP`):
The response object for each detector
statistic (<function>): The function to be used as a fit statistic
channel_masks (list of np.array(dtype=bool), optional):
A boolean mask for each detector that indicates which energy
channels are to be fit.
Note:
The channel_mask will typically be set by the
:attr:`valid_channels` attribute in the PHA object. You can
set channel_masks to override the PHA :attr:`valid_channels`,
but this is usually unnecessary.
method (str, optional):
The fitting algorithm, which should be one of the options for
scipy.optimize.minimize. Note that 'TNC' and 'SLSQP' are the two
algorithms that provide support for fitting with bounds and
constraints. Other algorithms will ignore bounds and constraints.
Attributes:
covariance (np.array): The covariance matrix of the fitted parameters.
Note:
The assumption of negligible higher-order terms in the Taylor
expansion may not be valid and therefore the covariance matrix
may not be valid.
detectors (list of str): The detectors used in the fit
dof (int): The number of degrees of freedom
energy_range (float, float): The full energy range of the fit
function_components (list of str): The names of the individual model
components, if appropriate
function_name (str): The name of the model function being fit
hessian (np.array):
The Hessian matrix of the fitted parameters, for which the
covariance matrix is -inverse(Hessian).
Note:
The assumption of negligible higher-order terms in the Taylor
expansion may not be valid and therefore the Hessian matrix may
not be valid.
jacobian (np.array): The Jacobian vector of the likelihood as a
function of the parameters.
message (str): The message from the fitter on convergence/non-convergence
num_components (int): Number of function components
num_sets (int): Number of datasets/detectors being fit
parameters (np.array): The parameter values at the maximum likelihood
statistic (float): The fit statistic (likelihood)
success (bool): True if the fit was successful, False otherwise.
symmetric_errors (np.array):
            The 1-sigma uncertainty on the parameters, assuming the
            higher-order terms of the Taylor expansion of the likelihood
            about the maximum likelihood solution are negligible. If those
            terms are not negligible, the errors could be very incorrect or
            even NaN.
See attributes :attr:`covariance` and :attr:`hessian`.
"""
def __init__(self, pha_list, bkgd_list, rsp_list, statistic,
channel_masks=None, method='SLSQP'):
# check that we have the right numbers of data, backgrounds, responses,
# and fit masks
self._num_sets = len(pha_list)
if (len(bkgd_list) != self._num_sets) or (
len(rsp_list) != self._num_sets):
raise ValueError('Number of datasets, backgrounds, and responses ' \
'must be the same')
# set the energy channel masks
if channel_masks is not None:
if len(channel_masks) != self._num_sets:
                raise ValueError('If channel_masks is set, its length must '
                                 'equal the number of datasets')
# store response, exposure, and channel masks
self._rsp = rsp_list
self._exposure = np.array([pha.exposure for pha in pha_list])
if channel_masks is None:
channel_masks = []
for pha in pha_list:
chan_mask = np.zeros(pha.numchans, dtype=bool)
chan_mask[pha.valid_channels] = True
channel_masks.append(chan_mask)
self._chan_masks = channel_masks
# extract observed counts and apply channel mask
self._data = self._apply_masks([pha.data.counts for pha in pha_list])
# extract background rates and variances and apply channel masks
self._back_rates = []
self._back_var = []
for bkgd, pha in zip(bkgd_list, pha_list):
if isinstance(bkgd, BackgroundRates):
bkgd_spec = bkgd.integrate_time(*pha.time_range)
elif isinstance(bkgd, BackgroundSpectrum):
bkgd_spec = bkgd
else:
raise ValueError('Unknown Background object')
self._back_rates.append(bkgd_spec.rates)
self._back_var.append(bkgd_spec.rate_uncertainty ** 2)
self._back_rates = self._apply_masks(self._back_rates)
self._back_var = self._apply_masks(self._back_var)
# fitter/function info
self._stat = statistic
self._method = method
self._function = None
self._res = None
@property
def statistic(self):
if self._res is not None:
return self._res.fun
@property
def dof(self):
free_params = np.sum(self._function.free)
datapts = np.sum([chan_mask.sum() for chan_mask in self._chan_masks])
return datapts - free_params
@property
def function_name(self):
if self._function is not None:
return self._function.name
@property
def function_components(self):
if self._function is not None:
try:
return self._function.names
            except AttributeError:
                return None
@property
def num_components(self):
if self._function is not None:
return self._function.num_components
@property
def detectors(self):
return [rsp.detector for rsp in self._rsp]
@property
def energy_range(self):
emin = np.concatenate(
self._apply_masks([rsp.ebounds['E_MIN'] for rsp in self._rsp]))
emax = np.concatenate(
self._apply_masks([rsp.ebounds['E_MAX'] for rsp in self._rsp]))
return (np.min(emin), np.max(emax))
@property
def num_sets(self):
return self._num_sets
@property
def success(self):
if self._res is not None:
return self._res.success
@property
def message(self):
if self._res is not None:
return self._res.message
@property
def parameters(self):
if self._res is not None:
return self._res.x
@property
def jacobian(self):
if self._res is not None:
return self._res.jac
@property
def hessian(self):
if self._res is not None:
return self._hessian(self.parameters, self._function)
@property
def covariance(self):
if self._res is not None:
            return inv(-self.hessian)
@property
def symmetric_errors(self):
if self._res is not None:
id = np.identity(self.parameters.size, dtype=bool)
return np.sqrt(np.abs(self.covariance[id]))
@classmethod
def load(cls, filename):
"""Loads a fitter object that has been saved to disk in a compressed
numpy file. Thanks to the cool properties of the numpy format, this
method will load the complete state of the fitter when it was saved.
Args:
filename (str): The complete filename of the saved file
"""
loaded = np.load(filename, allow_pickle=True)
obj = loaded['obj'].reshape(-1)[0]
print('Fit loaded from {}'.format(loaded['time']))
return obj
def save(self, filename):
"""Saves the fitter object to a compressed binary numpy file. The full
state of the fitter is serialized and saved, and it can be restored
using the :meth:`load` method.
Args:
filename (str): The complete filename to save the fitter object to
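        Example (a sketch; ``numpy.savez_compressed`` appends an ``.npz``
        extension if one is not given)::

            fitter.save('my_fit')
            # later, possibly in a new session; the full state is restored
            fitter = SpectralFitter.load('my_fit.npz')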
"""
now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
np.savez_compressed(filename, time=now, obj=self)
def fit(self, function, **kwargs):
"""Fit the given the function to the data
Args:
function (:class:`~.functions.Function`): The function object to use
**kwargs: Keyword arguments passed to the fitter
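        Example (a sketch; ``Band`` is the Band GRB function from
        :mod:`~gbm.spectra.functions`, and ``options`` is forwarded to
        scipy.optimize.minimize)::

            from gbm.spectra.functions import Band
            fitter.fit(Band(), options={'maxiter': 1000})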
"""
self._function = function
        # initial values for the free parameters (fixed parameters are
        # excluded from the fit)
init_params = np.array(function.default_values)[function.free].tolist()
# do the fit, also returns the jacobian
res = minimize(self._eval_stat, init_params, args=(function,),
method=self._method, bounds=function.parameter_bounds(),
jac=True, **kwargs)
self._res = res
def asymmetric_errors(self, cl=0.683):
"""Calculate the asymmetric uncertainty on each fitted parameter.
This function uses `scipy.optimize.brentq
<https://docs.scipy.org/doc/scipy/reference/generated/scipy.optimize.brentq.html>`_
        to find the parameter values at which 2(LL_max - LL) equals *crit*,
where *crit* is the chi-square critical value defined by the number of
free parameters and the confidence level, and *LL_max* is the maximum
log-likelihood. The brentq function uses a bracketing range to find the
roots, and this function uses the best-fit values of the parameters for
one side of the bracket and the min/max values defined in the fitted
function for the other side of the bracket.
Args:
cl (float, optional): The confidence level of the uncertainties.
Default is 0.683 (1 sigma).
Returns:
            np.array: A (``nparams``, 2) array where each row represents the \
(negative, positive) uncertainty on each parameter.
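        Example::

            # 90% confidence-level uncertainties
            errs = fitter.asymmetric_errors(cl=0.9)
            neg, pos = errs[0]  # uncertainties on the first parameter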
"""
# must have a fit
if self._res is None:
raise RuntimeError('Fit has not been performed')
# calculate the critical value
nparams = self.parameters.size
crit = chi2.ppf(cl, nparams)
# this is the objective function for which we are finding the root
def the_func(param, index):
params = self.parameters.copy()
params[index] = param
test_like = self._fold_model(self._function.fit_eval, params)
delta = 2.0 * (-self._res.fun - test_like)
return delta - crit
# Find the lower and upper bound of the uncertainty. Here, we use
# Brent's Method to determine the bounds, which requires a bracketing
# range of the solution. One side of the bracket is the maximum
# likelihood value. If the function parameter has a finite support,
# the corresponding end of the support is used for the other side of the
# bracket, otherwise, the other side of the bracket is iteratively
# decreased (increased) until the lower (upper) bound is constrained
# by Brent's Method. This has to be done because we don't know a priori
# how far away from maximum likelihood the bound is.
# lower bound
root1 = np.zeros(nparams)
for i in range(nparams):
param = self.parameters[i]
            while True:
if self._function.min_values[i] == -np.inf:
if param < 0.0:
minval = 2.0 * param
elif param > 0.1:
minval = 0.5 * param
else:
minval = param - 1.0
else:
minval = self._function.min_values[i]
# if we're at the bounds, we can't bracket the peak
try:
r, o = brentq(the_func, self.parameters[i], minval, args=(i,),
full_output=True, maxiter=1000)
break
except ValueError:
if minval == self._function.min_values[i]:
warnings.warn("Parameter exists at its lower bound")
r = minval
break
param = minval
root1[i] = r
# upper bound
root2 = np.zeros(nparams)
for i in range(nparams):
param = self.parameters[i]
            while True:
if self._function.max_values[i] == np.inf:
if param < -0.1:
maxval = 0.5 * param
elif param > 0.0:
maxval = 2.0 * param
else:
maxval = 1.0 + param
else:
maxval = self._function.max_values[i]
try:
r, o = brentq(the_func, self.parameters[i], maxval, args=(i,),
full_output=True, maxiter=1000)
break
except ValueError:
# if we're at the bounds, we can't bracket the peak
if maxval == self._function.max_values[i]:
warnings.warn("Parameter exists at its upper bound")
r = maxval
break
param = maxval
root2[i] = r
# convert the bounds to -/+ uncertainties
errs = np.zeros((nparams, 2))
errs[:, 0] = np.abs(self.parameters - root1)
errs[:, 1] = np.abs(self.parameters - root2)
return errs
def residuals(self, sigma=True):
"""The fit residuals for each dataset in units of differential
counts or model sigma
Args:
sigma (bool, optional): If True, return in units of model sigma,
otherwise return in units of differential counts. Default is True.
Returns:
tuple: A 4-tuple containing:
- *list of np.array*: Energy centroids
- *list of np.array*: Energy channel widths
- *list of np.array*: Fit residuals
- *list of np.array*: Residual uncertainties
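        Example (a sketch of plotting the residuals for the first dataset;
        assumes matplotlib is available)::

            import matplotlib.pyplot as plt
            energy, ewidths, resid, resid_err = fitter.residuals()
            plt.errorbar(energy[0], resid[0], xerr=ewidths[0],
                         yerr=resid_err[0], ls='none')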
"""
chanwidths = self._apply_masks(
[rsp.channel_widths for rsp in self._rsp])
# calculate the count spectrum model
model = self.model_count_spectrum()
# residuals are the differential model counts subtracted from the
# differential source counts above background
resid = []
for i in range(self.num_sets):
back_rates = self._back_rates[i] / chanwidths[i]
rates = self._data[i] / (self._exposure[i] * chanwidths[i])
resid.append((rates - back_rates) - model[i].rates)
# can calculate the residuals as a function of the model uncertainty
model_var = self.model_variance()
if sigma:
resid = [resid[i] / np.sqrt(model_var[i]) for i in
range(self.num_sets)]
            resid_err = [np.ones_like(r) for r in resid]
else:
resid_err = [np.sqrt(var) for var in model_var]
# the energy centroids and widths
energy = self._apply_masks(
[rsp.channel_centroids for rsp in self._rsp])
ewidths = [chanwidth / 2.0 for chanwidth in chanwidths]
return (energy, ewidths, resid, resid_err)
def model_variance(self):
"""The differential source model variance, accounting for the data variance
and background model variance.
Returns:
list of np.array: The model variance for each dataset.
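        Note:
            For each channel the variance is computed as in the code below,
            with negative model rates clipped to zero::

                back_var + (model_rate + back_rate) / (exposure * chanwidth)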
"""
chanwidths = self._apply_masks(
[rsp.channel_widths for rsp in self._rsp])
        # model variance: Poisson variance of the (model + background) rate
        # plus the background model variance; negative model rates are
        # clipped to zero
mvar = []
for i in range(self.num_sets):
counts = self._rsp[i].fold_spectrum(self._function.fit_eval,
self.parameters)
counts = counts[self._chan_masks[i]]
model_rate = counts / (self._exposure[i])
model_rate[counts < 0.0] = 0.0
mvar.append(self._back_var[i] + \
(model_rate + self._back_rates[i]) / \
(np.abs(self._exposure[i]) * chanwidths[i]))
return mvar
def model_count_spectrum(self):
"""The fitted model count spectrum
Returns:
list of :class:`~gbm.data.primitives.EnergyBins`: An EnergyBins \
object for each dataset
"""
if self._res is None:
raise RuntimeError('Fit has not been performed')
ebins = []
for i in range(self.num_sets):
rates = self._rsp[i].fold_spectrum(self._function.fit_eval,
self.parameters)
counts = rates * self._exposure[i]
ebin = EnergyBins(counts[self._chan_masks[i]],
self._rsp[i].ebounds['E_MIN'][
self._chan_masks[i]],
self._rsp[i].ebounds['E_MAX'][
self._chan_masks[i]],
[self._exposure[i]] * self._chan_masks[i].sum())
ebins.append(ebin)
return ebins
def data_count_spectrum(self, upper_limits_sigma=2.0):
"""The observed source count spectrum, which is (total-background).
        Data bins with source counts less than the model uncertainty (the
        square root of the model variance) are converted to upper limits.
Args:
upper_limits_sigma (float, optional): The upper limits sigma.
Default is 2.
Returns:
tuple: A 5-tuple containing:
- *list of np.array*: Energy centroids
- *list of np.array*: Energy channel widths
- *list of np.array*: differential count spectrum
- *list of np.array*: 1-sigma count spectrum errors
- *list of np.array*: Upper limits boolean mask
for each dataset. The upper limits mask is a boolean mask where
True indicates the element is an upper limit.
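        Example (a sketch; plot the first dataset, keeping only detections;
        assumes matplotlib is available)::

            import matplotlib.pyplot as plt
            energy, ewidths, spec, errs, ul = fitter.data_count_spectrum()
            det = ~ul[0]  # mask out the upper-limit bins
            plt.errorbar(energy[0][det], spec[0][det], xerr=ewidths[0][det],
                         yerr=errs[0][det], ls='none')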
"""
chanwidths = self._apply_masks(
[rsp.channel_widths for rsp in self._rsp])
mvar = self.model_variance()
# calculate the differential count spectrum and upper limit masks
src_spectra = []
ulmasks = []
for i in range(self.num_sets):
src_counts = (self._data[i] - self._back_rates[i] * \
self._exposure[i]) / (self._exposure[i] * \
chanwidths[i])
ulmask = src_counts < np.sqrt(mvar[i])
src_counts[ulmask] = upper_limits_sigma * np.sqrt(mvar[i])[ulmask]
src_spectra.append(src_counts)
ulmasks.append(ulmask)
energy = self._apply_masks(
[rsp.channel_centroids for rsp in self._rsp])
ewidths = [chanwidth / 2.0 for chanwidth in chanwidths]
src_errs = [np.sqrt(var) for var in mvar]
return (energy, ewidths, src_spectra, src_errs, ulmasks)
def spectrum(self, which, numpoints=1000, components=False):
r"""The model photon, energy, or :math:`\nu F_\nu` spectrum.
Args:
which (str): Which spectrum to return.
Either 'photon', 'energy', or 'nufnu'
numpoints (int, optional): The number of grid points in energy.
Default is 1000.
components (bool, optional):
                If True, calculate the spectrum for each individual model
                component (if there are multiple components). Default is False.
Returns:
tuple: A 2-tuple containing:
- *np.array*: The energy grid at which the spectrum is calculated
- *np.array* or *list of np.array*: model spectrum values, or
spectrum values for each component
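        Example::

            # nuFnu spectrum evaluated on a 500-point energy grid
            energies, nufnu = fitter.spectrum('nufnu', numpoints=500)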
"""
if self._res is None:
raise RuntimeError('Fit has not been performed')
grid = np.logspace(*np.log10(self.energy_range), numpoints)
pmodel = self._function.fit_eval(self.parameters, grid,
components=components)
# multiply by energy or energy^2 if returning energy or nufnu spectrum
if which == 'energy':
pmodel *= grid[np.newaxis,:]
elif which == 'nufnu':
pmodel *= grid[np.newaxis,:]**2
else:
pass
return (grid, pmodel)
def sample_parameters(self, **kwargs):
"""Produce samples of the fitted parameters. This uses the covariance
matrix and assumes a multivariate normal distribuion. Caveat Emptor.
Args:
**kwargs: Options (like size) to be passed to numpy.random.multivariate_normal
Returns:
np.array: The random parameter samples
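        Example::

            # draw 10000 parameter vectors from the fitted covariance
            samples = fitter.sample_parameters(size=10000)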
"""
samples = multivariate_normal(self.parameters, self.covariance,
**kwargs)
return samples
def sample_spectrum(self, which, num_samples=1000, numpoints=1000,
components=False):
r"""Produce samples of the model photon, energy, or :math:`\nu F_\nu`
spectrum. This uses the covariance matrix and assumes a multivariate
        normal distribution for the parameters. Caveat emptor.
Args:
which (str): Which spectrum to return.
Either 'photon', 'energy', or 'nufnu'
            num_samples (int, optional): The number of spectra to sample.
Default is 1000.
numpoints (int, optional): The number of grid points in energy.
Default is 1000.
components (bool, optional):
                If True, calculate the spectrum for each individual model
                component (if there are multiple components). Default is False.
Returns:
tuple: A 2-tuple containing:
- *np.array*: The energy grid at which the spectrum is calculated
- *np.array*: Array of shape (``num_samples``, ``numpoints``) if
there is only one component or
(``num_samples``, ``num_components``, ``numpoints``) if there
are multiple components.
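        Example (a 1-sigma credible band from the samples)::

            energies, samples = fitter.sample_spectrum('photon',
                                                       num_samples=500)
            lo, hi = np.percentile(samples, [16.0, 84.0], axis=0)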
"""
params = self.sample_parameters(size=num_samples)
grid = np.logspace(*np.log10(self.energy_range), numpoints)
if self.num_components > 1:
model = np.array([self._function.fit_eval(param, grid,
components=components) \
for param in params])
else:
model = np.array([self._function.fit_eval(param, grid) \
for param in params])
if which == 'energy':
model *= grid[np.newaxis, :]
elif which == 'nufnu':
model *= grid[np.newaxis, :] ** 2
else:
pass
return (grid, model)
def sample_flux(self, erange, num_samples=1000, numpoints=1000, **kwargs):
"""Produce samples of the energy-integrated photon or energy flux.
This uses the covariance matrix and assumes a multivariate normal
        distribution for the parameters. Caveat emptor.
Args:
            erange (float, float): The energy range over which to integrate,
                e.g. (emin, emax)
num_samples (int): The number of spectra to sample. Default is 1000.
numpoints (int): The number of grid points in energy. Default is 1000.
**kwargs: Options to be passed to
:meth:`~gbm.spectra.functions.Function.integrate`.
Returns:
np.array: The sampled flux
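        Example::

            # sample the 10-1000 keV flux and summarize with percentiles
            flux = fitter.sample_flux((10.0, 1000.0), num_samples=5000)
            lo, med, hi = np.percentile(flux, [16.0, 50.0, 84.0])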
"""
params = self.sample_parameters(size=num_samples)
flux = [
self._function.integrate(param_vec, erange, num_points=numpoints,
**kwargs) for param_vec in params]
        return np.asarray(flux)
def _fold_model(self, function, params):
"""Folds the model throught the spectrum and calculates the fit stat
Note:
This is an empty function in the base class, and in inherited class
must define this.
Args:
function (:class:`~.functions.Function`): The function object to use
params (list): The parameters values
Returns:
float: The fit statistic value
"""
pass
def _eval_stat(self, params, function):
"""Evaluate the statistic (and jacobian of the statistic as a function
of the model parameters).
Args:
            params (list): The parameter values
            function (:class:`~.functions.Function`): The function object to use
Returns:
(float, np.array): The fit statistic and the jacobian of the fit statistic
"""
stat = self._fold_model(function.fit_eval, params)
jac = self._jacobian(params, function)
return -stat, -jac
def _jacobian(self, params, function):
"""Calculate the Jacobian of the fit statistic as a function of the
        model parameters. This is evaluated numerically using finite
        differences.
Args:
            params (list): The parameter values
            function (:class:`~.functions.Function`): The function object to use
Returns:
np.array: The Jacobian
"""
num_params = len(params)
grad = np.zeros(num_params)
# for each parameter, do partial derivative using finite differences
for i in range(num_params):
params_temp = params.copy()
dx = np.abs((1.0 + function.delta_rel[i]) * params[i] - params[i])
grad[i] = self._pderiv(function.fit_eval, params_temp, index=i,
dx=dx)
return grad
def _hessian(self, params, function):
"""Calculate the Hessian of the fit statistic as a function of the
        model parameters. This is evaluated numerically using finite
        differences.
Args:
            params (list): The parameter values
            function (:class:`~.functions.Function`): The function object to use
Returns:
np.array: The Hessian matrix
"""
num_params = len(params)
hess = np.zeros((num_params, num_params))
for i in range(num_params):
for j in range(num_params):
# Hessian is symmetric, so we only have to calculate either the
# upper or lower triangle
if i > j:
hess[i, j] = hess[j, i]
continue
params_temp = params.copy()
dx1 = np.abs(
(1.0 + function.delta_rel[i]) * params[i] - params[i])
dx2 = np.abs(
(1.0 + function.delta_rel[j]) * params[j] - params[j])
hess[i, j] = self._mixed_pderiv(function.fit_eval, params_temp,
i, j, dx1, dx2)
return hess
def _pderiv(self, function, params, index=0, **kwargs):
"""Partial derivative of function with respect to one of its parameters
This is a wrapper around scipy.misc.derivative to allow a partial
derivative to be calculated
Args:
            function (<function>): The function to differentiate, e.g.
                ``Function.fit_eval``
            params (list): The parameter values
index (int, optional):
The index into the parameter list, representing the parameter
for which the partial derivative will be calculated. Default is 0
**kwargs: Options to pass to scipy.misc.derivative
(such as the ``dx`` argument)
Returns:
float: The partial derivative
"""
def wraps(x):
params[index] = x
the_args = [function, params]
return self._fold_model(*the_args)
return derivative(wraps, params[index], **kwargs)
def _mixed_pderiv(self, function, params, index1, index2, dx1, dx2):
"""Mixed second partial derivative of function with respect to its
parameters. This calls _pderiv.
Args:
            function (<function>): The function to differentiate, e.g.
                ``Function.fit_eval``
            params (list): The parameter values
index1 (int): The index into the parameter list, representing the
parameter for which the first partial derivative will be
calculated.
index2 (int): The index into the parameter list, representing the
parameter for which the second partial derivative will be
calculated.
dx1 (float): The step size for the first parameter
            dx2 (float): The step size for the second parameter
Returns:
float: The second mixed partial derivative
"""
def wraps(x):
params[index2] = x
the_args = [function, params]
return self._pderiv(*the_args, index=index1, dx=dx1)
return derivative(wraps, params[index2], dx=dx2)
def _apply_masks(self, a_list):
"""Apply the fit masks to a list of arrays
Args:
a_list (list of np.array): The list of arrays
Returns:
            list of np.array: The masked arrays
"""
return [one_list[one_mask] \
for one_list, one_mask in zip(a_list, self._chan_masks)]
class SpectralFitterChisq(SpectralFitter):
"""Jointly-fit datasets using Chi-Squared Likelihood.
Parameters:
pha_list (list of :class:`~gbm.data.PHA`):
The PHA objects containg the count spectrum for each detector
bkgd_list (list of :class:`~gbm.background.BackgroundRates`):
The background rates object for each detector
rsp_list (list of :class:`~gbm.data.RSP`):
The response object for each detector
channel_masks (list of np.array(dtype=bool), optional):
A boolean mask for each detector that indicates which energy
channels are to be fit.
Note:
The channel_mask will typically be set by the
:attr:`valid_channels` attribute in the PHA object. You can
set channel_masks to override the PHA :attr:`valid_channels`,
but this is usually unnecessary.
method (str, optional):
The fitting algorithm, which should be one of the options for
scipy.optimize.minimize. Note that 'TNC' and 'SLSQP' are the two
algorithms that provide support for fitting with bounds and
constraints. Other algorithms will ignore bounds and constraints.
Attributes:
covariance (np.array): The covariance matrix of the fitted parameters.
Note:
The assumption of negligible higher-order terms in the Taylor
expansion may not be valid and therefore the covariance matrix
may not be valid.
detectors (list of str): The detectors used in the fit
dof (int): The number of degrees of freedom
energy_range (float, float): The full energy range of the fit
function_components (list of str): The names of the individual model
components, if appropriate
function_name (str): The name of the model function being fit
hessian (np.array):
The Hessian matrix of the fitted parameters, for which the
covariance matrix is -inverse(Hessian).
Note:
The assumption of negligible higher-order terms in the Taylor
expansion may not be valid and therefore the Hessian matrix may
not be valid.
jacobian (np.array): The Jacobian vector of the likelihood as a
function of the parameters.
message (str): The message from the fitter on convergence/non-convergence
num_components (int): Number of function components
num_sets (int): Number of datasets/detectors being fit
parameters (np.array): The parameter values at the maximum likelihood
statistic (float): The fit statistic (likelihood)
success (bool): True if the fit was successful, False otherwise.
symmetric_errors (np.array):
            The 1-sigma uncertainty on the parameters, assuming the
            higher-order terms of the Taylor expansion of the likelihood
            about the maximum likelihood solution are negligible. If those
            terms are not negligible, the errors could be very incorrect or
            even NaN.
See attributes :attr:`covariance` and :attr:`hessian`.
"""
def __init__(self, pha_list, bkgd_list, rsp_list, **kwargs):
super().__init__(pha_list, bkgd_list, rsp_list, chisq, **kwargs)
def _fold_model(self, function, params):
stat = np.zeros(self.num_sets)
for i in range(self.num_sets):
# fold model through response and convert to raw model counts
            model = self._rsp[i].fold_spectrum(function, params,
                                               channel_mask=self._chan_masks[i])
# perform chisq calculation for one dataset
stat[i] = self._stat(self._data[i], self._back_rates[i],
self._back_var[i], model, self._exposure[i])
return stat.sum()
class SpectralFitterPgstat(SpectralFitter):
"""Jointly-fit datasets using Profile Gaussian likelihood (PGSTAT).
Parameters:
pha_list (list of :class:`~gbm.data.PHA`):
            The PHA objects containing the count spectrum for each detector
bkgd_list (list of :class:`~gbm.background.BackgroundRates`):
The background rates object for each detector
rsp_list (list of :class:`~gbm.data.RSP`):
The response object for each detector
channel_masks (list of np.array(dtype=bool), optional):
A boolean mask for each detector that indicates which energy
channels are to be fit.
Note:
The channel_mask will typically be set by the
:attr:`valid_channels` attribute in the PHA object. You can
set channel_masks to override the PHA :attr:`valid_channels`,
but this is usually unnecessary.
        back_exp (list of float, optional): The background exposure for each
            detector. Set only if different from the source exposure.
method (str, optional):
The fitting algorithm, which should be one of the options for
scipy.optimize.minimize. Note that 'TNC' and 'SLSQP' are the two
algorithms that provide support for fitting with bounds and
constraints. Other algorithms will ignore bounds and constraints.
Attributes:
covariance (np.array): The covariance matrix of the fitted parameters.
Note:
The assumption of negligible higher-order terms in the Taylor
expansion may not be valid and therefore the covariance matrix
may not be valid.
detectors (list of str): The detectors used in the fit
dof (int): The number of degrees of freedom
energy_range (float, float): The full energy range of the fit
function_components (list of str): The names of the individual model
components, if appropriate
function_name (str): The name of the model function being fit
hessian (np.array):
The Hessian matrix of the fitted parameters, for which the
covariance matrix is -inverse(Hessian).
Note:
The assumption of negligible higher-order terms in the Taylor
expansion may not be valid and therefore the Hessian matrix may
not be valid.
jacobian (np.array): The Jacobian vector of the likelihood as a
function of the parameters.
message (str): The message from the fitter on convergence/non-convergence
num_components (int): Number of function components
num_sets (int): Number of datasets/detectors being fit
parameters (np.array): The parameter values at the maximum likelihood
statistic (float): The fit statistic (likelihood)
success (bool): True if the fit was successful, False otherwise.
symmetric_errors (np.array):
            The 1-sigma uncertainty on the parameters, assuming the
            higher-order terms of the Taylor expansion of the likelihood
            about the maximum likelihood solution are negligible. If those
            terms are not negligible, the errors could be very incorrect or
            even NaN.
See attributes :attr:`covariance` and :attr:`hessian`.
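    Example (a sketch, assuming the ``pha``, ``bkgd``, and ``rsp`` objects for
    a single detector have already been constructed)::

        from gbm.spectra.functions import Band
        fitter = SpectralFitterPgstat([pha], [bkgd], [rsp])
        fitter.fit(Band())
        print(fitter.parameters, fitter.symmetric_errors)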
"""
def __init__(self, pha_list, bkgd_list, rsp_list, back_exp=None, **kwargs):
super().__init__(pha_list, bkgd_list, rsp_list, pgstat, **kwargs)
# background exposure (may == source exposure)
if back_exp is None:
back_exp = self._exposure
self._back_exp = back_exp
def _fold_model(self, function, params):
stat = np.zeros(self.num_sets)
for i in range(self.num_sets):
# fold model through response and convert to raw model counts
model = self._rsp[i].fold_spectrum(function, params,
channel_mask=self._chan_masks[i])
# perform pgstat calculation for one dataset
stat[i] = self._stat(self._data[i], model, self._exposure[i],
self._back_rates[i]*self._back_exp[i],
self._back_var[i],
self._back_exp[i])
return stat.sum()
class SpectralFitterCstat(SpectralFitter):
"""Jointly-fit datasets using C-Stat (Poisson source with Poisson background)
Parameters:
pha_list (list of :class:`~gbm.data.PHA`):
            The PHA objects containing the count spectrum for each detector
bkgd_list (list of :class:`~gbm.background.BackgroundRates`):
The background rates object for each detector
rsp_list (list of :class:`~gbm.data.RSP`):
The response object for each detector
channel_masks (list of np.array(dtype=bool), optional):
A boolean mask for each detector that indicates which energy
channels are to be fit.
Note:
The channel_mask will typically be set by the
:attr:`valid_channels` attribute in the PHA object. You can
set channel_masks to override the PHA :attr:`valid_channels`,
but this is usually unnecessary.
        back_exp (list of float, optional): The background exposure for each
            detector. Set only if different from the source exposure.
method (str, optional):
The fitting algorithm, which should be one of the options for
scipy.optimize.minimize. Note that 'TNC' and 'SLSQP' are the two
algorithms that provide support for fitting with bounds and
constraints. Other algorithms will ignore bounds and constraints.
Attributes:
covariance (np.array): The covariance matrix of the fitted parameters.
Note:
The assumption of negligible higher-order terms in the Taylor
expansion may not be valid and therefore the covariance matrix
may not be valid.
detectors (list of str): The detectors used in the fit
dof (int): The number of degrees of freedom
energy_range (float, float): The full energy range of the fit
function_components (list of str): The names of the individual model
components, if appropriate
function_name (str): The name of the model function being fit
hessian (np.array):
The Hessian matrix of the fitted parameters, for which the
covariance matrix is -inverse(Hessian).
Note:
The assumption of negligible higher-order terms in the Taylor
expansion may not be valid and therefore the Hessian matrix may
not be valid.
jacobian (np.array): The Jacobian vector of the likelihood as a
function of the parameters.
message (str): The message from the fitter on convergence/non-convergence
num_components (int): Number of function components
num_sets (int): Number of datasets/detectors being fit
parameters (np.array): The parameter values at the maximum likelihood
statistic (float): The fit statistic (likelihood)
success (bool): True if the fit was successful, False otherwise.
symmetric_errors (np.array):
            The 1-sigma uncertainty on the parameters, assuming the
            higher-order terms of the Taylor expansion of the likelihood
            about the maximum likelihood solution are negligible. If those
            terms are not negligible, the errors could be very incorrect or
            even NaN.
See attributes :attr:`covariance` and :attr:`hessian`.
"""
def __init__(self, pha_list, bkgd_list, rsp_list, back_exp=None, **kwargs):
super().__init__(pha_list, bkgd_list, rsp_list, cstat, **kwargs)
# background exposure (may == source exposure)
if back_exp is None:
back_exp = self._exposure
self._back_exp = back_exp
def _fold_model(self, function, params):
stat = np.zeros(self.num_sets)
for i in range(self.num_sets):
# fold model through response and convert to raw model counts
            model = self._rsp[i].fold_spectrum(function, params,
                                               channel_mask=self._chan_masks[i])
# perform cstat calculation for one dataset
stat[i] = self._stat(self._data[i], model, self._exposure[i],
self._back_rates[i]*self._back_exp[i],
self._back_exp[i])
return stat.sum()
class SpectralFitterPstat(SpectralFitter):
"""Jointly-fit datasets using P-Stat (Poisson source with known background).
This statistic assumes non-zero counts, therefore any channels with zero
counts will be masked out and not used in fitting.
Parameters:
pha_list (list of :class:`~gbm.data.PHA`):
            The PHA objects containing the count spectrum for each detector
bkgd_list (list of :class:`~gbm.background.BackgroundRates`):
The background rates object for each detector
rsp_list (list of :class:`~gbm.data.RSP`):
The response object for each detector
channel_masks (list of np.array(dtype=bool), optional):
A boolean mask for each detector that indicates which energy
channels are to be fit.
Note:
The channel_mask will typically be set by the
:attr:`valid_channels` attribute in the PHA object. You can
set channel_masks to override the PHA :attr:`valid_channels`,
but this is usually unnecessary.
method (str, optional):
The fitting algorithm, which should be one of the options for
scipy.optimize.minimize. Note that 'TNC' and 'SLSQP' are the two
algorithms that provide support for fitting with bounds and
constraints. Other algorithms will ignore bounds and constraints.
Attributes:
covariance (np.array): The covariance matrix of the fitted parameters.
Note:
The assumption of negligible higher-order terms in the Taylor
expansion may not be valid and therefore the covariance matrix
may not be valid.
detectors (list of str): The detectors used in the fit
dof (int): The number of degrees of freedom
energy_range (float, float): The full energy range of the fit
function_components (list of str): The names of the individual model
components, if appropriate
function_name (str): The name of the model function being fit
hessian (np.array):
The Hessian matrix of the fitted parameters, for which the
covariance matrix is -inverse(Hessian).
Note:
The assumption of negligible higher-order terms in the Taylor
expansion may not be valid and therefore the Hessian matrix may
not be valid.
jacobian (np.array): The Jacobian vector of the likelihood as a
function of the parameters.
message (str): The message from the fitter on convergence/non-convergence
num_components (int): Number of function components
num_sets (int): Number of datasets/detectors being fit
parameters (np.array): The parameter values at the maximum likelihood
statistic (float): The fit statistic (likelihood)
success (bool): True if the fit was successful, False otherwise.
symmetric_errors (np.array):
            The 1-sigma uncertainty on the parameters, assuming the
            higher-order terms of the Taylor expansion of the likelihood
            about the maximum likelihood solution are negligible. If those
            terms are not negligible, the errors could be very incorrect or
            even NaN.
See attributes :attr:`covariance` and :attr:`hessian`.
"""
def __init__(self, pha_list, bkgd_list, rsp_list, **kwargs):
super().__init__(pha_list, bkgd_list, rsp_list, pstat, **kwargs)
def _fold_model(self, function, params):
stat = np.zeros(self.num_sets)
for i in range(self.num_sets):
# fold model through response and convert to raw model counts
            model = self._rsp[i].fold_spectrum(function, params,
                                               channel_mask=self._chan_masks[i])
# perform pstat calculation for one dataset
stat[i] = self._stat(self._data[i], model, self._exposure[i],
self._back_rates[i])
return stat.sum()
# --------------------------------------------------------------------------
# FIT STATISTICS
def chisq(obs_counts, back_rates, back_var, mod_rates, exposure):
"""Chi-Square Likelihood
Args:
obs_counts (np.array): The total observed counts
back_rates (np.array): The background model count rates
back_var (np.array): The background model count rate variance
mod_rates (np.array): The model source rates
exposure (float): The source exposure
Returns:
float: The chi-squared statistic
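    Note:
        Channels with non-positive net (background-subtracted) counts are
        excluded. For each remaining channel the statistic is computed as in
        the code below, and the function returns the negative sum::

            ((obs - back*exposure) - model*exposure)**2 /
                (obs + back_var*exposure**2)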
"""
mask = (obs_counts - back_rates * exposure) > 0
like = ((obs_counts[mask] - back_rates[mask] * exposure) - mod_rates[
mask] * exposure) ** 2 / \
(obs_counts[mask] + back_var[mask] * exposure ** 2)
return -like.sum()
def cstat(obs_counts, mod_rates, exposure, back_counts, back_exp):
"""C-Statistic Likelihood for Poisson source + Poisson background.
The "W" statistic from the `XSPEC Statistics Appendix
<https://heasarc.gsfc.nasa.gov/xanadu/xspec/manual/XSappendixStatistics.html>`_.
Args:
obs_counts (np.array): The total observed counts
mod_rates (np.array): The model source rates
exposure (float): The source exposure
back_counts (np.array): The background model counts
back_exp (float): The background exposure
Returns:
float: The C-statistic
"""
sum_exp = exposure + back_exp
# special cases for when the observed counts = 0 or when the background
# model counts = 0
mask1 = (obs_counts == 0)
mask2 = (back_counts == 0) & (mod_rates < obs_counts/(sum_exp))
mask3 = (back_counts == 0) & (mod_rates >= obs_counts/(sum_exp))
    w = np.zeros_like(obs_counts, dtype=float)
w[mask1] = exposure*mod_rates[mask1] - back_counts[mask1]*np.log(back_exp/sum_exp)
w[mask2] = -back_exp*mod_rates[mask2] - obs_counts[mask2]*np.log(exposure/sum_exp)
w[mask3] = exposure*mod_rates[mask3] + obs_counts[mask3]* \
(np.log(obs_counts[mask3]) - np.log(exposure*mod_rates[mask3])-1.0)
# for all other cases:
mask = ~(mask1 | mask2 | mask3)
mod_rates_nz = mod_rates[mask]
obs_counts_nz = obs_counts[mask]
back_counts_nz = back_counts[mask]
d = np.sqrt((sum_exp*mod_rates_nz - obs_counts_nz - back_counts_nz)**2 + \
4.0*sum_exp*back_counts_nz*mod_rates_nz)
f = (obs_counts_nz + back_counts_nz - sum_exp*mod_rates_nz + d) / (2.0*sum_exp)
w[mask] = exposure*mod_rates_nz + sum_exp*f -\
obs_counts_nz*np.log(exposure*mod_rates_nz + exposure*f) - \
back_counts_nz*np.log(back_exp*f) - \
obs_counts_nz*(1.0-np.log(obs_counts_nz)) - \
back_counts_nz*(1.0-np.log(back_counts_nz))
return -w.sum()
def pstat(obs_counts, mod_rates, exposure, back_rates):
"""Likelihood for Poisson source + known background.
The "pstat" statistic from the `XSPEC Statistics Appendix
<https://heasarc.gsfc.nasa.gov/xanadu/xspec/manual/XSappendixStatistics.html>`_.
Note:
        Elements with zero counts are masked out and not included in the
        statistic.
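    For each unmasked channel the statistic is computed as in the code below,
    and the function returns the negative sum::

        exposure*(model + back) - obs*log(exposure*(model + back))
            - obs*(1 - log(obs))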
Args:
obs_counts (np.array): The total observed counts
mod_rates (np.array): The model source rates
exposure (float): The source exposure
back_rates (np.array): The background model count rates
Returns:
        float: The P-statistic
"""
mask = (obs_counts > 0) & (mod_rates + back_rates > 0.0)
pstat = exposure * (mod_rates[mask] + back_rates[mask]) - \
obs_counts[mask]*np.log(exposure*(mod_rates[mask] + \
back_rates[mask])) - obs_counts[mask] * \
(1.0-np.log(obs_counts[mask]))
return -pstat.sum()
def pgstat(obs_counts, mod_rates, exposure, back_counts, back_var, back_exp):
"""Profile Gaussian Likelihood. From the `XSPEC Statistics Appendix
<https://heasarc.gsfc.nasa.gov/xanadu/xspec/manual/XSappendixStatistics.html>`_.
Args:
obs_counts (np.array): The total observed counts
mod_rates (np.array): The model source rates
exposure (float): The source exposure
back_counts (np.array): The background model counts
back_var (np.array): The background model count variance
back_exp (float): The background exposure
Returns:
float: The PG-stat
"""
mask = (obs_counts > 0)
    pg = np.zeros_like(obs_counts, dtype=float)
    # special case for zero observed counts: profiling out the background
    # rate gives exposure*model + back_counts*(exposure/back_exp)
    # - back_var*(exposure/back_exp)**2 / 2
    pg[~mask] = exposure * mod_rates[~mask] + \
                back_counts[~mask] * (exposure / back_exp) - \
                back_var[~mask] * (exposure / back_exp) ** 2 / 2.0
# for all other cases:
obs_counts_nz = obs_counts[mask]
mod_rates_nz = mod_rates[mask]
back_counts_nz = back_counts[mask]
back_var_nz = back_var[mask]
d = np.sqrt((exposure * back_var_nz - back_exp * back_counts_nz + \
back_exp ** 2 * mod_rates_nz) ** 2 - \
4.0 * back_exp ** 2 * (
exposure * back_var_nz * mod_rates_nz - \
obs_counts_nz * back_var_nz - \
back_exp * back_counts_nz * mod_rates_nz))
f = (-(exposure * back_var_nz - back_exp * back_counts_nz + \
back_exp ** 2 * mod_rates_nz) + d) / (2.0 * back_exp ** 2)
pg[mask] = back_exp * (mod_rates_nz + f) - \
obs_counts_nz * np.log(
exposure * mod_rates_nz + exposure * f) + \
(back_counts_nz - back_exp * f) ** 2 / (
2.0 * back_var_nz) - \
obs_counts_nz * (1.0 - np.log(obs_counts_nz))
return -pg.sum()