GBM-data-tools/data/phaii.py

1373 lines
55 KiB
Python

# phaii.py: PHAII and TTE classes
#
# Authors: William Cleveland (USRA),
# Adam Goldstein (USRA) and
# Daniel Kocevski (NASA)
#
# Portions of the code are Copyright 2020 William Cleveland and
# Adam Goldstein, Universities Space Research Association
# All rights reserved.
#
# Written for the Fermi Gamma-ray Burst Monitor (Fermi-GBM)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
import os
import astropy.io.fits as fits
import numpy as np
from collections import OrderedDict
import warnings
from astropy.utils.exceptions import AstropyUserWarning
warnings.filterwarnings("ignore", category=AstropyUserWarning)
from .data import DataFile
from .pha import PHA
from . import headers as hdr
from .primitives import TimeEnergyBins, EventList, EnergyBins, GTI
def time_slice_gti(data, gti):
""" Helper function to create new gti when applying time slices
Args:
data (EventList or TimeEnergyBins): Either a list of events or
binned data
gti : list
List of good time intervals
Returns:
list: List of good time intervals that also obey time slices
"""
slice_gti = []
for d in data:
for start, stop in gti:
try:
mask = (start <= d.time) & (d.time <= stop)
if mask.sum():
slice_gti.append([d.time[mask].min(), d.time[mask].max()])
except:
mask = (start <= d.tstop) & (d.tstart <= stop)
if mask.sum():
slice_gti.append([d.tstart[mask].min(), d.tstop[mask].max()])
if len(slice_gti):
return slice_gti
return gti
class PHAII:
"""PHAII class for time history and spectra.
Attributes:
data (:class:`~.primitives.TimeEnergyBins`): The PHAII data
gti ([(float, float), ...]): A list of tuples defining the good time intervals
headers (dict): The headers for each extension of the PHAII
trigtime (float): The trigger time of the data, if available.
time_range (float, float): The time range of the data
energy_range (float, float): The energy range of the data
numchans (int): The number of energy channels
"""
def __init__(self):
super(PHAII, self).__init__()
self._headers = OrderedDict()
self._data = None
self._gti = None
self._trigtime = 0.0
def _assert_exposure(self, spec):
"""Sometimes the data files contain a negative exposure. That is very
very naughty, and we don't like dealing with naughty things.
"""
mask = spec['EXPOSURE'] < 0.0
spec['EXPOSURE'][mask] = 0.0
return spec
def _assert_range(self, valrange):
assert valrange[0] <= valrange[1], \
'Range must be in increasing order: (lo, hi)'
return valrange
def _assert_range_list(self, range_list):
try:
iter(range_list[0])
except:
range_list = [tuple(range_list)]
range_list = [self._assert_range(r) for r in range_list]
return range_list
@property
def data(self):
return self._data
@property
def gti(self):
return self._assert_range_list(self._gti.tolist())
@property
def headers(self):
return self._headers
@property
def trigtime(self):
return self._trigtime
@trigtime.setter
def trigtime(self, val):
try:
float_val = float(val)
except:
raise TypeError('trigtime must be a float')
if float_val < 0.0:
raise ValueError('trigtime must be non-negative')
self._trigtime = float_val
self._headers['PRIMARY']['TRIGTIME'] = float_val
@property
def time_range(self):
return self._data.time_range
@property
def energy_range(self):
return self._data.energy_range
@property
def numchans(self):
return self._data.numchans
@classmethod
def open(cls, filename):
"""Open a PHAII FITS file and return the PHAII object
Args:
filename (str): The filename of the FITS file
Returns:
:class:`PHAII`: The PHAII object
"""
obj = cls()
obj._file_properties(filename)
# open FITS file
with fits.open(filename, mmap=False) as hdulist:
# store headers
for hdu in hdulist:
obj._headers.update({hdu.name: hdu.header})
# trigger time
if 'TRIGTIME' in list(obj._headers['PRIMARY'].keys()):
if obj._headers['PRIMARY']['TRIGTIME'] != '':
obj._trigtime = float(obj._headers['PRIMARY']['TRIGTIME'])
else:
obj._headers['PRIMARY']['TRIGTIME'] = 0.0
else:
obj._headers['PRIMARY']['TRIGTIME'] = 0.0
ebounds = hdulist['EBOUNDS'].data
spec = hdulist['SPECTRUM'].data
spec = obj._assert_exposure(spec)
# convert to recarray because there are many situations where we
# don't want TZERO applied, and it gets quite messy to keep track
# of where we do or don't want it applied...
# Unfortunately, recarray doesn't treat the unsigned integers
# correctly, so they are converted to signed integers. We have to
# correct this as well.
counts = spec['COUNTS'].astype(np.uint16)
spec = spec.view(np.recarray)
spec = spec.astype([('COUNTS', np.uint16, (counts.shape[1],)),
('EXPOSURE', '>f4'), ('QUALITY', '>i2'),
('TIME', '>f8'), ('ENDTIME', '>f8')])
spec['COUNTS'] = counts
# Add TZERO to event times if not trigger
if obj._trigtime == 0:
spec['TIME'] += obj._headers['SPECTRUM']['TZERO4']
spec['ENDTIME'] += obj._headers['SPECTRUM']['TZERO5']
# Do this for GTI as well
gti = hdulist['GTI'].data
gti = np.vstack((gti['START'], gti['STOP'])).squeeze().T
if obj._trigtime > 0.0:
gti -= obj._trigtime
# create the time-energy histogram, the core of the PHAII
obj._data = TimeEnergyBins(spec['COUNTS'], spec['TIME'],
spec['ENDTIME'], spec['EXPOSURE'],
ebounds['E_MIN'], ebounds['E_MAX'])
obj._gti = gti
return obj
@classmethod
def from_data(cls, data, gti=None, trigtime=0.0, detector=None,
object=None,
ra_obj=None, dec_obj=None, err_rad=None):
"""Create a PHAII object from TimeEnergyBins data object.
Args:
data (:class:`~gbm.data.primitives.TimeEnergyBins`): The PHAII data
gti ([(float, float), ...], optional):
The list of tuples representing the good time intervals
(start, stop). If omitted, the GTI is assumed to be
[(tstart, tstop)].
trigtime (float, optional):
The trigger time, if applicable. If provided, the data times
will be shifted relative to the trigger time.
detector (str, optional): The detector that produced the data
object (str, optional): The object being observed
ra_obj (float, optional): The RA of the object
dec_obj (float, optional): The Dec of the object
err_rad (float, optional): The localization error radius of the object
Returns:
:class:`.PHAII`: The newly created PHAII object
"""
obj = cls()
filetype = 'PHAII'
obj._data = data
detchans = data.numchans
tstart, tstop = data.time_range
try:
trigtime = float(trigtime)
except:
raise TypeError('trigtime must be a float')
if trigtime < 0.0:
raise ValueError('trigtime must be non-negative')
obj._trigtime = trigtime
tstart += trigtime
tstop += trigtime
detector = None if detector == '' else detector
object = None if object == '' else object
ra_obj = None if ra_obj == '' else ra_obj
dec_obj = None if dec_obj == '' else dec_obj
err_rad = None if err_rad == '' else err_rad
# create the primary extension
primary_header = hdr.primary(detnam=detector, filetype=filetype,
tstart=tstart,
tstop=tstop, trigtime=trigtime,
object=object,
ra_obj=ra_obj, dec_obj=dec_obj,
err_rad=err_rad)
headers = [primary_header]
header_names = ['PRIMARY']
# ebounds extension
ebounds_header = hdr.ebounds(detnam=detector, tstart=tstart,
tstop=tstop,
trigtime=trigtime, object=object,
ra_obj=ra_obj, dec_obj=dec_obj,
err_rad=err_rad, detchans=detchans)
headers.append(ebounds_header)
header_names.append('EBOUNDS')
# spectrum extension
spectrum_header = hdr.spectrum(detnam=detector, tstart=tstart,
tstop=tstop,
trigtime=trigtime, object=object,
ra_obj=ra_obj, dec_obj=dec_obj,
err_rad=err_rad, detchans=detchans)
headers.append(spectrum_header)
header_names.append('SPECTRUM')
# gti extension
if gti is None:
gti = [data.time_range]
ngti = len(gti)
gti_rec = np.recarray(ngti, dtype=[('START', '>f8'), ('STOP', '>f8')])
gti_rec['START'] = [one_gti[0] for one_gti in gti]
gti_rec['STOP'] = [one_gti[1] for one_gti in gti]
gti_header = hdr.gti(detnam=detector, tstart=tstart, tstop=tstop,
trigtime=trigtime, object=object, ra_obj=ra_obj,
dec_obj=dec_obj, err_rad=err_rad)
headers.append(gti_header)
header_names.append('GTI')
obj._gti = gti_rec
# store headers and set data properties
obj._headers = {name: header for name, header in
zip(header_names, headers)}
return obj
def get_exposure(self, time_ranges=None):
"""Calculate the total exposure of a time range or time ranges of data
Args:
time_ranges ([(float, float), ...], optional):
The time range or time ranges over which to calculate the
exposure. If omitted, calculates the total exposure of the data.
Returns:
float: The exposure of the time selections
"""
if time_ranges is None:
time_ranges = self.time_range
time_ranges = self._assert_range_list(time_ranges)
exposure = self._data.get_exposure(time_ranges=time_ranges)
return exposure
def to_lightcurve(self, time_range=None, energy_range=None,
channel_range=None):
"""Integrate the PHAII data over energy to produce a lightcurve
Args:
time_range ((float, float), optional):
The time range of the lightcurve. If omitted, uses the entire
time range of the data.
energy_range ((float, float), optional):
The energy range of the lightcurve. If omitted, uses the entire
energy range of the data.
channel_range ((int, int), optional):
The channel range of the lightcurve. If omitted, uses the entire
energy range of the data.
Returns:
:class:`~gbm.data.primitives.TimeBins`: The lightcurve data
"""
# slice to desired time range
if time_range is not None:
temp = self._data.slice_time(*self._assert_range(time_range))
else:
temp = self._data
# limit integration to be over desired energy or channel range
if channel_range is not None:
self._assert_range(channel_range)
energy_range = (self._data.emin[channel_range[0]],
self._data.emax[channel_range[1]])
if energy_range is not None:
emin, emax = self._assert_range(energy_range)
else:
emin, emax = None, None
# produce the TimeBins lightcurve object
lc = temp.integrate_energy(emin=emin, emax=emax)
return lc
def to_spectrum(self, time_range=None, energy_range=None,
channel_range=None):
"""Integrate the PHAII data over time to produce a count spectrum
Args:
time_range ((float, float), optional):
The time range of the spectrum. If omitted, uses the entire
time range of the data.
energy_range ((float, float), optional):
The energy range of the spectrum. If omitted, uses the entire
energy range of the data.
channel_range ((int, int), optional):
The channel range of the spectrum. If omitted, uses the entire
energy range of the data.
Returns:
:class:`~gbm.data.primitives.EnergyBins`: The count spectrum data
"""
# slice to desired energy or channel range
if (channel_range is not None) or (energy_range is not None):
if channel_range is not None:
self._assert_range(channel_range)
energy_range = (self._data.emin[channel_range[0]],
self._data.emax[channel_range[1]])
temp = self._data.slice_energy(*self._assert_range(energy_range))
else:
temp = self._data
# limit integration to be over desired time range
if time_range is not None:
tstart, tstop = self._assert_range(time_range)
else:
tstart, tstop = None, None
# produce the EnergyBins count spectrum
spec = temp.integrate_time(tstart=tstart, tstop=tstop)
return spec
def write(self, directory, filename=None):
"""Writes the data to a FITS file.
Args:
directory (str): The directory to write the file
filename (str, optional): If set, will override the standardized name
"""
# set filename
if (self.filename is None) and (filename is None):
raise NameError('Filename not set')
if filename is None:
filename = self.filename
self._full_path = os.path.join(directory, filename)
self._headers['PRIMARY']['FILENAME'] = filename
# make ebounds table
chan_idx = np.arange(self.numchans, dtype=int)
ebounds = np.recarray(self.numchans, dtype=[('CHANNEL', '>i2'),
('E_MIN', '>f4'),
('E_MAX', '>f4')])
ebounds['CHANNEL'] = chan_idx
ebounds['E_MIN'] = self._data.emin
ebounds['E_MAX'] = self._data.emax
# make spectrum table
trigtime = self.trigtime
spec = np.recarray(self._data.numtimes,
dtype=[('COUNTS', '>i2', (self.numchans,)),
('EXPOSURE', '>f4'), ('QUALITY', '>i2'),
('TIME', '>f8'), ('ENDTIME', '>f8')])
spec['COUNTS'] = self._data.counts
spec['EXPOSURE'] = self._data.exposure
spec['QUALITY'] = np.full(self._data.numtimes, 1)
spec['TIME'] = self._data.tstart
spec['ENDTIME'] = self._data.tstop
# make the GTI table
ngti = len(self.gti)
gti = np.recarray(ngti, dtype=[('START', '>f8'), ('STOP', '>f8')])
gti['START'] = np.array(self.gti)[:, 0]
gti['STOP'] = np.array(self.gti)[:, 1]
# TZERO wreaks havoc, so we have to adjust the values based on
# if TZERO=TRIGTIME or not
if self.trigtime == 0.0:
spec['TIME'] -= self._headers['SPECTRUM']['TSTART']
spec['ENDTIME'] -= self._headers['SPECTRUM']['TSTART']
gti['START'] -= self._headers['SPECTRUM']['TSTART']
gti['STOP'] -= self._headers['SPECTRUM']['TSTART']
# if trigtime is zero (not present), remove from headers
if self.trigtime == 0.0:
self._headers['PRIMARY'].pop('TRIGTIME', None)
self._headers['EBOUNDS'].pop('TRIGTIME', None)
self._headers['SPECTRUM'].pop('TRIGTIME', None)
self._headers['GTI'].pop('TRIGTIME', None)
# create FITS
hdulist = fits.HDUList()
primary_hdu = fits.PrimaryHDU(header=self._headers['PRIMARY'])
hdulist.append(primary_hdu)
# ebounds extension
ebounds_hdu = fits.BinTableHDU(data=ebounds, name='EBOUNDS',
header=self._headers['EBOUNDS'])
hdulist.append(ebounds_hdu)
# spectrum extension
spectrum_hdu = fits.BinTableHDU(data=spec, name='SPECTRUM',
header=self._headers['SPECTRUM'])
if self.trigtime > 0.0:
spectrum_hdu.header['TZERO4'] = self.trigtime
spectrum_hdu.header['TZERO5'] = self.trigtime
else:
spectrum_hdu.header['TZERO4'] = self._headers['SPECTRUM']['TSTART']
spectrum_hdu.header['TZERO5'] = self._headers['SPECTRUM']['TSTART']
hdulist.append(spectrum_hdu)
# the GTI extension
gti_hdu = fits.BinTableHDU(data=gti, header=self._headers['GTI'],
name='GTI')
if self.trigtime > 0.0:
gti_hdu.header['TZERO1'] = self.trigtime
gti_hdu.header['TZERO2'] = self.trigtime
else:
gti_hdu.header['TZERO1'] = self._headers['SPECTRUM']['TSTART']
gti_hdu.header['TZERO2'] = self._headers['SPECTRUM']['TSTART']
hdulist.append(gti_hdu)
# write to file
try:
hdulist.writeto(self.full_path, checksum=True)
except Exception as e: print(e)
if self.trigtime == 0.0:
spec['TIME'] += self._headers['SPECTRUM']['TSTART']
spec['ENDTIME'] += self._headers['SPECTRUM']['TSTART']
def to_pha(self, time_ranges=None, energy_range=None, channel_range=None,
**kwargs):
"""Integrate the PHAII data over time to produce a PHA object
Args:
time_ranges ([(float, float), ...], optional):
The time range of the spectrum. If omitted, uses the entire
time range of the data.
energy_range ((float, float), optional):
The energy range of the spectrum. If omitted, uses the entire
energy range of the data.
channel_range ((int, int), optional):
The channel range of the spectrum. If omitted, uses the entire
energy range of the data.
**kwargs: Options passed to :meth:`gbm.data.PHA.from_data`
Returns:
:class:`~gbm.data.PHA`: The single spectrum PHA
"""
if time_ranges is None:
time_ranges = [self.time_range]
time_ranges = self._assert_range_list(time_ranges)
specs = []
times = []
for time_range in time_ranges:
spec = self.to_spectrum(time_range=time_range,
energy_range=energy_range,
channel_range=channel_range)
# PHA needs to have the full spectral range and zero out the ones
# we're not using
lomask = (self.data.emin < spec.range[0])
pre_counts = np.zeros(np.sum(lomask), dtype=int)
pre_lo_edges = self.data.emin[lomask]
pre_hi_edges = self.data.emax[lomask]
himask = (self.data.emax > spec.range[1])
post_counts = np.zeros(np.sum(himask), dtype=int)
post_lo_edges = self.data.emin[himask]
post_hi_edges = self.data.emax[himask]
spec.counts = np.concatenate(
(pre_counts, spec.counts, post_counts))
spec.lo_edges = np.concatenate(
(pre_lo_edges, spec.lo_edges, post_lo_edges))
spec.hi_edges = np.concatenate(
(pre_hi_edges, spec.hi_edges, post_hi_edges))
spec.exposure = np.full(spec.size, spec.exposure[0])
specs.append(spec)
# need the actual time range
times.append(self.data.slice_time(*time_range).time_range)
# sum disjoint time selections
data = EnergyBins.sum(specs)
# values for the PHA object
channel_mask = ~(lomask | himask)
tstart, tstop = np.min(times), np.max(
times) # time_ranges[0][0], time_ranges[-1][1]
obj = self.headers['PRIMARY']['OBJECT']
ra_obj = self.headers['PRIMARY']['RA_OBJ']
dec_obj = self.headers['PRIMARY']['DEC_OBJ']
err_rad = self.headers['PRIMARY']['ERR_RAD']
pha = PHA.from_data(data, tstart, tstop, channel_mask=channel_mask,
gti=times, trigtime=self.trigtime, object=obj,
detector=self.detector, ra_obj=ra_obj,
dec_obj=dec_obj,
err_rad=err_rad, datatype=self.datatype.lower(),
**kwargs)
return pha
def slice_time(self, time_ranges):
"""Slice the PHAII by one or more time range. Produces a new PHAII object.
Args:
time_ranges ([(float, float), ...]):
The time ranges to slice the data to.
Returns:
:class:`PHAII`: A new PHAII object with the requested time ranges
"""
time_ranges = self._assert_range_list(time_ranges)
data = [self.data.slice_time(*self._assert_range(time_range)) \
for time_range in time_ranges]
gti = time_slice_gti(data, self.gti)
data = TimeEnergyBins.merge_time(data)
hdr = self.headers['PRIMARY']
keys = list(hdr.keys())
obj = hdr['OBJECT'] if 'OBJECT' in keys else None
ra_obj = hdr['RA_OBJ'] if 'RA_OBJ' in keys else None
dec_obj = hdr['DEC_OBJ'] if 'DEC_OBJ' in keys else None
err_rad = hdr['ERR_RAD'] if 'ERR_RAD' in keys else None
phaii = self.from_data(data, gti=gti, trigtime=self.trigtime,
detector=self.detector, object=obj,
ra_obj=ra_obj, dec_obj=dec_obj, err_rad=err_rad)
return phaii
def slice_energy(self, energy_ranges):
"""Slice the PHAII by one or more energy range. Produces a new PHAII object.
Args:
energy_ranges ([(float, float), ...]):
The energy ranges to slice the data to.
Returns:
:class:`PHAII`: A new PHAII object with the requested energy ranges
"""
energy_ranges = self._assert_range_list(energy_ranges)
data = [self.data.slice_energy(*self._assert_range(energy_range)) \
for energy_range in energy_ranges]
data = TimeEnergyBins.merge_energy(data)
hdr = self.headers['PRIMARY']
keys = list(hdr.keys())
obj = hdr['OBJECT'] if 'OBJECT' in keys else None
ra_obj = hdr['RA_OBJ'] if 'RA_OBJ' in keys else None
dec_obj = hdr['DEC_OBJ'] if 'DEC_OBJ' in keys else None
err_rad = hdr['ERR_RAD'] if 'ERR_RAD' in keys else None
phaii = self.from_data(data, gti=self.gti, trigtime=self.trigtime,
detector=self.detector, object=obj,
ra_obj=ra_obj, dec_obj=dec_obj, err_rad=err_rad)
return phaii
def rebin_time(self, method, *args, time_range=(None, None)):
"""Rebin the PHAII in time given a rebinning method.
Produces a new PHAII object.
Args:
method (<function>): The rebinning function
*args: Arguments to be passed to the rebinning function
time_range ((float, float), optional):
The starting and ending time to rebin. If omitted, uses the
full range of data. Setting start or end to ``None`` will use
the data from the beginning or end of the data, respectively.
Returns
:class:`PHAII`: A new PHAII object with the requested rebinned data
"""
tstart, tstop = time_range
data = self.data.rebin_time(method, *args, tstart=tstart, tstop=tstop)
if tstart is None:
tstart = self.time_range[0]
if tstop is None:
tstop = self.time_range[1]
tstart, tstop = self._assert_range((tstart, tstop))
gti = [data.time_range]
try:
obj = self.headers['PRIMARY']['OBJECT']
ra_obj = self.headers['PRIMARY']['RA_OBJ']
dec_obj = self.headers['PRIMARY']['DEC_OBJ']
err_rad = self.headers['PRIMARY']['ERR_RAD']
except:
obj = ra_obj = dec_obj = err_rad = None
phaii = self.from_data(data, gti=gti, trigtime=self.trigtime,
detector=self.detector, object=obj,
ra_obj=ra_obj, dec_obj=dec_obj, err_rad=err_rad)
return phaii
def rebin_energy(self, method, *args, energy_range=(None, None)):
"""Rebin the PHAII in energy given a rebinning method.
Produces a new PHAII object.
Args:
method (<function>): The rebinning function
*args: Arguments to be passed to the rebinning function
energy_range ((float, float), optional):
The starting and ending energy to rebin. If omitted, uses the
full range of data. Setting start or end to ``None`` will use
the data from the beginning or end of the data, respectively.
Returns
:class:`PHAII`: A new PHAII object with the requested rebinned data
"""
emin, emax = energy_range
data = self.data.rebin_energy(method, *args, emin=emin, emax=emax)
gti = self.gti
obj = self.headers['PRIMARY']['OBJECT']
ra_obj = self.headers['PRIMARY']['RA_OBJ']
dec_obj = self.headers['PRIMARY']['DEC_OBJ']
err_rad = self.headers['PRIMARY']['ERR_RAD']
phaii = self.from_data(data, gti=gti, trigtime=self.trigtime,
detector=self.detector, object=obj,
ra_obj=ra_obj, dec_obj=dec_obj, err_rad=err_rad)
return phaii
class Ctime(DataFile, PHAII):
"""Class for CTIME data.
Attributes:
data (:class:`~.primitives.TimeEnergyBins`): The PHAII data
datatype (str): The datatype of the file
detector (str): The GBM detector the file is associated with
directory (str): The directory the file is located in
energy_range (float, float): The energy range of the data
filename (str): The filename
full_path (str): The full path+filename
gti ([(float, float), ...]): A list of tuples defining the good time intervals
headers (dict): The headers for each extension of the PHAII
id (str): The GBM file ID
is_gbm_file (bool): True if the file is a valid GBM standard file,
False if it is not.
is_trigger (bool): True if the file is a GBM trigger file, False if not
numchans (int): The number of energy channels
trigtime (float): The trigger time of the data, if available.
time_range (float, float): The time range of the data
"""
def __init__(self):
PHAII.__init__(self)
@classmethod
def from_data(cls, data, detector=None, **kwargs):
obj = super().from_data(data, detector=detector, **kwargs)
# set file properties
trigtime = None if obj.trigtime == 0.0 else obj.trigtime
obj.set_properties(detector=detector, trigtime=trigtime,
tstart=obj.time_range[0], datatype='ctime',
extension='pha')
return obj
class Cspec(DataFile, PHAII):
"""Class for CSPEC data.
Attributes:
data (:class:`~.primitives.TimeEnergyBins`): The PHAII data
datatype (str): The datatype of the file
detector (str): The GBM detector the file is associated with
directory (str): The directory the file is located in
energy_range (float, float): The energy range of the data
filename (str): The filename
full_path (str): The full path+filename
gti ([(float, float), ...]): A list of tuples defining the good time intervals
headers (dict): The headers for each extension of the PHAII
id (str): The GBM file ID
is_gbm_file (bool): True if the file is a valid GBM standard file,
False if it is not.
is_trigger (bool): True if the file is a GBM trigger file, False if not
numchans (int): The number of energy channels
trigtime (float): The trigger time of the data, if available.
time_range (float, float): The time range of the data
"""
def __init__(self):
PHAII.__init__(self)
@classmethod
def from_data(cls, data, detector=None, **kwargs):
obj = super().from_data(data, detector=detector, **kwargs)
# set file properties
trigtime = None if obj.trigtime == 0.0 else obj.trigtime
obj.set_properties(detector=detector, trigtime=trigtime,
tstart=obj.time_range[0], datatype='cspec',
extension='pha')
return obj
class TTE(DataFile):
"""Class for Time-Tagged Event data
Attributes:
data (:class:`~.primitives.EventList`): The event data
datatype (str): The datatype of the file
detector (str): The GBM detector the file is associated with
directory (str): The directory the file is located in
energy_range (float, float): The energy range of the data
filename (str): The filename
full_path (str): The full path+filename
gti ([(float, float), ...]): A list of tuples defining the good time intervals
headers (dict): The headers for each extension of the TTE
id (str): The GBM file ID
is_gbm_file (bool): True if the file is a valid GBM standard file,
False if it is not.
is_trigger (bool): True if the file is a GBM trigger file, False if not
numchans (int): The number of energy channels
trigtime (float): The trigger time of the data, if available.
time_range (float, float): The time range of the data
"""
def __init__(self):
super(TTE, self).__init__()
self._headers = OrderedDict()
self._data = None
self._gti = None
self._trigtime = 0.0
def _assert_range(self, valrange):
assert valrange[0] <= valrange[1], \
'Range must be in increasing order: (lo, hi)'
return valrange
def _assert_range_list(self, range_list):
try:
iter(range_list[0])
except:
range_list = [range_list]
range_list = [self._assert_range(r) for r in range_list]
return range_list
@property
def data(self):
return self._data
@property
def gti(self):
return self._assert_range_list(self._gti.tolist())
@property
def headers(self):
return self._headers
@property
def trigtime(self):
return self._trigtime
@trigtime.setter
def trigtime(self, val):
try:
float_val = float(val)
except:
raise TypeError('trigtime must be a float')
if float_val < 0.0:
raise ValueError('trigtime must be non-negative')
self._trigtime = float_val
self._headers['PRIMARY']['TRIGTIME'] = float_val
@property
def time_range(self):
return self._data.time_range
@property
def energy_range(self):
return self._data.energy_range
@property
def numchans(self):
return self._data.numchans
@classmethod
def open(cls, filename):
"""Open a TTE FITS file and return the TTE object
Args:
filename (str): The filename of the FITS file
Returns:
:class:`TTE`: The TTE object
"""
obj = cls()
obj._file_properties(filename)
with fits.open(filename, mmap=False) as hdulist:
for hdu in hdulist:
obj._headers.update({hdu.name: hdu.header})
# trigger time
if 'TRIGTIME' in list(obj._headers['PRIMARY'].keys()):
if obj._headers['PRIMARY']['TRIGTIME'] != '':
obj._trigtime = float(obj._headers['PRIMARY']['TRIGTIME'])
else:
obj._headers['PRIMARY']['TRIGTIME'] = 0.0
else:
obj._headers['PRIMARY']['TRIGTIME'] = 0.0
ebounds = hdulist['EBOUNDS'].data
# convert to recarray because there are many situations where we
# don't want TZERO applied, and it gets quite messy to keep track
# of where we do or don't want it applied...
events = hdulist['EVENTS'].data.view(np.recarray)
# Add TZERO to event times if not trigger
if obj._trigtime == 0:
events['TIME'] += obj._headers['EVENTS']['TZERO1']
# Do this for GTI as well
gti = hdulist['GTI'].data
gti = np.vstack((gti['START'], gti['STOP'])).squeeze().T
if obj._trigtime > 0.0:
gti -= obj._trigtime
# create the EventList, the core of the TTE class
obj._data = EventList.from_fits_array(events, ebounds)
obj._gti = gti
return obj
@classmethod
def from_data(cls, data, gti=None, trigtime=0.0, detector=None,
object=None,
ra_obj=None, dec_obj=None, err_rad=None):
"""Create a TTE object from an EventList data object.
Args:
data (:class:`~.primitives.EventList`): The event data
gti ([(float, float), ...], optional):
The list of tuples representing the good time intervals
(start, stop). If omitted, the GTI is assumed to be
[(tstart, tstop)].
trigtime (float, optional):
The trigger time, if applicable. If provided, the data times
will be shifted relative to the trigger time.
detector (str, optional): The detector that produced the data
object (str, optional): The object being observed
ra_obj (float, optional): The RA of the object
dec_obj (float, optional): The Dec of the object
err_rad (float, optional): The localization error radius of the object
Returns:
:class:`TTE`: The newly created TTE object
"""
obj = cls()
filetype = 'GBM PHOTON LIST'
obj._data = data
detchans = data.numchans
tstart, tstop = data.time_range
try:
trigtime = float(trigtime)
except:
raise TypeError('trigtime must be a float')
if trigtime < 0.0:
raise ValueError('trigtime must be non-negative')
obj._trigtime = trigtime
tstart += trigtime
tstop += trigtime
detector = None if detector == '' else detector
object = None if object == '' else object
ra_obj = None if ra_obj == '' else ra_obj
dec_obj = None if dec_obj == '' else dec_obj
err_rad = None if err_rad == '' else err_rad
# create the primary extension
primary_header = hdr.primary(detnam=detector, filetype=filetype,
tstart=tstart,
tstop=tstop, trigtime=trigtime,
object=object,
ra_obj=ra_obj, dec_obj=dec_obj,
err_rad=err_rad)
headers = [primary_header]
header_names = ['PRIMARY']
# ebounds extension
ebounds_header = hdr.ebounds(detnam=detector, tstart=tstart,
tstop=tstop,
trigtime=trigtime, object=object,
ra_obj=ra_obj, dec_obj=dec_obj,
err_rad=err_rad, detchans=detchans)
headers.append(ebounds_header)
header_names.append('EBOUNDS')
# spectrum extension
events_header = hdr.events(detnam=detector, tstart=tstart, tstop=tstop,
trigtime=trigtime, object=object,
ra_obj=ra_obj, dec_obj=dec_obj,
err_rad=err_rad, detchans=detchans)
headers.append(events_header)
header_names.append('EVENTS')
# gti extension
if gti is None:
gti = [data.time_range]
ngti = len(gti)
gti_rec = np.recarray(ngti, dtype=[('START', '>f8'), ('STOP', '>f8')])
gti_rec['START'] = [one_gti[0] for one_gti in gti]
gti_rec['STOP'] = [one_gti[1] for one_gti in gti]
gti_header = hdr.gti(detnam=detector, tstart=tstart, tstop=tstop,
trigtime=trigtime, object=object, ra_obj=ra_obj,
dec_obj=dec_obj, err_rad=err_rad)
headers.append(gti_header)
header_names.append('GTI')
obj._gti = gti_rec
# store headers and set data properties
obj._headers = {name: header for name, header in
zip(header_names, headers)}
# set file info
trig = None if obj.trigtime == 0.0 else obj.trigtime
obj.set_properties(detector=detector, trigtime=trig,
tstart=obj.time_range[0], datatype='tte',
extension='fit')
return obj
@classmethod
def merge(cls, ttes, primary=0, force_unique=True):
"""Merge multiple TTE objects into a single new TTE object.
Note:
The amount of data in a single TTE object can be quite large. It is
up to you to determine if you have enough memory to support the merge.
Args:
ttes (lists): A list of TTE objects to merge
primary (int):
The index into the list of TTE objects to designate the primary
TTE object. The primary object will be the reference for the
header information for the new merged TTE object. Default is the
first TTE object in the list (primary=0).
force_unique (bool, optional):
If True, force all events to be unique via brute force sorting.
If False, the EventLists will only be checked and masked for
overlapping time ranges. Events can potentially be lost if the
merged EventLists contain overlapping times (but not necessarily
duplicate events), however this method is much faster.
Default is True.
Returns:
:class:`TTE`: The new merged TTE object
"""
# merge the Event data
data = [tte.data for tte in ttes]
data = EventList.merge(data, sort_attrib='TIME',
force_unique=force_unique)
# merge the GTIs
gtis = [GTI.from_list(tte.gti) for tte in ttes]
merged_gti = gtis[0]
for gti in gtis[1:]:
merged_gti = GTI.merge(merged_gti, gti)
merged_gti = merged_gti.as_list()
# header info
hdr = ttes[primary].headers['PRIMARY']
keys = list(hdr.keys())
trigtime = ttes[primary].trigtime
detector = ttes[primary].detector
object = hdr['OBJECT'] if 'OBJECT' in keys else None
ra_obj = hdr['RA_OBJ'] if 'RA_OBJ' in keys else None
dec_obj = hdr['DEC_OBJ'] if 'DEC_OBJ' in keys else None
err_rad = hdr['ERR_RAD'] if 'ERR_RAD' in keys else None
obj = cls.from_data(data, gti=merged_gti, trigtime=trigtime,
detector=detector, object=object, ra_obj=ra_obj,
dec_obj=dec_obj, err_rad=err_rad)
return obj
def get_exposure(self, time_ranges=None):
"""Calculate the total exposure of a time range or time ranges of data
Args:
time_ranges ([(float, float), ...], optional):
The time range or time ranges over which to calculate the
exposure. If omitted, calculates the total exposure of the data.
Returns:
float: The exposure of the time selections
"""
time_ranges = self._assert_range_list(time_ranges)
exposure = self._data.get_exposure(time_ranges=time_ranges)
return exposure
def to_spectrum(self, time_range=None, energy_range=None,
channel_range=None):
"""Integrate the TTE data over time to produce a count spectrum
Args:
time_range ([(float, float), ...], optional):
The time range of the spectrum. If omitted, uses the entire
time range of the data.
energy_range ((float, float), optional):
The energy range of the spectrum. If omitted, uses the entire
energy range of the data.
channel_range ((int, int), optional):
The channel range of the spectrum. If omitted, uses the entire
energy range of the data.
Returns:
:class:`~.primitives.EnergyBins`: The spectrum data
"""
# slice to desired energy or channel range
if (channel_range is not None) or (energy_range is not None):
if channel_range is not None:
self._assert_range(channel_range)
energy_range = (self._data.emin[channel_range[0]],
self._data.emax[channel_range[1]])
temp = self._data.slice_energy(*self._assert_range(energy_range))
else:
temp = self._data
if time_range is None:
time_range = self.time_range
# time slice
segment = temp.time_slice(*self._assert_range(time_range))
spec = segment.count_spectrum
return spec
def to_phaii(self, bin_method, *args, time_range=None, energy_range=None,
channel_range=None, **kwargs):
"""Utilizing a binning function, convert the data to a PHAII object
Args:
bin_method (<function>): A binning function
*args: Arguments to pass to the binning function
time_range ([(float, float), ...], optional):
The time range of the spectrum. If omitted, uses the entire
time range of the data.
energy_range ((float, float), optional):
The energy range of the spectrum. If omitted, uses the entire
energy range of the data.
channel_range ((int, int), optional):
The channel range of the spectrum. If omitted, uses the entire
energy range of the data.
**kwargs: Options to pass to the binning function
Returns:
:class:`Cspec`: The PHAII object
"""
# slice to desired energy or channel range
if (channel_range is not None) or (energy_range is not None):
if channel_range is not None:
self._assert_range(channel_range)
energy_range = (self._data.emin[channel_range[0]],
self._data.emax[channel_range[1]])
temp = self._data.energy_slice(*self._assert_range(energy_range))
else:
temp = self._data
# do the time binning to create the TimeEnergyBins
if time_range is None:
tstart, tstop = None, None
else:
tstart, tstop = time_range
bins = temp.bin(bin_method, *args, tstart=tstart, tstop=tstop,
**kwargs)
# create the Cspec object
if 'OBJECT' in self.headers['PRIMARY']:
obj = self.headers['PRIMARY']['OBJECT']
ra_obj = self.headers['PRIMARY']['RA_OBJ']
dec_obj = self.headers['PRIMARY']['DEC_OBJ']
err_rad = self.headers['PRIMARY']['ERR_RAD']
else:
obj, ra_obj, dec_obj, err_rad = None, None, None, None
obj = Cspec.from_data(bins, gti=self.gti, trigtime=self.trigtime,
detector=self.detector, object=obj,
ra_obj=ra_obj, dec_obj=dec_obj, err_rad=err_rad)
return obj
def to_pha(self, time_ranges=None, energy_range=None, channel_range=None,
**kwargs):
"""Integrate the TTE data over time to produce a PHA object
Args:
time_ranges ([(float, float), ...], optional):
The time range of the spectrum. If omitted, uses the entire
time range of the data.
energy_range ((float, float), optional):
The energy range of the spectrum. If omitted, uses the entire
energy range of the data.
channel_range ((int, int), optional):
The channel range of the spectrum. If omitted, uses the entire
energy range of the data.
**kwargs: Options passed to :meth:`.PHA.from_data()`
Returns:
:class:`~.PHA`: The PHA object
"""
if time_ranges is None:
time_ranges = [self.time_range]
time_ranges = self._assert_range_list(time_ranges)
segs = [self._data.time_slice(*self._assert_range(time_range)) \
for time_range in time_ranges]
times = [seg.time_range for seg in segs]
el = EventList.merge(segs)
if energy_range is not None:
el = el.energy_slice(*self._assert_range(energy_range))
lomask = (self.data.emin < energy_range[0])
himask = (self.data.emax > energy_range[1])
elif channel_range is not None:
el = el.energy_slice(*self._assert_range(channel_range))
idx = np.arange(self.numchans)
lomask = (idx < channel_range[0])
himask = (idx > channel_range[1])
else:
lomask = himask = np.zeros(self.numchans, dtype=bool)
data = el.count_spectrum
# values for the PHA object
channel_mask = ~(lomask | himask)
if 'OBJECT' in self.headers['PRIMARY']:
obj = self.headers['PRIMARY']['OBJECT']
ra_obj = self.headers['PRIMARY']['RA_OBJ']
dec_obj = self.headers['PRIMARY']['DEC_OBJ']
err_rad = self.headers['PRIMARY']['ERR_RAD']
else:
obj, ra_obj, dec_obj, err_rad = None, None, None, None
pha = PHA.from_data(data, *el.time_range, channel_mask=channel_mask,
gti=times, trigtime=self.trigtime, object=obj,
detector=self.detector, ra_obj=ra_obj,
dec_obj=dec_obj,
err_rad=err_rad, datatype=self.datatype.lower(),
**kwargs)
return pha
def write(self, directory, filename=None):
"""Writes the data to a FITS file.
Args:
directory (str): The directory to write the file
filename (str, optional): If set, will override the standardized name
"""
# set filename
if (self.filename is None) and (filename is None):
raise NameError('Filename not set')
if filename is None:
filename = self.filename
self._full_path = os.path.join(directory, filename)
self._headers['PRIMARY']['FILENAME'] = filename
# ebounds table
ebounds = self._data._ebounds
# events table
trigtime = self.trigtime
events = self._data._events
# gti table
ngti = len(self.gti)
gti = np.recarray(ngti, dtype=[('START', '>f8'), ('STOP', '>f8')])
gti['START'] = np.array(self.gti)[:, 0]
gti['STOP'] = np.array(self.gti)[:, 1]
# TZERO wreaks havoc, so we have to adjust the values based on
# if TZERO=TRIGTIME or not
if self.trigtime == 0.0:
events['TIME'] -= self._headers['EVENTS']['TSTART']
gti['START'] -= self._headers['EVENTS']['TSTART']
gti['STOP'] -= self._headers['EVENTS']['TSTART']
# if trigtime is zero (not present), remove from headers
if self.trigtime == 0.0:
self._headers['PRIMARY'].pop('TRIGTIME', None)
self._headers['EBOUNDS'].pop('TRIGTIME', None)
self._headers['EVENTS'].pop('TRIGTIME', None)
self._headers['GTI'].pop('TRIGTIME', None)
# create FITS
hdulist = fits.HDUList()
primary_hdu = fits.PrimaryHDU(header=self._headers['PRIMARY'])
hdulist.append(primary_hdu)
# ebounds extension
ebounds_hdu = fits.BinTableHDU(data=ebounds, name='EBOUNDS',
header=self._headers['EBOUNDS'])
hdulist.append(ebounds_hdu)
# the events extension
spectrum_hdu = fits.BinTableHDU(data=events, name='EVENTS',
header=self._headers['EVENTS'])
if self.trigtime > 0.0:
spectrum_hdu.header['TZERO1'] = self.trigtime
else:
spectrum_hdu.header['TZERO1'] = self._headers['EVENTS']['TSTART']
hdulist.append(spectrum_hdu)
# the GTI extension
gti_hdu = fits.BinTableHDU(data=gti, header=self._headers['GTI'],
name='GTI')
if self.trigtime > 0.0:
gti_hdu.header['TZERO1'] = self.trigtime
gti_hdu.header['TZERO2'] = self.trigtime
else:
gti_hdu.header['TZERO1'] = self._headers['EVENTS']['TSTART']
gti_hdu.header['TZERO2'] = self._headers['EVENTS']['TSTART']
hdulist.append(gti_hdu)
# write to file
try:
hdulist.writeto(self.full_path, checksum=True)
except Exception as e: print(e)
if self.trigtime == 0.0:
events['TIME'] += self._headers['EVENTS']['TSTART']
def slice_time(self, time_ranges):
"""Slice the TTE by one or more time range. Produces a new TTE object.
Args:
time_ranges ([(float, float), ...]):
The time ranges to slice the data to.
Returns:
:class:`TTE`: A new TTE object with the requested time ranges
"""
time_ranges = self._assert_range_list(time_ranges)
data = [self.data.time_slice(*self._assert_range(time_range)) \
for time_range in time_ranges]
gti = time_slice_gti(data, self.gti)
data = EventList.merge(data, sort_attrib='TIME')
hdr = self.headers['PRIMARY']
keys = list(hdr.keys())
obj = hdr['OBJECT'] if 'OBJECT' in keys else None
ra_obj = hdr['RA_OBJ'] if 'RA_OBJ' in keys else None
dec_obj = hdr['DEC_OBJ'] if 'DEC_OBJ' in keys else None
err_rad = hdr['ERR_RAD'] if 'ERR_RAD' in keys else None
tte = self.from_data(data, gti=gti, trigtime=self.trigtime,
detector=self.detector, object=obj,
ra_obj=ra_obj, dec_obj=dec_obj, err_rad=err_rad)
return tte
def slice_energy(self, energy_ranges):
"""Slice the TTE by one or more energy range. Produces a new TTE object.
Args:
energy_ranges ([(float, float), ...]):
The energy ranges to slice the data to.
Returns:
:class:`TTE`: A new TTE object with the requested energy ranges
"""
energy_ranges = self._assert_range_list(energy_ranges)
data = [self.data.energy_slice(*self._assert_range(energy_range)) \
for energy_range in energy_ranges]
data = EventList.merge(data, sort_attrib='TIME')
hdr = self.headers['PRIMARY']
keys = list(hdr.keys())
obj = hdr['OBJECT'] if 'OBJECT' in keys else None
ra_obj = hdr['RA_OBJ'] if 'RA_OBJ' in keys else None
dec_obj = hdr['DEC_OBJ'] if 'DEC_OBJ' in keys else None
err_rad = hdr['ERR_RAD'] if 'ERR_RAD' in keys else None
tte = self.from_data(data, gti=self.gti, trigtime=self.trigtime,
detector=self.detector, object=obj,
ra_obj=ra_obj, dec_obj=dec_obj, err_rad=err_rad)
return tte
def rebin_energy(self, bin_method, *args):
"""Produce a TTE object with rebinned PHA channels
Args:
bin_method (<function>): A rebinning function
*args: Arguments to be passed to the bin method
Returns
:class:`TTE`: A new TTE object with the rebinned PHA channels
"""
data = self._data.rebin_energy(bin_method, *args)
detector = self.detector
# header info
hdr = self.headers['PRIMARY']
keys = list(hdr.keys())
object = hdr['OBJECT'] if 'OBJECT' in keys else None
ra_obj = hdr['RA_OBJ'] if 'RA_OBJ' in keys else None
dec_obj = hdr['DEC_OBJ'] if 'DEC_OBJ' in keys else None
err_rad = hdr['ERR_RAD'] if 'ERR_RAD' in keys else None
obj = TTE.from_data(data, gti=self.gti, trigtime=self.trigtime,
detector=detector, object=object, ra_obj=ra_obj,
dec_obj=dec_obj, err_rad=err_rad)
return obj