GBM-data-tools/data/pha.py

659 lines
26 KiB
Python

# pha.py: PHA and BAK classes
#
# Authors: William Cleveland (USRA),
# Adam Goldstein (USRA) and
# Daniel Kocevski (NASA)
#
# Portions of the code are Copyright 2020 William Cleveland and
# Adam Goldstein, Universities Space Research Association
# All rights reserved.
#
# Written for the Fermi Gamma-ray Burst Monitor (Fermi-GBM)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
import os
import astropy.io.fits as fits
import numpy as np
from collections import OrderedDict
from .data import DataFile
from . import headers as hdr
from .primitives import EnergyBins
class PHA(DataFile):
"""PHA class for count spectra.
Attributes:
data (:class:`~.primitives.EnergyBins`): The PHA data
datatype (str): The datatype of the file
detector (str): The GBM detector the file is associated with
directory (str): The directory the file is located in
energy_range (float, float): The energy range of the spectrum
exposure (float): The exposure of the PHA data
filename (str): The filename
full_path (str): The full path+filename
gti ([(float, float), ...]): The good time intervals
headers (dict): The headers for each extension of the PHA
id (str): The GBM file ID
is_gbm_file (bool): True if the file is a valid GBM standard file,
False if it is not.
is_trigger (bool): True if the file is a GBM trigger file, False if not
numchans (int): The number of energy channels
tcent (float): The center time of the data
time_range (float, float): The time range of the spectrum
trigtime (float): The trigger time of the data, if available
valid_channels (np.array(dtype=bool)): The channels that are marked
valid and available for fitting
"""
def __init__(self):
super(PHA, self).__init__()
self._headers = OrderedDict()
self._data = None
self._gti = None
self._trigtime = 0.0
def _assert_range(self, valrange):
assert valrange[0] <= valrange[1], \
'Range must be in increasing order: (lo, hi)'
return valrange
def _assert_range_list(self, range_list):
try:
iter(range_list[0])
except:
range_list = [range_list]
return range_list
@property
def data(self):
return self._data
@property
def gti(self):
return self._gti.tolist()
@property
def headers(self):
return self._headers
@property
def trigtime(self):
return self._trigtime
@property
def time_range(self):
return (self._gti['START'][0], self._gti['STOP'][-1])
@property
def tcent(self):
return sum(self.time_range) / 2.0
@property
def energy_range(self):
return self._data.range
@property
def numchans(self):
return self._data.size
@property
def valid_channels(self):
return np.arange(self.numchans, dtype=int)[self._channel_mask]
@property
def exposure(self):
return self._data.exposure[0]
@classmethod
def open(cls, filename):
"""Open a PHA FITS file and return the PHA object
Args:
filename (str): The filename of the FITS file
Returns:
:class:`PHA`: The PHA object
"""
obj = cls()
obj._file_properties(filename)
# open FITS file
with fits.open(filename) as hdulist:
# store headers
for hdu in hdulist:
obj._headers.update({hdu.name: hdu.header})
# trigger time
if 'TRIGTIME' in list(obj._headers['PRIMARY'].keys()):
if obj._headers['PRIMARY']['TRIGTIME'] != '':
obj._trigtime = float(obj._headers['PRIMARY']['TRIGTIME'])
else:
obj._headers['PRIMARY']['TRIGTIME'] = 0.0
else:
obj._headers['PRIMARY']['TRIGTIME'] = 0.0
# if trigger, shift to trigger time reference
spec = hdulist['SPECTRUM'].data
ebounds = hdulist['EBOUNDS'].data
gti = np.asarray(hdulist['GTI'].data)
if obj._trigtime is not None:
gti['START'] -= obj._trigtime
gti['STOP'] -= obj._trigtime
# create the energy histogram, the core of the PHA
exposure = np.full(ebounds.shape[0],
obj._headers['SPECTRUM']['EXPOSURE'])
obj._data = EnergyBins(spec['COUNTS'], ebounds['E_MIN'],
ebounds['E_MAX'], exposure)
obj._gti = gti
obj._set_channel_mask(spec['QUALITY'])
return obj
@classmethod
def from_data(cls, data, tstart, tstop, gti=None, trigtime=0.0,
detector=None, object=None,
ra_obj=None, dec_obj=None, err_rad=None, datatype=None,
backfile=None, rspfile=None, meta=None, channel_mask=None):
"""Create a PHA object from an EnergyBins data object.
Args:
data (:class:`~.primitives.EnergyBins`): The PHA count spectrum data
tstart (float): The start time of the data
tstop (float): The end time of the data
gti ([(float, float), ...], optional):
The list of tuples representing the good time intervals
(start, stop). If omitted, the GTI is assumed to be
[(tstart, tstop)].
trigtime (float, optional):
The trigger time, if applicable. If provided, the data times
will be shifted relative to the trigger time. Default is zero.
detector (str, optional): The detector that produced the data
object (str, optional): The object being observed
ra_obj (float, optional): The RA of the object
dec_obj (float, optional): The Dec of the object
err_rad (float, optional): The localization error radius of the object
datatype (str, optional): The datatype from which the PHA is created
backfile (str, optional): The associated background file
rspfile (str, optional): The associated response file
meta (str, optional): Additional metadata string to be added to
standard filename
channel_mask (np.array(dtype=bool)):
A boolean array representing the valid channels to be used for
fitting. If omitted, assumes all non-zero count channels are valid.
Returns:
:class:`PHA`: The PHA object
"""
obj = cls()
filetype = 'SPECTRUM'
obj._data = data
detchans = data.size
try:
trigtime = float(trigtime)
except:
raise TypeError('trigtime must be a float')
if trigtime < 0.0:
raise ValueError('trigtime must be non-negative')
obj._trigtime = trigtime
tstart += trigtime
tstop += trigtime
# create the primary extension
primary_header = hdr.primary(detnam=detector, filetype=filetype,
tstart=tstart,
tstop=tstop, trigtime=trigtime,
object=object,
ra_obj=ra_obj, dec_obj=dec_obj,
err_rad=err_rad)
headers = [primary_header]
header_names = ['PRIMARY']
# ebounds extension
ebounds_header = hdr.ebounds(detnam=detector, tstart=tstart,
tstop=tstop,
trigtime=trigtime, object=object,
ra_obj=ra_obj, dec_obj=dec_obj,
err_rad=err_rad, detchans=detchans)
headers.append(ebounds_header)
header_names.append('EBOUNDS')
# spectrum extension
spectrum_header = hdr.pha_spectrum(exposure=data.exposure[0],
detnam=detector, tstart=tstart,
tstop=tstop,
trigtime=trigtime, object=object,
ra_obj=ra_obj,
dec_obj=dec_obj, err_rad=err_rad,
detchans=detchans,
backfile=backfile, rspfile=rspfile)
headers.append(spectrum_header)
header_names.append('SPECTRUM')
# gti extension
if gti is None:
gti = [(tstart, tstop)]
gti = np.array(gti, dtype=[('START', '>f8'), ('STOP', '>f8')])
gti['START'] += trigtime
gti['STOP'] += trigtime
gti_header = hdr.gti(detnam=detector, tstart=tstart, tstop=tstop,
trigtime=trigtime, object=object, ra_obj=ra_obj,
dec_obj=dec_obj, err_rad=err_rad)
headers.append(gti_header)
header_names.append('GTI')
obj._gti = gti
# set the channel mask
# if no channel mask is given, assume zero-count channels are bad
if channel_mask is None:
channel_mask = np.zeros(data.size, dtype=bool)
channel_mask[data.counts > 0] = True
obj._channel_mask = channel_mask
# store headers and set data properties
obj._headers = {name: header for name, header in
zip(header_names, headers)}
# set file info
obj.set_properties(detector=detector, trigtime=trigtime,
tstart=obj.time_range[0], datatype=datatype,
extension='pha', meta=meta)
gti['START'] -= obj.trigtime
gti['STOP'] -= obj.trigtime
return obj
def write(self, directory, filename=None, backfile=None):
"""Writes the data to a FITS file.
Args:
directory (str): The directory to write the file
filename (str, optional): If set, will override the standardized name
backfile (str, optional): If set, will add the associated BAK file
name to the PHA SPECTRUM header
"""
# set filename
if (self.filename is None) and (filename is None):
raise NameError('Filename not set')
if filename is None:
filename = self.filename
self._full_path = os.path.join(directory, filename)
self._headers['PRIMARY']['FILENAME'] = filename
if backfile is not None:
self._headers['SPECTRUM']['BACKFILE'] = backfile
# make ebounds table
chan_idx = np.arange(self.numchans, dtype=int)
ebounds = np.recarray(self.numchans, dtype=[('CHANNEL', '>i2'),
('E_MIN', '>f4'),
('E_MAX', '>f4')])
ebounds['CHANNEL'] = chan_idx
ebounds['E_MIN'] = self._data.lo_edges
ebounds['E_MAX'] = self._data.hi_edges
# make spectrum table
trigtime = self.trigtime
spec = np.recarray(self.numchans, dtype=[('CHANNEL', '>i2'),
('COUNTS', '>i4'),
('QUALITY', '>i2')])
spec['CHANNEL'] = chan_idx
spec['COUNTS'] = self._data.counts
spec['QUALITY'] = (~self._channel_mask).astype(int)
# create FITS
hdulist = fits.HDUList()
primary_hdu = fits.PrimaryHDU(header=self._headers['PRIMARY'])
hdulist.append(primary_hdu)
ebounds_hdu = fits.BinTableHDU(data=ebounds, name='EBOUNDS',
header=self._headers['EBOUNDS'])
hdulist.append(ebounds_hdu)
spectrum_hdu = fits.BinTableHDU(data=spec, name='SPECTRUM',
header=self._headers['SPECTRUM'])
hdulist.append(spectrum_hdu)
gti = np.copy(self._gti)
gti['START'] += trigtime
gti['STOP'] += trigtime
gti_hdu = fits.BinTableHDU(data=gti, header=self._headers['GTI'],
name='GTI')
hdulist.append(gti_hdu)
hdulist.writeto(self.full_path, checksum=True)
def slice_energy(self, energy_ranges):
"""Slice the PHA by one or more energy range. Produces a new PHA object.
Args:
energy_ranges ([(float, float), ...]): The energy ranges to slice over
Returns:
:class:`PHA`: A new PHA object with the requested energy range
"""
energy_ranges = self._assert_range_list(energy_ranges)
data = [self.data.slice(*self._assert_range(energy_range)) \
for energy_range in energy_ranges]
data = EnergyBins.merge(data)
tstart = self.headers['PRIMARY']['TSTART'] - self.trigtime
tstop = self.headers['PRIMARY']['TSTOP'] - self.trigtime
obj = self.headers['PRIMARY']['OBJECT']
ra_obj = self.headers['PRIMARY']['RA_OBJ']
dec_obj = self.headers['PRIMARY']['DEC_OBJ']
err_rad = self.headers['PRIMARY']['ERR_RAD']
backfile = self.headers['SPECTRUM']['BACKFIL']
rspfile = self.headers['SPECTRUM']['RESPFILE']
meta = self._filename_obj.meta
pha = self.from_data(data, tstart, tstop, gti=self.gti,
trigtime=self.trigtime, detector=self.detector,
object=obj, ra_obj=ra_obj, dec_obj=dec_obj,
err_rad=err_rad, datatype=self.datatype,
backfile=backfile, rspfile=rspfile, meta=meta)
return pha
def rebin_energy(self, method, *args, emin=None, emax=None):
"""Rebin the PHA in energy given a rebinning method.
Produces a new PHA object.
Args:
method (<function>): The rebinning function
*args: Arguments to be passed to the rebinning function
emin (float, optional): The minimum energy to rebin. If omitted,
uses the lowest energy edge of the data
emax (float, optional): The maximum energy to rebin. If omitted,
uses the highest energy edge of the data
Returns:
:class:`PHA`: A new PHA object with the requested rebinned data
"""
data = self.data.rebin(method, *args, emin=None, emax=None)
tstart = self.headers['PRIMARY']['TSTART'] - self.trigtime
tstop = self.headers['PRIMARY']['TSTOP'] - self.trigtime
obj = self.headers['PRIMARY']['OBJECT']
ra_obj = self.headers['PRIMARY']['RA_OBJ']
dec_obj = self.headers['PRIMARY']['DEC_OBJ']
err_rad = self.headers['PRIMARY']['ERR_RAD']
backfile = self.headers['SPECTRUM']['BACKFIL']
rspfile = self.headers['SPECTRUM']['RESPFILE']
meta = self._filename_obj.meta
pha = self.from_data(data, tstart, tstop, gti=self.gti,
trigtime=self.trigtime, detector=self.detector,
object=obj, ra_obj=ra_obj, dec_obj=dec_obj,
err_rad=err_rad, datatype=self.datatype,
backfile=backfile, rspfile=rspfile, meta=meta)
return pha
def _set_channel_mask(self, quality_flags):
self._channel_mask = np.zeros_like(quality_flags, dtype=bool)
mask = (quality_flags == 0)
self._channel_mask[mask] = True
class BAK(PHA):
"""Class for a PHA background spectrum.
Attributes:
data (:class:`~.primitives.EnergyBins`): The PHA data
datatype (str): The datatype of the file
detector (str): The GBM detector the file is associated with
directory (str): The directory the file is located in
energy_range (float, float): The energy range of the spectrum
exposure (float): The exposure of the PHA data
filename (str): The filename
full_path (str): The full path+filename
gti ([(float, float), ...]): The good time intervals
headers (dict): The headers for each extension of the PHA
id (str): The GBM file ID
is_gbm_file (bool): True if the file is a valid GBM standard file,
False if it is not.
is_trigger (bool): True if the file is a GBM trigger file, False if not
numchans (int): The number of energy channels
tcent (float): The center time of the data
time_range (float, float): The time range of the spectrum
trigtime (float): The trigger time of the data, if available
"""
def __init__(self):
PHA.__init__(self)
self._channel_mask = None
@classmethod
def open(cls, filename):
"""Open a BAK FITS file and return the BAK object
Args:
filename (str): The filename of the FITS file
Returns:
:class:`BAK`: The BAK object
"""
from ..background import BackgroundSpectrum
obj = cls()
obj._file_properties(filename)
# open FITS file
with fits.open(filename) as hdulist:
# store headers
for hdu in hdulist:
obj._headers.update({hdu.name: hdu.header})
# trigger time
if 'TRIGTIME' in list(obj._headers['PRIMARY'].keys()):
if obj._headers['PRIMARY']['TRIGTIME'] != '':
obj._trigtime = float(obj._headers['PRIMARY']['TRIGTIME'])
else:
obj._headers['PRIMARY']['TRIGTIME'] = 0.0
else:
obj._headers['PRIMARY']['TRIGTIME'] = 0.0
spec = hdulist['SPECTRUM'].data
ebounds = hdulist['EBOUNDS'].data
gti = np.asarray(hdulist['GTI'].data)
if obj._trigtime is not None:
gti['START'] -= obj._trigtime
gti['STOP'] -= obj._trigtime
# create the background spectrum, the core of the BAK
exposure = np.full(ebounds.shape[0],
obj._headers['SPECTRUM']['EXPOSURE'])
obj._data = BackgroundSpectrum(spec['RATE'], spec['STAT_ERR'],
ebounds['E_MIN'], ebounds['E_MAX'],
exposure)
obj._gti = gti
return obj
@classmethod
def from_data(cls, data, tstart, tstop, gti=None, trigtime=0.0,
detector=None,
object=None, ra_obj=None, dec_obj=None, err_rad=None,
datatype=None, meta=None):
"""Create a BAK object from a :class:`~gbm.background.BackgroundSpectrum`
data object.
Args:
data (:class:`~gbm.background.BackgroundSpectrum`):
The background count rate spectrum
tstart (float): The start time of the data
tstop (float): The end time of the data
gti ([(float, float), ...], optional):
The list of tuples representing the good time intervals
(start, stop). If omitted, the GTI is assumed to be
[(tstart, tstop)].
trigtime (float, optional):
The trigger time, if applicable. If provided, the data times
will be shifted relative to the trigger time. Default is zero.
detector (str, optional): The detector that produced the data
object (str, optional): The object being observed
ra_obj (float, optional): The RA of the object
dec_obj (float, optional): The Dec of the object
err_rad (float, optional): The localization error radius of the object
datatype (str, optional): The datatype from which the PHA is created
meta (str, optional): Additional metadata string to be added to
standard filename
Returns:
:class:`BAK`: The BAK object
"""
obj = cls()
filetype = 'BACKGROUND'
obj._data = data
detchans = data.size
try:
trigtime = float(trigtime)
except:
raise TypeError('trigtime must be a float')
if trigtime < 0.0:
raise ValueError('trigtime must be non-negative')
obj._trigtime = trigtime
tstart += trigtime
tstop += trigtime
# create the primary extension
primary_header = hdr.primary(detnam=detector, filetype=filetype,
tstart=tstart,
tstop=tstop, trigtime=trigtime,
object=object,
ra_obj=ra_obj, dec_obj=dec_obj,
err_rad=err_rad)
headers = [primary_header]
header_names = ['PRIMARY']
# ebounds extension
ebounds_header = hdr.ebounds(detnam=detector, tstart=tstart,
tstop=tstop,
trigtime=trigtime, object=object,
ra_obj=ra_obj, dec_obj=dec_obj,
err_rad=err_rad, detchans=detchans)
headers.append(ebounds_header)
header_names.append('EBOUNDS')
# spectrum extension
spectrum_header = hdr.pha_spectrum(exposure=data.exposure[0],
detnam=detector, tstart=tstart,
tstop=tstop,
trigtime=trigtime, object=object,
ra_obj=ra_obj,
dec_obj=dec_obj, err_rad=err_rad,
detchans=detchans,
poisserr=False)
spectrum_header['HDUCLAS2'] = ('BKG', 'Background PHA Spectrum')
spectrum_header['HDUCLAS3'] = ('RATE', 'PHA data stored as rates')
spectrum_header['TUNIT2'] = 'count/s'
spectrum_header['TUNIT3'] = 'count/s'
headers.append(spectrum_header)
header_names.append('SPECTRUM')
# gti extension
if gti is None:
gti = [(tstart, tstop)]
gti = np.array(gti, dtype=[('START', '>f8'), ('STOP', '>f8')])
gti['START'] += trigtime
gti['STOP'] += trigtime
gti_header = hdr.gti(detnam=detector, tstart=tstart, tstop=tstop,
trigtime=trigtime, object=object, ra_obj=ra_obj,
dec_obj=dec_obj, err_rad=err_rad)
headers.append(gti_header)
header_names.append('GTI')
obj._gti = gti
# store headers and set data properties
obj._headers = {name: header for name, header in
zip(header_names, headers)}
# set file info
obj.set_properties(detector=detector, trigtime=trigtime,
tstart=obj.time_range[0], datatype=datatype,
extension='bak', meta=meta)
gti['START'] -= obj.trigtime
gti['STOP'] -= obj.trigtime
return obj
def write(self, directory, filename=None):
"""Writes the data to a FITS file.
Args:
directory (str): The directory to write the file
filename (str, optional): If set, will override the standardized name
"""
# set filename
if (self.filename is None) and (filename is None):
raise NameError('Filename not set')
if filename is None:
filename = self.filename
self._full_path = os.path.join(directory, filename)
self._headers['PRIMARY']['FILENAME'] = filename
# make ebounds table
chan_idx = np.arange(self.numchans, dtype=int)
ebounds = np.recarray(self.numchans, dtype=[('CHANNEL', '>i2'),
('E_MIN', '>f4'),
('E_MAX', '>f4')])
ebounds['CHANNEL'] = chan_idx
ebounds['E_MIN'] = self._data.lo_edges
ebounds['E_MAX'] = self._data.hi_edges
# make spectrum table
trigtime = self.trigtime
spec = np.recarray(self.numchans, dtype=[('CHANNEL', '>i2'),
('RATE', '>f8'),
('STAT_ERR', '>f8')])
spec['CHANNEL'] = chan_idx
spec['RATE'] = self._data.rates
spec['STAT_ERR'] = self._data.rate_uncertainty
# create FITS
hdulist = fits.HDUList()
primary_hdu = fits.PrimaryHDU(header=self._headers['PRIMARY'])
hdulist.append(primary_hdu)
ebounds_hdu = fits.BinTableHDU(data=ebounds, name='EBOUNDS',
header=self._headers['EBOUNDS'])
hdulist.append(ebounds_hdu)
spectrum_hdu = fits.BinTableHDU(data=spec, name='SPECTRUM',
header=self._headers['SPECTRUM'])
hdulist.append(spectrum_hdu)
gti = np.copy(self._gti)
gti['START'] += trigtime
gti['STOP'] += trigtime
gti_hdu = fits.BinTableHDU(data=gti, header=self._headers['GTI'],
name='GTI')
hdulist.append(gti_hdu)
hdulist.writeto(self.full_path, checksum=True)
def rebin_energy(self, method, *args, emin=None, emax=None):
"""Not Implemented"""
raise NotImplementedError('Function not available for BAK objects')
def slice_energy(self, method, *args, emin=None, emax=None):
"""Not Implemented"""
raise NotImplementedError('Function not available for BAK objects')