GBM-data-tools/data/trigdat.py

644 lines
22 KiB
Python

# trigdat.py: GBM trigger data class
#
# Authors: William Cleveland (USRA),
# Adam Goldstein (USRA) and
# Daniel Kocevski (NASA)
#
# Portions of the code are Copyright 2020 William Cleveland and
# Adam Goldstein, Universities Space Research Association
# All rights reserved.
#
# Written for the Fermi Gamma-ray Burst Monitor (Fermi-GBM)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
import astropy.io.fits as fits
import numpy as np
from collections import OrderedDict
from gbm.detectors import Detector
from .phaii import Ctime
from .poshist import PosHist
from .primitives import TimeEnergyBins
# Lookup table from the numeric trigger classification code to its
# short string name
classifications = {
    0: 'ERROR',
    1: 'UNRELOC',
    2: 'LOCLPAR',
    3: 'BELOWHZ',
    4: 'GRB',
    5: 'SGR',
    6: 'TRANSNT',
    7: 'DISTPAR',
    8: 'SFL',
    9: 'CYGX1',
    10: 'SGR1806',
    11: 'GROJ422',
    19: 'TGF',
    20: 'UNCERT',
    21: 'GALBIN',
}

# Names of the localization spectra, in the order they are indexed
spectrum = [
    'hard',
    'normal',
    'soft',
]
class Trigdat(PosHist):
    """Class for the GBM Trigger Data.

    Attributes:
        backrates (:class:`~gbm.data.trigdat.BackRates`):
            A BackRates object containing the info from the on-board
            background
        datatype (str): The datatype of the file
        detector (str): The GBM detector the file is associated with
        directory (str): The directory the file is located in
        filename (str): The filename
        fsw_locations: (:class:`~gbm.data.trigdat.FswLocation`):
            A list of flight-software-determined locations for the event
        full_path (str): The full path+filename
        headers (dict): The headers for each extension of the file
        id (str): The GBM file ID
        is_gbm_file (bool): True if the file is a valid GBM standard file,
                            False if it is not.
        is_trigger (bool): True if the file is a GBM trigger file, False if
                           not
        maxrates (list of :class:`~gbm.data.trigdat.MaxRates`):
            A list of MaxRates objects, each containing maxrates info
        num_maxrates (int): The number of MaxRates issued by the flight
                            software
        time_range (float, float): The time range of the data
        triggered_detectors: (list of str): The detectors that were triggered
        trigrates (:class:`~gbm.data.trigdat.MaxRates`):
            A MaxRates object containing the trigger information and rates
        trigtime (float): The trigger time
    """

    def __init__(self):
        super(Trigdat, self).__init__()
        self._headers = OrderedDict()
        self._data = None
        self._rates = None
        self._trigrates = None
        self._maxrates = None
        self._backrates = None
        self._fsw_locations = None
        # lower/upper edges of the 8 energy channels handed to
        # TimeEnergyBins
        self._emin = np.array([3.4, 10.0, 22.0, 44.0, 95.0, 300., 500., 800.])
        self._emax = np.array([10., 22.0, 44.0, 95.0, 300., 500., 800., 2000.])
        self._bins64 = None
        self._bins256 = None
        self._bins1024 = None
        self._bins8192 = None
        self._time_range = None
        self._detectors = [det.short_name for det in Detector]

    @property
    def trigtime(self):
        return self._headers['PRIMARY']['TRIGTIME']

    @property
    def headers(self):
        return self._headers

    @property
    def num_maxrates(self):
        return len(self._maxrates)

    @property
    def trigrates(self):
        return self._trigrates

    @property
    def backrates(self):
        return self._backrates

    @property
    def maxrates(self):
        # return a copy so callers cannot alter the internal list
        return list(self._maxrates)

    @property
    def fsw_locations(self):
        # bounded by the number of OB_CALC records actually read, which is
        # not guaranteed by this class to equal the number of MAXRATES
        return list(self._fsw_locations)

    @property
    def time_range(self):
        return self._time_range

    @property
    def triggered_detectors(self):
        detmask = self.headers['PRIMARY']['DET_MASK']
        # Convert every mask character explicitly ('0' -> False, any other
        # digit -> True) instead of relying on NumPy's string-to-bool
        # casting rules.
        detmask = np.array([int(flag) for flag in detmask], dtype=bool)
        if detmask.size == 14:
            return (np.array(self._detectors)[detmask]).tolist()
        elif detmask.size == 12:
            # a 12-element mask is applied to all but the last two detectors
            return (np.array(self._detectors[:-2])[detmask]).tolist()
        else:
            return None

    @classmethod
    def open(cls, filename):
        """Open and read a trigdat file

        Args:
            filename(str): The filename of the trigdat file

        Returns:
            :class:`Trigdat`: The Trigdat object
        """
        obj = cls()
        obj._file_properties(filename)

        # open FITS file
        with fits.open(filename) as hdulist:
            # store headers
            for hdu in hdulist:
                obj._headers[hdu.name] = hdu.header

            # store trigrate, maxrates, backrates, and fsw location
            obj._trigrates = MaxRates(hdulist['TRIGRATE'].data[0])
            obj._maxrates = [MaxRates(maxrate) for maxrate in
                             hdulist['MAXRATES'].data]
            obj._backrates = BackRates(hdulist['BCKRATES'].data[0])
            obj._fsw_locations = [FswLocation(ob_calc)
                                  for ob_calc in hdulist['OB_CALC'].data]
            obj._data = hdulist['EVNTRATE'].data
            obj._data.sort(order='TIME')

        # store position history
        idx, dt = obj._time_indices(1024)
        obj._from_trigdat(obj._data['TIME'][idx],
                          obj._data['SCATTITD'][idx],
                          obj._data['EIC'][idx])

        # store the time history
        obj._rates = obj._data['RATE'].reshape(-1, 14, 8)
        obj._time_range = (
            obj._data['ENDTIME'][0] - dt[0], obj._data['ENDTIME'][-1])
        obj._gti = obj._gti_from_times(obj._data['TIME'],
                                       obj._data['ENDTIME'])
        return obj

    def get_maxrates(self, index):
        """Retrieve a MaxRates

        Args:
            index (int): The index of the MaxRates to retrieve. Not to
                         exceed num_maxrates-1

        Returns:
            :class:`~gbm.data.trigdat.MaxRates`: The MaxRates object

        Raises:
            ValueError: If ``index`` is larger than num_maxrates-1
        """
        # '>=' so that index == num_maxrates is reported as a ValueError
        # rather than leaking an IndexError from the list lookup
        if index >= self.num_maxrates:
            raise ValueError('index out of range. '
                             '{} available maxrates'.format(
                                 self.num_maxrates))
        return self._maxrates[index]

    def get_fsw_locations(self, index):
        """Retrieve a flight software localization

        Args:
            index (int): The index of the localization to retrieve. Not to
                         exceed the number of available locations minus one

        Returns:
            :class:`~gbm.data.trigdat.FswLocation`: The flight software \
            localization info

        Raises:
            ValueError: If ``index`` is beyond the last available location
        """
        num_locations = len(self._fsw_locations)
        if index >= num_locations:
            raise ValueError('index out of range. '
                             '{} available locations'.format(num_locations))
        return self._fsw_locations[index]

    def to_ctime(self, detector, timescale=1024):
        """Convert the data for a detector to a CTIME-like
        :class:`~gbm.data.phaii.PHAII` object

        Args:
            detector (str): The detector to convert
            timescale (int, optional):
                The minimum timescale in ms of the data to return. Available
                options are 1024, 256, and 64.

        Returns:
            :class:`~gbm.data.Ctime`: The CTIME-like PHAII object with the \
            trigdat data

        Raises:
            ValueError: If the detector name or the timescale is not valid
        """
        # check for valid detectors and timescale
        detector = detector.lower()
        if detector not in self._detectors:
            raise ValueError('Illegal detector name')
        self._check_timescale(timescale)

        # grab the correct detector rates (stored as 14 dets x 8 channels)
        det_idx = self._detectors.index(detector)

        # return the requested timescales
        time_idx, dt = self._time_indices(timescale)

        # calculate counts and exposure
        counts = self._rates[time_idx, det_idx, :] * (
            dt[:, np.newaxis] / 1.024)
        exposure = self._calc_exposure(counts, dt)

        # the 'TIME' array is incorrect in the trigdat. we know this because
        # the 'ENDTIME' is the value in the packet, so we must calculate
        # tstart ourselves and forget about using 'TIME'
        tstop = self._data['ENDTIME'][time_idx] - self.trigtime
        tstart = tstop - dt

        # create the Time-Energy histogram
        bins = TimeEnergyBins(counts, tstart, tstop, exposure,
                              self._emin, self._emax)

        # create the CTIME object
        primary = self.headers['PRIMARY']
        obj = Ctime.from_data(bins, gti=self.gti, trigtime=self.trigtime,
                              detector=detector, object=primary['OBJECT'],
                              ra_obj=primary['RA_OBJ'],
                              dec_obj=primary['DEC_OBJ'],
                              err_rad=primary['ERR_RAD'])
        return obj

    def sum_detectors(self, detectors, timescale=1024):
        """Sum the data from a list of detectors and convert to a CTIME-like
        :class:`~gbm.data.phaii.PHAII` object

        Args:
            detectors (list of str): The detectors to sum
            timescale (int, optional):
                The minimum timescale in ms of the data to return. Available
                options are 1024, 256, and 64.

        Returns:
            :class:`~gbm.data.Ctime`: The CTIME-like PHAII object with the \
            detector-summed data

        Raises:
            ValueError: If no detector is given, or a detector name or the
                        timescale is not valid
        """
        # normalize the names once so that validation and lookup agree
        det_names = [det.lower() for det in detectors]
        if not det_names:
            raise ValueError('At least one detector must be specified')
        for det in det_names:
            if det not in self._detectors:
                raise ValueError('Illegal detector name')
        self._check_timescale(timescale)

        # the requested timescales are the same for every detector
        time_idx, dt = self._time_indices(timescale)
        scale = dt[:, np.newaxis] / 1.024

        counts = None
        for det in det_names:
            # grab the correct detector rates (stored as 14 dets x 8
            # channels) and accumulate the counts
            det_idx = self._detectors.index(det)
            det_counts = self._rates[time_idx, det_idx, :] * scale
            counts = det_counts if counts is None else counts + det_counts

        # no deadtime correction is applied to the summed data
        exposure = dt

        # the 'TIME' array is incorrect in the trigdat. we know this because
        # the 'ENDTIME' is the value in the packet, so we must calculate
        # tstart ourselves and forget about using 'TIME'
        tstop = self._data['ENDTIME'][time_idx] - self.trigtime
        tstart = self._fix_tstart(tstop, dt)

        # create the Time-Energy histogram
        bins = TimeEnergyBins(counts, tstart, tstop, exposure,
                              self._emin, self._emax)
        det_str = '+'.join(det_names)

        # create the CTIME object
        primary = self.headers['PRIMARY']
        obj = Ctime.from_data(bins, gti=self.gti, trigtime=self.trigtime,
                              detector=det_str, object=primary['OBJECT'],
                              ra_obj=primary['RA_OBJ'],
                              dec_obj=primary['DEC_OBJ'],
                              err_rad=primary['ERR_RAD'])

        # Have to set the datatype property. The 8 energy channels is most
        # like CTIME.
        obj.set_properties(datatype='ctime', trigtime=self.trigtime)
        return obj

    def get_saa_passage(self, times):
        """Flag the times that are not covered by any good time interval

        Args:
            times (np.array): The times to test

        Returns:
            np.array: Boolean array, True where a time lies outside every \
            interval in ``self.gti``
        """
        in_saa = np.ones_like(times, dtype=bool)
        for interval in self.gti:
            mask = (times >= interval[0]) & (times <= interval[1])
            in_saa[mask] = False
        return in_saa

    def _check_timescale(self, timescale):
        """Raise a ValueError if the timescale (ms) is not 1024, 256 or 64"""
        if timescale not in (1024, 256, 64):
            raise ValueError('Illegal Trigdat resolution. '
                             'Available resolutions: 1024, 256, 64')

    def _fix_tstart(self, tstop, dt):
        """Compute bin start times, snapping a start onto the previous bin's
        stop when the two differ by less than 1 ms

        Args:
            tstop (np.array): The bin stop times
            dt (np.array): The time bin widths

        Returns:
            np.array: The bin start times
        """
        # this ensures that edge differences < 1 ms get fixed
        tstart = tstop - dt
        mask = (np.abs(tstart[1:] - tstop[:-1]) < 0.001)
        tstart[1:][mask] = tstop[:-1][mask]
        return tstart

    def _calc_exposure(self, counts, dt):
        """Calculate the exposure

        Args:
            counts (np.array): The observed counts in each bin
            dt (np.array): The time bin widths

        Returns:
            np.array: The exposure of each bin
        """
        deadtime = np.copy(counts)
        deadtime[:, :7] *= 2.6e-6  # 2.6 us for each count
        deadtime[:, 7] *= 1e-5  # 10 us for each count in overflow
        total_deadtime = np.sum(deadtime, axis=1)
        # NOTE(review): total_deadtime looks like a duration in seconds but
        # is used here as a fraction of the bin; the two only agree for
        # bins about 1 s wide. Confirm whether ``dt - total_deadtime`` was
        # intended before changing.
        exposure = (1.0 - total_deadtime) * dt
        return exposure

    def _time_indices(self, time_res):
        """Indices into the Trigdat arrays corresponding to the desired time
        resolution(s)

        Args:
            time_res (int): The time resolution in ms of the data

        Returns:
            (np.array, np.array): Indices into the trigdat arrays and the \
            bin widths in seconds
        """
        # bin widths, rounded to whole milliseconds
        dt = np.round((self._data['ENDTIME'] - self._data['TIME']) * 1000)

        # background bins
        back_idx = np.where(dt == 8192)[0]

        # 1 s scale bins - this is the minimum amount returned
        idx = np.where(dt == 1024)[0]

        # reconcile 8 s and 1 s data
        idx = self._reconcile_timescales(back_idx, idx)

        # reconcile 8 s + 1 s and 256 ms data
        if time_res <= 256:
            tidx = np.where(dt == 256)[0]
            idx = self._reconcile_timescales(idx, tidx)

        # reconcile 8 s + 1 s + 256 ms and 64 ms data
        if time_res == 64:
            tidx = np.where(dt == 64)[0]
            idx = self._reconcile_timescales(idx, tidx)

        # return reconciled indices
        return idx, np.reshape(dt[idx] / 1000.0, len(idx))

    def _reconcile_timescales(self, idx1, idx2):
        """Reconcile indices representing different timescales and glue them
        together to form a complete (mostly) continuous set of indices

        Args:
            idx1 (np.array): Indices of the "bracketing" timescale
            idx2 (np.array): Indices of the "inserted" timescale

        Returns:
            np.array: Indices of idx2 spliced into idx1
        """
        # nothing to splice if one of the timescales is absent
        if len(idx2) == 0:
            return idx1
        if len(idx1) == 0:
            return idx2

        # bin edges for both selections
        start_times1 = self._data['TIME'][idx1]
        end_times1 = self._data['ENDTIME'][idx1]
        start_times2 = self._data['TIME'][idx2]
        end_times2 = self._data['ENDTIME'][idx2]

        # find where bracketing timescale ends and inserted timescale
        # begins; if no bracketing bin reaches the inserted data, keep all
        # of the bracketing bins in front of it
        overlap = np.where(end_times1 >= start_times2[0])[0]
        start_idx = overlap[0] if overlap.size > 0 else len(idx1)

        # find where inserted timescale ends and bracketing timescale
        # begins again; if it never does, nothing is appended
        resume = np.where(start_times1 >= end_times2[-1])[0]
        end_idx = resume[0] if resume.size > 0 else len(idx1)

        return np.concatenate((idx1[0:start_idx], idx2, idx1[end_idx:]))

    def _gti_from_times(self, tstarts, tstops):
        """Estimate the GTI from the bin start and stop times.
        This may return multiple GTIs if several background packets are
        missing

        Args:
            tstarts (np.array): The start times of the bins
            tstops (np.array): The end times of the bins

        Returns:
            np.array: Array of shape (number of GTIs, 2) holding the start \
            and stop time of each interval
        """
        # gaps[k] is the time between the end of bin k and the start of
        # bin k+1
        gaps = tstarts[1:] - tstops[:-1]
        breaks = np.where(np.abs(gaps) > 10.0)[0]

        # every break closes an interval at the end of bin k and opens the
        # next one at the start of bin k+1
        gti_starts = np.concatenate(([tstarts[0]], tstarts[breaks + 1]))
        gti_stops = np.concatenate((tstops[breaks], [tstops[-1]]))
        return np.column_stack((gti_starts, gti_stops))
