# pha.py: PHA and BAK classes # # Authors: William Cleveland (USRA), # Adam Goldstein (USRA) and # Daniel Kocevski (NASA) # # Portions of the code are Copyright 2020 William Cleveland and # Adam Goldstein, Universities Space Research Association # All rights reserved. # # Written for the Fermi Gamma-ray Burst Monitor (Fermi-GBM) # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . # import os import astropy.io.fits as fits import numpy as np from collections import OrderedDict from .data import DataFile from . import headers as hdr from .primitives import EnergyBins class PHA(DataFile): """PHA class for count spectra. Attributes: data (:class:`~.primitives.EnergyBins`): The PHA data datatype (str): The datatype of the file detector (str): The GBM detector the file is associated with directory (str): The directory the file is located in energy_range (float, float): The energy range of the spectrum exposure (float): The exposure of the PHA data filename (str): The filename full_path (str): The full path+filename gti ([(float, float), ...]): The good time intervals headers (dict): The headers for each extension of the PHA id (str): The GBM file ID is_gbm_file (bool): True if the file is a valid GBM standard file, False if it is not. is_trigger (bool): True if the file is a GBM trigger file, False if not numchans (int): The number of energy channels tcent (float): The center time of the data time_range (float, float): The time range of the spectrum trigtime (float): The trigger time of the data, if available valid_channels (np.array(dtype=bool)): The channels that are marked valid and available for fitting """ def __init__(self): super(PHA, self).__init__() self._headers = OrderedDict() self._data = None self._gti = None self._trigtime = 0.0 def _assert_range(self, valrange): assert valrange[0] <= valrange[1], \ 'Range must be in increasing order: (lo, hi)' return valrange def _assert_range_list(self, range_list): try: iter(range_list[0]) except: range_list = [range_list] return range_list @property def data(self): return self._data @property def gti(self): return self._gti.tolist() @property def headers(self): return self._headers @property def trigtime(self): return self._trigtime @property def time_range(self): return (self._gti['START'][0], self._gti['STOP'][-1]) @property def tcent(self): return sum(self.time_range) / 2.0 @property def energy_range(self): return self._data.range @property def numchans(self): return self._data.size @property def valid_channels(self): return np.arange(self.numchans, dtype=int)[self._channel_mask] @property def exposure(self): return self._data.exposure[0] @classmethod def open(cls, filename): """Open a PHA FITS file and return the PHA object Args: filename (str): The filename of the FITS file Returns: :class:`PHA`: The PHA object """ obj = cls() obj._file_properties(filename) # open FITS file with fits.open(filename) as hdulist: # store headers for hdu in hdulist: obj._headers.update({hdu.name: hdu.header}) # trigger time if 'TRIGTIME' in list(obj._headers['PRIMARY'].keys()): if obj._headers['PRIMARY']['TRIGTIME'] != '': obj._trigtime = float(obj._headers['PRIMARY']['TRIGTIME']) else: obj._headers['PRIMARY']['TRIGTIME'] = 0.0 else: obj._headers['PRIMARY']['TRIGTIME'] = 0.0 # if trigger, shift to trigger time reference spec = hdulist['SPECTRUM'].data ebounds = hdulist['EBOUNDS'].data gti = np.asarray(hdulist['GTI'].data) if obj._trigtime is not None: gti['START'] -= obj._trigtime gti['STOP'] -= obj._trigtime # create the energy histogram, the core of the PHA exposure = np.full(ebounds.shape[0], obj._headers['SPECTRUM']['EXPOSURE']) obj._data = EnergyBins(spec['COUNTS'], ebounds['E_MIN'], ebounds['E_MAX'], exposure) obj._gti = gti obj._set_channel_mask(spec['QUALITY']) return obj @classmethod def from_data(cls, data, tstart, tstop, gti=None, trigtime=0.0, detector=None, object=None, ra_obj=None, dec_obj=None, err_rad=None, datatype=None, backfile=None, rspfile=None, meta=None, channel_mask=None): """Create a PHA object from an EnergyBins data object. Args: data (:class:`~.primitives.EnergyBins`): The PHA count spectrum data tstart (float): The start time of the data tstop (float): The end time of the data gti ([(float, float), ...], optional): The list of tuples representing the good time intervals (start, stop). If omitted, the GTI is assumed to be [(tstart, tstop)]. trigtime (float, optional): The trigger time, if applicable. If provided, the data times will be shifted relative to the trigger time. Default is zero. detector (str, optional): The detector that produced the data object (str, optional): The object being observed ra_obj (float, optional): The RA of the object dec_obj (float, optional): The Dec of the object err_rad (float, optional): The localization error radius of the object datatype (str, optional): The datatype from which the PHA is created backfile (str, optional): The associated background file rspfile (str, optional): The associated response file meta (str, optional): Additional metadata string to be added to standard filename channel_mask (np.array(dtype=bool)): A boolean array representing the valid channels to be used for fitting. If omitted, assumes all non-zero count channels are valid. Returns: :class:`PHA`: The PHA object """ obj = cls() filetype = 'SPECTRUM' obj._data = data detchans = data.size try: trigtime = float(trigtime) except: raise TypeError('trigtime must be a float') if trigtime < 0.0: raise ValueError('trigtime must be non-negative') obj._trigtime = trigtime tstart += trigtime tstop += trigtime # create the primary extension primary_header = hdr.primary(detnam=detector, filetype=filetype, tstart=tstart, tstop=tstop, trigtime=trigtime, object=object, ra_obj=ra_obj, dec_obj=dec_obj, err_rad=err_rad) headers = [primary_header] header_names = ['PRIMARY'] # ebounds extension ebounds_header = hdr.ebounds(detnam=detector, tstart=tstart, tstop=tstop, trigtime=trigtime, object=object, ra_obj=ra_obj, dec_obj=dec_obj, err_rad=err_rad, detchans=detchans) headers.append(ebounds_header) header_names.append('EBOUNDS') # spectrum extension spectrum_header = hdr.pha_spectrum(exposure=data.exposure[0], detnam=detector, tstart=tstart, tstop=tstop, trigtime=trigtime, object=object, ra_obj=ra_obj, dec_obj=dec_obj, err_rad=err_rad, detchans=detchans, backfile=backfile, rspfile=rspfile) headers.append(spectrum_header) header_names.append('SPECTRUM') # gti extension if gti is None: gti = [(tstart, tstop)] gti = np.array(gti, dtype=[('START', '>f8'), ('STOP', '>f8')]) gti['START'] += trigtime gti['STOP'] += trigtime gti_header = hdr.gti(detnam=detector, tstart=tstart, tstop=tstop, trigtime=trigtime, object=object, ra_obj=ra_obj, dec_obj=dec_obj, err_rad=err_rad) headers.append(gti_header) header_names.append('GTI') obj._gti = gti # set the channel mask # if no channel mask is given, assume zero-count channels are bad if channel_mask is None: channel_mask = np.zeros(data.size, dtype=bool) channel_mask[data.counts > 0] = True obj._channel_mask = channel_mask # store headers and set data properties obj._headers = {name: header for name, header in zip(header_names, headers)} # set file info obj.set_properties(detector=detector, trigtime=trigtime, tstart=obj.time_range[0], datatype=datatype, extension='pha', meta=meta) gti['START'] -= obj.trigtime gti['STOP'] -= obj.trigtime return obj def write(self, directory, filename=None, backfile=None): """Writes the data to a FITS file. Args: directory (str): The directory to write the file filename (str, optional): If set, will override the standardized name backfile (str, optional): If set, will add the associated BAK file name to the PHA SPECTRUM header """ # set filename if (self.filename is None) and (filename is None): raise NameError('Filename not set') if filename is None: filename = self.filename self._full_path = os.path.join(directory, filename) self._headers['PRIMARY']['FILENAME'] = filename if backfile is not None: self._headers['SPECTRUM']['BACKFILE'] = backfile # make ebounds table chan_idx = np.arange(self.numchans, dtype=int) ebounds = np.recarray(self.numchans, dtype=[('CHANNEL', '>i2'), ('E_MIN', '>f4'), ('E_MAX', '>f4')]) ebounds['CHANNEL'] = chan_idx ebounds['E_MIN'] = self._data.lo_edges ebounds['E_MAX'] = self._data.hi_edges # make spectrum table trigtime = self.trigtime spec = np.recarray(self.numchans, dtype=[('CHANNEL', '>i2'), ('COUNTS', '>i4'), ('QUALITY', '>i2')]) spec['CHANNEL'] = chan_idx spec['COUNTS'] = self._data.counts spec['QUALITY'] = (~self._channel_mask).astype(int) # create FITS hdulist = fits.HDUList() primary_hdu = fits.PrimaryHDU(header=self._headers['PRIMARY']) hdulist.append(primary_hdu) ebounds_hdu = fits.BinTableHDU(data=ebounds, name='EBOUNDS', header=self._headers['EBOUNDS']) hdulist.append(ebounds_hdu) spectrum_hdu = fits.BinTableHDU(data=spec, name='SPECTRUM', header=self._headers['SPECTRUM']) hdulist.append(spectrum_hdu) gti = np.copy(self._gti) gti['START'] += trigtime gti['STOP'] += trigtime gti_hdu = fits.BinTableHDU(data=gti, header=self._headers['GTI'], name='GTI') hdulist.append(gti_hdu) hdulist.writeto(self.full_path, checksum=True) def slice_energy(self, energy_ranges): """Slice the PHA by one or more energy range. Produces a new PHA object. Args: energy_ranges ([(float, float), ...]): The energy ranges to slice over Returns: :class:`PHA`: A new PHA object with the requested energy range """ energy_ranges = self._assert_range_list(energy_ranges) data = [self.data.slice(*self._assert_range(energy_range)) \ for energy_range in energy_ranges] data = EnergyBins.merge(data) tstart = self.headers['PRIMARY']['TSTART'] - self.trigtime tstop = self.headers['PRIMARY']['TSTOP'] - self.trigtime obj = self.headers['PRIMARY']['OBJECT'] ra_obj = self.headers['PRIMARY']['RA_OBJ'] dec_obj = self.headers['PRIMARY']['DEC_OBJ'] err_rad = self.headers['PRIMARY']['ERR_RAD'] backfile = self.headers['SPECTRUM']['BACKFIL'] rspfile = self.headers['SPECTRUM']['RESPFILE'] meta = self._filename_obj.meta pha = self.from_data(data, tstart, tstop, gti=self.gti, trigtime=self.trigtime, detector=self.detector, object=obj, ra_obj=ra_obj, dec_obj=dec_obj, err_rad=err_rad, datatype=self.datatype, backfile=backfile, rspfile=rspfile, meta=meta) return pha def rebin_energy(self, method, *args, emin=None, emax=None): """Rebin the PHA in energy given a rebinning method. Produces a new PHA object. Args: method (): The rebinning function *args: Arguments to be passed to the rebinning function emin (float, optional): The minimum energy to rebin. If omitted, uses the lowest energy edge of the data emax (float, optional): The maximum energy to rebin. If omitted, uses the highest energy edge of the data Returns: :class:`PHA`: A new PHA object with the requested rebinned data """ data = self.data.rebin(method, *args, emin=None, emax=None) tstart = self.headers['PRIMARY']['TSTART'] - self.trigtime tstop = self.headers['PRIMARY']['TSTOP'] - self.trigtime obj = self.headers['PRIMARY']['OBJECT'] ra_obj = self.headers['PRIMARY']['RA_OBJ'] dec_obj = self.headers['PRIMARY']['DEC_OBJ'] err_rad = self.headers['PRIMARY']['ERR_RAD'] backfile = self.headers['SPECTRUM']['BACKFIL'] rspfile = self.headers['SPECTRUM']['RESPFILE'] meta = self._filename_obj.meta pha = self.from_data(data, tstart, tstop, gti=self.gti, trigtime=self.trigtime, detector=self.detector, object=obj, ra_obj=ra_obj, dec_obj=dec_obj, err_rad=err_rad, datatype=self.datatype, backfile=backfile, rspfile=rspfile, meta=meta) return pha def _set_channel_mask(self, quality_flags): self._channel_mask = np.zeros_like(quality_flags, dtype=bool) mask = (quality_flags == 0) self._channel_mask[mask] = True class BAK(PHA): """Class for a PHA background spectrum. Attributes: data (:class:`~.primitives.EnergyBins`): The PHA data datatype (str): The datatype of the file detector (str): The GBM detector the file is associated with directory (str): The directory the file is located in energy_range (float, float): The energy range of the spectrum exposure (float): The exposure of the PHA data filename (str): The filename full_path (str): The full path+filename gti ([(float, float), ...]): The good time intervals headers (dict): The headers for each extension of the PHA id (str): The GBM file ID is_gbm_file (bool): True if the file is a valid GBM standard file, False if it is not. is_trigger (bool): True if the file is a GBM trigger file, False if not numchans (int): The number of energy channels tcent (float): The center time of the data time_range (float, float): The time range of the spectrum trigtime (float): The trigger time of the data, if available """ def __init__(self): PHA.__init__(self) self._channel_mask = None @classmethod def open(cls, filename): """Open a BAK FITS file and return the BAK object Args: filename (str): The filename of the FITS file Returns: :class:`BAK`: The BAK object """ from ..background import BackgroundSpectrum obj = cls() obj._file_properties(filename) # open FITS file with fits.open(filename) as hdulist: # store headers for hdu in hdulist: obj._headers.update({hdu.name: hdu.header}) # trigger time if 'TRIGTIME' in list(obj._headers['PRIMARY'].keys()): if obj._headers['PRIMARY']['TRIGTIME'] != '': obj._trigtime = float(obj._headers['PRIMARY']['TRIGTIME']) else: obj._headers['PRIMARY']['TRIGTIME'] = 0.0 else: obj._headers['PRIMARY']['TRIGTIME'] = 0.0 spec = hdulist['SPECTRUM'].data ebounds = hdulist['EBOUNDS'].data gti = np.asarray(hdulist['GTI'].data) if obj._trigtime is not None: gti['START'] -= obj._trigtime gti['STOP'] -= obj._trigtime # create the background spectrum, the core of the BAK exposure = np.full(ebounds.shape[0], obj._headers['SPECTRUM']['EXPOSURE']) obj._data = BackgroundSpectrum(spec['RATE'], spec['STAT_ERR'], ebounds['E_MIN'], ebounds['E_MAX'], exposure) obj._gti = gti return obj @classmethod def from_data(cls, data, tstart, tstop, gti=None, trigtime=0.0, detector=None, object=None, ra_obj=None, dec_obj=None, err_rad=None, datatype=None, meta=None): """Create a BAK object from a :class:`~gbm.background.BackgroundSpectrum` data object. Args: data (:class:`~gbm.background.BackgroundSpectrum`): The background count rate spectrum tstart (float): The start time of the data tstop (float): The end time of the data gti ([(float, float), ...], optional): The list of tuples representing the good time intervals (start, stop). If omitted, the GTI is assumed to be [(tstart, tstop)]. trigtime (float, optional): The trigger time, if applicable. If provided, the data times will be shifted relative to the trigger time. Default is zero. detector (str, optional): The detector that produced the data object (str, optional): The object being observed ra_obj (float, optional): The RA of the object dec_obj (float, optional): The Dec of the object err_rad (float, optional): The localization error radius of the object datatype (str, optional): The datatype from which the PHA is created meta (str, optional): Additional metadata string to be added to standard filename Returns: :class:`BAK`: The BAK object """ obj = cls() filetype = 'BACKGROUND' obj._data = data detchans = data.size try: trigtime = float(trigtime) except: raise TypeError('trigtime must be a float') if trigtime < 0.0: raise ValueError('trigtime must be non-negative') obj._trigtime = trigtime tstart += trigtime tstop += trigtime # create the primary extension primary_header = hdr.primary(detnam=detector, filetype=filetype, tstart=tstart, tstop=tstop, trigtime=trigtime, object=object, ra_obj=ra_obj, dec_obj=dec_obj, err_rad=err_rad) headers = [primary_header] header_names = ['PRIMARY'] # ebounds extension ebounds_header = hdr.ebounds(detnam=detector, tstart=tstart, tstop=tstop, trigtime=trigtime, object=object, ra_obj=ra_obj, dec_obj=dec_obj, err_rad=err_rad, detchans=detchans) headers.append(ebounds_header) header_names.append('EBOUNDS') # spectrum extension spectrum_header = hdr.pha_spectrum(exposure=data.exposure[0], detnam=detector, tstart=tstart, tstop=tstop, trigtime=trigtime, object=object, ra_obj=ra_obj, dec_obj=dec_obj, err_rad=err_rad, detchans=detchans, poisserr=False) spectrum_header['HDUCLAS2'] = ('BKG', 'Background PHA Spectrum') spectrum_header['HDUCLAS3'] = ('RATE', 'PHA data stored as rates') spectrum_header['TUNIT2'] = 'count/s' spectrum_header['TUNIT3'] = 'count/s' headers.append(spectrum_header) header_names.append('SPECTRUM') # gti extension if gti is None: gti = [(tstart, tstop)] gti = np.array(gti, dtype=[('START', '>f8'), ('STOP', '>f8')]) gti['START'] += trigtime gti['STOP'] += trigtime gti_header = hdr.gti(detnam=detector, tstart=tstart, tstop=tstop, trigtime=trigtime, object=object, ra_obj=ra_obj, dec_obj=dec_obj, err_rad=err_rad) headers.append(gti_header) header_names.append('GTI') obj._gti = gti # store headers and set data properties obj._headers = {name: header for name, header in zip(header_names, headers)} # set file info obj.set_properties(detector=detector, trigtime=trigtime, tstart=obj.time_range[0], datatype=datatype, extension='bak', meta=meta) gti['START'] -= obj.trigtime gti['STOP'] -= obj.trigtime return obj def write(self, directory, filename=None): """Writes the data to a FITS file. Args: directory (str): The directory to write the file filename (str, optional): If set, will override the standardized name """ # set filename if (self.filename is None) and (filename is None): raise NameError('Filename not set') if filename is None: filename = self.filename self._full_path = os.path.join(directory, filename) self._headers['PRIMARY']['FILENAME'] = filename # make ebounds table chan_idx = np.arange(self.numchans, dtype=int) ebounds = np.recarray(self.numchans, dtype=[('CHANNEL', '>i2'), ('E_MIN', '>f4'), ('E_MAX', '>f4')]) ebounds['CHANNEL'] = chan_idx ebounds['E_MIN'] = self._data.lo_edges ebounds['E_MAX'] = self._data.hi_edges # make spectrum table trigtime = self.trigtime spec = np.recarray(self.numchans, dtype=[('CHANNEL', '>i2'), ('RATE', '>f8'), ('STAT_ERR', '>f8')]) spec['CHANNEL'] = chan_idx spec['RATE'] = self._data.rates spec['STAT_ERR'] = self._data.rate_uncertainty # create FITS hdulist = fits.HDUList() primary_hdu = fits.PrimaryHDU(header=self._headers['PRIMARY']) hdulist.append(primary_hdu) ebounds_hdu = fits.BinTableHDU(data=ebounds, name='EBOUNDS', header=self._headers['EBOUNDS']) hdulist.append(ebounds_hdu) spectrum_hdu = fits.BinTableHDU(data=spec, name='SPECTRUM', header=self._headers['SPECTRUM']) hdulist.append(spectrum_hdu) gti = np.copy(self._gti) gti['START'] += trigtime gti['STOP'] += trigtime gti_hdu = fits.BinTableHDU(data=gti, header=self._headers['GTI'], name='GTI') hdulist.append(gti_hdu) hdulist.writeto(self.full_path, checksum=True) def rebin_energy(self, method, *args, emin=None, emax=None): """Not Implemented""" raise NotImplementedError('Function not available for BAK objects') def slice_energy(self, method, *args, emin=None, emax=None): """Not Implemented""" raise NotImplementedError('Function not available for BAK objects')