class MaxRates:
    """Class for the MaxRates data in Trigdat.

    Parameters:
        rec_array (np.recarray): The FITS TRIGRATE or MAXRATES record array
                                 from the trigdat file

    Attributes:
        all_rates (np.array): An array (:attr:`numchans`, :attr:`numdets`) of
                              the maxrates
        eic (np.array): The position of Fermi in Earth inertial coordinates
        numchans (int): The number of energy channels
        numdets (int): The number of detectors
        quaternions (np.array): The quaternions at the maxrates time
        timescale (float): The timescale of the maxrates
        time_range (float, float): The time range of the maxrates
    """

    def __init__(self, rec_array):
        self._time_range = (rec_array['TIME'], rec_array['ENDTIME'])
        self._quats = rec_array['SCATTITD']
        self._eic = rec_array['EIC']
        # The rates column is named 'TRIGRATE' or 'MAXRATES' depending on
        # the record. Only a missing field is caught: a mapping-style
        # lookup raises KeyError, a NumPy structured lookup ValueError.
        try:
            self._rates = rec_array['TRIGRATE']
        except (KeyError, ValueError):
            self._rates = rec_array['MAXRATES']

    @property
    def numchans(self):
        return self._rates.shape[0]

    @property
    def numdets(self):
        return self._rates.shape[1]

    @property
    def time_range(self):
        return self._time_range

    @property
    def timescale(self):
        return self.time_range[1] - self.time_range[0]

    @property
    def quaternions(self):
        return self._quats

    @property
    def eic(self):
        return self._eic

    @property
    def all_rates(self):
        return self._rates

    def get_detector(self, det):
        """Retrieve the maxrates for a detector

        Args:
            det (str): The detector

        Returns:
            np.array: An array of size (:attr:`numchans`,) of rates for the \
            detector
        """
        # Build the detector names here: this class never defines a
        # ``_detectors`` attribute of its own.
        detectors = np.array([d.short_name for d in Detector])
        mask = (detectors == det)
        return self._rates[:, mask].squeeze()
class BackRates:
    """Class for the background rates data in Trigdat.

    Parameters:
        rec_array (np.recarray): The FITS BCKRATES record array from the
                                 trigdat file

    Attributes:
        all_rates (np.array): An array (:attr:`numchans`, :attr:`numdets`) of
                              the background rates
        numchans (int): The number of energy channels
        numdets (int): The number of detectors
        quality (int, int): The quality flags for the background
        time_range (float, float): The time range of the background rates
    """

    def __init__(self, rec_array):
        self._time_range = (rec_array['TIME'], rec_array['ENDTIME'])
        self._quality = rec_array['QUALITY']
        self._rates = rec_array['BCKRATES']

    @property
    def numchans(self):
        return self._rates.shape[0]

    @property
    def numdets(self):
        return self._rates.shape[1]

    @property
    def time_range(self):
        return self._time_range

    @property
    def quality(self):
        return self._quality

    @property
    def all_rates(self):
        return self._rates

    def get_detector(self, det):
        """Retrieve the background rates for a detector

        Args:
            det (str): The detector

        Returns:
            np.array: An array of size (:attr:`numchans`,) of background \
            rates for the detector
        """
        # Build the detector names here: this class never defines a
        # ``_detectors`` attribute of its own.
        detectors = np.array([d.short_name for d in Detector])
        mask = (detectors == det)
        return self._rates[:, mask].squeeze()
class FswLocation:
    """Container for one flight software localization record

    Parameters:
        rec_array (np.recarray): The FITS OB_CALC record array from the
                                 trigdat file

    Attributes:
        fluence (float): The fluence of the localization interval
        hardness_ratio (float): The hardness ratio for the localization
                                interval
        intensity (float): The brightness of the signal
        location (float, float, float): The RA, Dec, and statistical error of
                                        the onboard localization
        location_sc (float): The localization in spacecraft coordinates:
                             Azimuth, Zenith
        next_classification (str, float):
            The next most likely classification of the trigger and the
            probability
        significance (float): The S/N ratio of the localization interval
        spectrum (str): The spectrum used in the localization
        time (float): Time at which the localization was calculated
        timescale (float): The localization interval timescale
        top_classification (str, float):
            The most likely classification of the trigger and the probability
    """

    def __init__(self, rec_array):
        self._time = rec_array['TIME']

        right_ascension = rec_array['RA']
        declination = rec_array['DEC']
        stat_error = rec_array['STATERR']
        self._location = (right_ascension, declination, stat_error)

        # LOCALG is 1-based, the module-level spectrum list is 0-based
        self._algorithm = spectrum[rec_array['LOCALG'] - 1]

        # the first two entries are the top and the next classification,
        # each paired with its reliability
        event_classes = rec_array['EVTCLASS']
        reliabilities = rec_array['RELIABLT']
        self._class1 = (classifications[event_classes[0]], reliabilities[0])
        self._class2 = (classifications[event_classes[1]], reliabilities[1])

        self._intensity = rec_array['INTNSITY']
        self._hardness = rec_array['HDRATIO']
        self._fluence = rec_array['FLUENCE']
        self._sigma = rec_array['SIGMA']
        self._timescale = rec_array['TRIG_TS']

        azimuth = rec_array['TR_SCAZ']
        zenith = rec_array['TR_SCZEN']
        self._azzen = (azimuth, zenith)

    @property
    def time(self):
        """The 'TIME' value of the record"""
        return self._time

    @property
    def location(self):
        """Tuple of the record's 'RA', 'DEC' and 'STATERR' values"""
        return self._location

    @property
    def spectrum(self):
        """Name of the localization spectrum selected by 'LOCALG'"""
        return self._algorithm

    @property
    def top_classification(self):
        """Name and reliability of the first classification entry"""
        return self._class1

    @property
    def next_classification(self):
        """Name and reliability of the second classification entry"""
        return self._class2

    @property
    def intensity(self):
        """The 'INTNSITY' value of the record"""
        return self._intensity

    @property
    def hardness_ratio(self):
        """The 'HDRATIO' value of the record"""
        return self._hardness

    @property
    def fluence(self):
        """The 'FLUENCE' value of the record"""
        return self._fluence

    @property
    def significance(self):
        """The 'SIGMA' value of the record"""
        return self._sigma

    @property
    def timescale(self):
        """The 'TRIG_TS' value of the record"""
        return self._timescale

    @property
    def location_sc(self):
        """Tuple of the record's 'TR_SCAZ' and 'TR_SCZEN' values"""
        return self._azzen