GBM-data-tools/data/primitives.py

1855 lines
70 KiB
Python

# primitives.py: Primitive data classes for pre-binned and unbinned data
#
# Authors: William Cleveland (USRA),
# Adam Goldstein (USRA) and
# Daniel Kocevski (NASA)
#
# Portions of the code are Copyright 2020 William Cleveland and
# Adam Goldstein, Universities Space Research Association
# All rights reserved.
#
# Written for the Fermi Gamma-ray Burst Monitor (Fermi-GBM)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
import numpy as np
class EventList:
"""A primitive class defining a TTE event list.
Attributes:
channel_range (int, int): The range of the channels in the list
count_spectrum (:class:`EnergyBins`): The count spectrum histogram
emax (np.array): The PHA upper energy edges
emin (np.array): The PHA lower energy edges
energy_range (float, float): The range of the energy edges in the list
numchans (int): The number of energy channels. Note that not all
channels will necessarily have events, especially if a
slice is made over energy.
pha (np.array): The PHA channel array
size (int): The number of events in the list
time (np.array): The time array
time_range (float, float): The range of the times in the list
"""
def __init__(self):
self._events = np.array([], dtype=[('TIME', '>f8'), ('PHA', '>i2')])
self._ebounds = np.array([], dtype=[('CHANNEL', '>i2'),
('E_MIN', '>f4'),
('E_MAX', '>f4')])
def _assert_range(self, valrange):
assert valrange[0] <= valrange[1], \
'Range must be in increasing order: (lo, hi)'
return valrange
def sort(self, attrib):
"""In-place sort by attribute. Either by 'TIME' or 'PHA'
Args:
attrib (str): The name of the EventList attribute,
either 'TIME' or 'PHA'
"""
if attrib not in self._keys:
raise ValueError('{} is not a valid attribute.'.format(attrib) + \
' {} are valid attributes'.format(self._keys))
idx = np.argsort(self._events[attrib])
self._events = self._events[:][idx]
def time_slice(self, tstart, tstop):
"""Perform a slice in time of the EventList and return a new EventList
Args:
tstart (float): The start of the time slice
tstop (float): The end of the time slice
Returns:
:class:`EventList`: A new EventList object containing the time slice
"""
mask = (self.time >= tstart) & (self.time <= tstop)
return EventList.from_fits_array(self._events[mask], self._ebounds)
def channel_slice(self, chanlo, chanhi):
"""Perform a slice in energy channels of the EventList and return a
new EventList
Args:
chanlo (int): The start of the channel slice
chanhi (int): The end of the channel slice
Returns:
:class:`EventList`: A new EventList object containing the channel slice
"""
mask = (self.pha >= chanlo) & (self.pha <= chanhi)
return EventList.from_fits_array(self._events[mask],
self._ebounds)
def energy_slice(self, emin, emax):
"""Perform a slice in energy of the EventList and return a new EventList.
Since energies are binned, an emin or emax falling inside of an energy
channel bin will include that bin in the slice.
Args:
emin (float): The start of the energy slice
emax (float): The end of the energy slice
Returns:
:class:`EventList`: A new EventList object containing the energy slice
"""
mask = (self.emin < emax) & (self.emax > emin)
ebounds = self._ebounds[mask]
return self.channel_slice(ebounds['CHANNEL'][0],
ebounds['CHANNEL'][-1])
def bin(self, method, *args, tstart=None, tstop=None,
event_deadtime=2.6e-6, overflow_deadtime=1.0e-5, **kwargs):
"""Bin the EventList in time given a binning function and return a
2D time-energy channel histogram.
The binning function should take as input an array of times as well
as a tstart and tstop keywords for partial list binning. Additional
arguments and keyword arguments specific to the function are allowed.
The function should return an array of time edges for the bins, such
that, for `n` bins, there are `n` + 1 edges.
Args:
method (<function>): A binning function
*args: Arguments to be passed to the binning function
tstart (float, optional):
If set, defines the start time of the EventList to be binned,
otherwise binning will begin at the time of the first event.
tstop (float, optional):
If set, defines the end time of the EventList to be binned,
otherwise binning will end at the time of the last event.
event_deadtime (float, optional): The deadtime per event in seconds.
Default is 2.6e-6.
overflow_deadtime (float, optional):
The deadtime per event in the overflow channel in seconds.
Default is 1e-5.
**kwargs: Options to be passed to the binning function
Returns:
:class:`TimeEnergyBins`: A Time-Energy Channel histogram
"""
if tstart is None:
tstart = self.time_range[0]
if tstop is None:
tstop = self.time_range[1]
# set the start and stop of the rebinning segment
mask = (self.time >= tstart) & (self.time <= tstop)
bin_times = self.time[mask]
kwargs['tstart'] = tstart
kwargs['tstop'] = tstop
# get the time edges from the binning function and then do the 2d histo
time_edges = method(bin_times, *args, **kwargs)
counts = np.histogram2d(bin_times, self.pha[mask],
[time_edges, np.arange(self.numchans + 1)])[0]
# calculate exposure
# for gbm, 2.6 microsec per count in channels < 127;
# 10 microsec per count for overflow
lo_edges = time_edges[:-1]
hi_edges = time_edges[1:]
overflow_counts = counts[:, -1]
deadtime = np.sum(counts, axis=1) * event_deadtime + \
overflow_counts * (overflow_deadtime - event_deadtime)
exposure = (hi_edges - lo_edges) - deadtime
bins = TimeEnergyBins(counts, lo_edges, hi_edges, exposure,
self.emin, self.emax)
return bins
def rebin_energy(self, method, *args):
"""Rebin the PHA channels using the specified binning algorithm. This
does not change the number of events in the EventList, but changes their
assignment to a PHA channel and bins the energy bounds mapping to those
channels. A new EventList object is returned.
Args:
method (<function>): A binning function
*args: Arguments to be passed to the binning function
Returns:
:class:`EventList: A new EventList object with the rebinned PHA channels
"""
# exposure and counts; not really used other than for some specific
# binning algorithms
exposure = np.full(self.numchans, self.get_exposure())
chans, counts = np.unique(self.pha, return_counts=True)
if chans.size != self.numchans:
counts_fill = np.zeros(self.numchans)
counts_fill[chans] = counts
counts = counts_fill
edges = np.arange(self.numchans + 1)
# call the binning algorithm and get the new edges
_, _, new_edges = method(counts, exposure, edges, *args)
# re-assign the pha channels based on the new edges
# and also rebin the ebounds
new_pha = np.zeros_like(self.pha)
new_ebounds = []
for i in range(len(new_edges) - 1):
emin = new_edges[i]
emax = new_edges[i + 1]
mask = (self.pha >= emin) & (self.pha < emax)
new_pha[mask] = i
new_ebounds.append((i, self._ebounds[emin]['E_MIN'],
self._ebounds[emax - 1]['E_MAX']))
# create the new EventList object with the rebinned ebounds
new_events = self._events
new_events['PHA'] = new_pha
new_ebounds = np.array(new_ebounds, dtype=[('CHANNEL', '>i2'),
('E_MIN', '>f4'),
('E_MAX', '>f4')])
obj = EventList.from_fits_array(new_events, new_ebounds)
return obj
def get_exposure(self, time_ranges=None, event_deadtime=2.6e-6,
overflow_deadtime=1.0e-5):
"""Calculate the total exposure of a time range or time ranges of data
Args:
time_ranges ([(float, float), ...], optional):
The time range or time ranges over which to calculate the
exposure. If omitted, calculates the total exposure of the data.
event_deadtime (float, optional): The deadtime per event in seconds.
Default is 2.6e-6.
overflow_deadtime (float, optional):
The deadtime per event in the overflow channel in seconds.
Default is 1e-5.
Returns:
float: The exposure of the time selections
"""
if time_ranges is None:
time_ranges = [self.time_range]
try:
iter(time_ranges[0])
except:
time_ranges = [time_ranges]
exposure = 0.0
for i in range(len(time_ranges)):
tstart, tstop = self._assert_range(time_ranges[i])
tcent = (tstop + tstart) / 2.0
dt = (tstop - tstart) / 2.0
mask = (np.abs(self.time - tcent) <= dt)
# mask = (self.time >= tstart) & (self.time < tstop)
deadtime = np.sum(mask) * event_deadtime
deadtime += np.sum(self.pha[mask] == self.channel_range[1]) * \
(overflow_deadtime - event_deadtime)
exposure += (tstop - tstart) - deadtime
return exposure
@property
def time(self):
return self._events['TIME']
@property
def pha(self):
return self._events['PHA']
@property
def emin(self):
return self._ebounds['E_MIN']
@property
def emax(self):
return self._ebounds['E_MAX']
@property
def size(self):
return self._events.shape[0]
@property
def numchans(self):
return self._ebounds.shape[0]
@property
def time_range(self):
if self.size > 0:
return np.min(self.time), np.max(self.time)
@property
def channel_range(self):
if self.size > 0:
return np.min(self.pha), np.max(self.pha)
@property
def energy_range(self):
if self.size > 0:
emin = self._ebounds[self.pha.min()]['E_MIN']
emax = self._ebounds[self.pha.max()]['E_MAX']
return (emin, emax)
@property
def count_spectrum(self):
counts = np.histogram(self.pha, bins=np.arange(self.numchans + 1))[0]
exposure = np.full(self.numchans, self.get_exposure())
bins = EnergyBins(counts, self.emin, self.emax, exposure)
return bins
@classmethod
def from_fits_array(cls, events_arr, ebounds):
"""Create an EventList object from TTE FITS arrays
Args:
events_arr (np.recarray or astropy.io.fits.fitsrec.FITS_rec):
The TTE events array
ebounds (np.recarray or astropy.io.fits.fitsrec.FITS_rec):
The TTE Ebounds array, mapping channel numbers to energy bins
Returns:
:class:`EventList`: The new EventList
"""
cls = EventList()
try:
cls._keys = events_arr.names
except:
cls._keys = events_arr.dtype.names
cls._events = events_arr
cls._ebounds = ebounds
return cls
@classmethod
def from_lists(cls, times_list, pha_list, chan_lo, chan_hi):
"""Create an EventList object from lists of times, channels,
and the channel bounds. The list of times and channels must be the
same length, and the list of channel boundaries are used to map the
PHA index number in `pha_list` to energy channels.
Args:
times_list (list of float): A list of event times
pha_list (list of int): A list of PHA channels. Must be same length
as `times_list`.
chan_lo (list of float): A list of the lower edges for the energy
channels.
chan_hi (list of float): A list of the upper edges for the energy
channels. Must be same length as `chan_hi`.
Returns:
:class:`EventList`: The new EventList
"""
num_events = len(times_list)
num_chans = len(chan_lo)
if num_events != len(pha_list):
raise ValueError(
'The length of times_list and pha_list must be the same')
if num_chans != len(chan_hi):
raise ValueError(
'The length of chan_lo and chan_hi must be the same')
# events array
events_arr = np.array(list(zip(*(times_list, pha_list))),
dtype=[('TIME', '>f8'), ('PHA', '>i2')])
# ebounds array
chan_idx = np.arange(num_chans).tolist()
ebounds = np.array(list(zip(*(chan_idx, chan_lo, chan_hi))),
dtype=[('CHANNEL', '>i2'), ('E_MIN', '>f4'),
('E_MAX', '>f4')])
obj = cls.from_fits_array(events_arr, ebounds)
return obj
@classmethod
def merge(cls, eventlists, sort_attrib=None, force_unique=False):
"""Merge multiple EventLists together in time and optionally sort.
Args:
eventlist (list of :class:`EventList`):
A list containing the EventLists to be merged
sort_attrib (str, optional):
The name of the EventList attribute to sort, either 'TIME' or 'PHA'
force_unique (bool, optional):
If True, force all events to be unique via brute force sorting.
If False, the EventLists will only be checked and masked for
overlapping time ranges. Events can potentially be lost if the
merged EventLists contain overlapping times (but not necessarily
duplicate events), however this method is much faster.
Default is False.
Returns:
:class:`EventList`: A new EventList object containing the merged \
EventLists
"""
# put in time order
idx = np.argsort([np.min(eventlist.time) for eventlist in eventlists])
eventlists = [eventlists[i] for i in idx]
new_events = np.array([], dtype=[('TIME', '>f8'), ('PHA', '>i2')])
for eventlist in eventlists:
# skip empty EventLists
if eventlist.size == 0:
continue
# fix for time-shifted data
ref_time = eventlist.time[0]
temp_events = np.array(eventlist._events)
if temp_events['TIME'][0] != ref_time:
offset = ref_time - temp_events['TIME'][0]
temp_events['TIME'] += offset
# if not forcing to be unique, just make sure there is no time overlap
if (not force_unique) and (new_events.size > 0):
mask = (temp_events['TIME'] > np.max(new_events['TIME']))
temp_events = temp_events[mask]
new_events = np.concatenate((new_events, temp_events))
# force unique: make sure that we only keep unique events (slower)
if force_unique:
new_events = np.unique(new_events)
# mark: TODO add check for ebounds consistency
cls = EventList.from_fits_array(new_events, eventlists[0]._ebounds)
# do a sort
if sort_attrib is not None:
cls.sort(sort_attrib)
return cls
class Bins:
"""A primitive class defining a set of histogram bins
Parameters:
counts (np.array): The array of counts in each bin
lo_edges (np.array): The low-value edges of the bins
hi_edges (np.array): The high-value edges of the bins
Attributes:
centroids (np.array): The centroids of the bins
counts (np.array): The counts in each bin
count_uncertainty (np.array): The count uncertainty in each bin
hi_edges (np.array): The high-value edges of the bins
lo_edges (np.array): The low-value edges of the bins
range (float, float): The range of the bin edges
rates (np.array): The count rate of each bin: counts/width
rate_uncertainty (np.array): The count rate uncertainty of each bin
size (int): Number of bins
widths (np.array): The widths of the bins
"""
def __init__(self, counts, lo_edges, hi_edges):
self.counts = counts
self.lo_edges = lo_edges
self.hi_edges = hi_edges
self._good_segments = self._calculate_good_segments()
@property
def size(self):
return len(self.lo_edges)
@property
def counts(self):
return self._counts
@counts.setter
def counts(self, val):
try:
iter(val)
self._counts = np.array(val)
except:
raise TypeError('Bins.counts must be an iterable')
@property
def lo_edges(self):
return self._lo_edges
@lo_edges.setter
def lo_edges(self, val):
try:
iter(val)
self._lo_edges = np.array(val)
except:
raise TypeError('Bins.lo_edges must be an iterable')
@property
def hi_edges(self):
return self._hi_edges
@hi_edges.setter
def hi_edges(self, val):
try:
iter(val)
self._hi_edges = np.array(val)
except:
raise TypeError('Bins.hi_edges must be an iterable')
@property
def widths(self):
return self.hi_edges - self.lo_edges
@property
def centroids(self):
return (self.hi_edges + self.lo_edges) / 2.0
@property
def range(self):
return (self.lo_edges[0], self.hi_edges[-1])
@property
def count_uncertainty(self):
return np.sqrt(self._counts)
@property
def rates(self):
return self.counts / self.widths
@property
def rate_uncertainty(self):
return self.count_uncertainty / self.width
def closest_edge(self, val, which='either'):
"""Return the closest bin edge
Args:
val (float): Input value
which (str, optional): Options are:
* 'either' - closest edge to val;
* 'low' - closest edge lower than val; or
* 'high' - closest edge higher than val. Default is 'either'
Returns:
float: The closest bin edge to the input value
"""
edges = np.concatenate((self.lo_edges, [self.hi_edges[-1]]))
idx = np.argmin(np.abs(val - edges))
if which == 'low' and (idx - 1) >= 0:
if edges[idx] > val:
idx -= 1
elif (which == 'high') and (idx + 1) < edges.size:
if edges[idx] < val:
idx += 1
else:
pass
return edges[idx]
def slice(self, lo_edge, hi_edge):
"""Perform a slice over the range of the bins and return a new Bins
object. Note that lo_edge and hi_edge values that fall inside a bin will
result in that bin being included.
Args:
lo_edge (float): The start of the slice
hi_edge (float): The end of the slice
Returns:
:class:`Bins`: A new Bins object containing the slice
"""
lo_snap = self.closest_edge(lo_edge, which='low')
hi_snap = self.closest_edge(hi_edge, which='high')
if lo_snap == hi_snap:
mask = (self.lo_edges < hi_snap) & (self.hi_edges >= lo_snap)
else:
mask = (self.lo_edges < hi_snap) & (self.hi_edges > lo_snap)
obj = Bins(self.counts[mask], self.lo_edges[mask], self.hi_edges[mask])
return obj
def contiguous_bins(self):
"""Return a list of Bins, each one containing a continuous segment of
data. This is done by comparing the edges of each bin, and if there
is a gap between edges, the data is split into separate Bin objects,
each containing a contiguous set of data.
Returns
list of :class:`Bins`: A list of Bins
"""
bins = [self.slice(seg[0], seg[1]) for seg in self._good_segments]
return bins
def _calculate_good_segments(self):
"""Calculates the ranges of data that are contiguous segments
Returns:
[(float, float), ...]: A list of tuples, each containing the edges \
of a contiguous segment.
"""
mask = (self.lo_edges[1:] != self.hi_edges[:-1])
if mask.sum() == 0:
return [(self.lo_edges[0], self.hi_edges[-1])]
times = np.concatenate(([self.lo_edges[0]], self.hi_edges[:-1][mask],
self.lo_edges[1:][mask], [self.hi_edges[-1]]))
times.sort()
return times.reshape(-1, 2).tolist()
class TimeBins(Bins):
"""A class defining a set of Time History (lightcurve) bins.
Parameters:
counts (np.array): The array of counts in each bin
lo_edges (np.array): The low-value edges of the bins
hi_edges (np.array): The high-value edges of the bins
exposure (np.array): The exposure of each bin
Attributes:
centroids (np.array): The centroids of the bins
counts (np.array): The counts in each bin
count_uncertainty (np.array): The count uncertainty in each bin
exposure (np.array): The exposure of each bin
hi_edges (np.array): The high-value edges of the bins
lo_edges (np.array): The low-value edges of the bins
range (float, float): The range of the bin edges
rates (np.array): The count rate of each bin: counts/exposure
rate_uncertainty (np.array): The count rate uncertainty of each bin
size (int): Number of bins
widths (np.array): The widths of the bins
"""
def __init__(self, counts, lo_edges, hi_edges, exposure):
super().__init__(counts, lo_edges, hi_edges)
self.exposure = exposure
@property
def exposure(self):
return self._exposure
@exposure.setter
def exposure(self, val):
try:
iter(val)
self._exposure = np.array(val)
except:
raise TypeError('TimeBins.exposure must be an iterable')
@property
def rates(self):
r = np.zeros_like(self.exposure)
mask = (self.exposure > 0.0)
r[mask] = self.counts[mask] / self.exposure[mask]
return r
@property
def rate_uncertainty(self):
r = np.zeros_like(self.exposure)
mask = (self.exposure > 0.0)
r[mask] = self.count_uncertainty[mask] / self.exposure[mask]
return r
@classmethod
def from_fits_array(cls, arr):
"""Create an TimeBins object from a FITS counts array
Args:
arr (np.recarray or astropy.io.fits.fitsrec.FITS_rec):
The FITS counts array
Returns:
:class:`TimeBins`: The new TimeBins object
"""
keys = arr.dtype.names
if 'COUNTS' not in keys or 'TIME' not in keys or \
'ENDTIME' not in keys or 'EXPOSURE' not in keys:
raise ValueError('Array has missing or invalid columns')
counts = arr['COUNTS']
if len(counts.shape) == 2:
counts = np.sum(arr['COUNTS'], axis=1)
obj = TimeBins(counts, arr['TIME'], arr['ENDTIME'], arr['EXPOSURE'])
return obj
def slice(self, tstart, tstop):
"""Perform a slice over a time range and return a new Bins object. Note
that tstart and tstop values that fall inside a bin will result in
that bin being included.
Args:
lo_edge (float): The start of the slice
hi_edge (float): The end of the slice
Returns:
:class:`TimeBins`: A new TimeBins object containing the time slice
"""
tstart_snap = self.closest_edge(tstart, which='low')
tstop_snap = self.closest_edge(tstop, which='high')
mask = (self.lo_edges < tstop_snap) & (self.hi_edges > tstart_snap)
obj = self.__class__(self.counts[mask], self.lo_edges[mask],
self.hi_edges[mask], self.exposure[mask])
return obj
@classmethod
def merge(cls, histos):
"""Merge multiple TimeBins together.
Args:
histos (list of :class:`TimeBins`):
A list containing the TimeBins to be merged
Returns:
:class:`TimeBins`: A new TimeBins object containing the merged TimeBins
"""
num = len(histos)
# sort by start time
tstarts = np.concatenate([histo.lo_edges for histo in histos])
idx = np.argsort(tstarts)
# concatenate the histos in order
counts = histos[idx[0]].counts
lo_edges = histos[idx[0]].lo_edges
hi_edges = histos[idx[0]].hi_edges
exposure = histos[idx[0]].exposure
for i in range(1, num):
bin_starts = histos[idx[i]].lo_edges
# make sure there is no overlap
mask = (bin_starts >= hi_edges[-1])
counts = np.concatenate((counts, histos[idx[i]].counts[mask]))
lo_edges = np.concatenate(
(lo_edges, histos[idx[i]].lo_edges[mask]))
hi_edges = np.concatenate(
(hi_edges, histos[idx[i]].hi_edges[mask]))
exposure = np.concatenate(
(exposure, histos[idx[i]].exposure[mask]))
# new TimeBins object
merged_bins = cls(counts, lo_edges, hi_edges, exposure)
return merged_bins
@classmethod
def sum(cls, histos):
"""Sum multiple TimeBins together if they have the same time range
(support) and the same bin widths. Example use would be summing
two histograms at different energies.
Args:
histos (list of :class:`TimeBins`):
A list containing the TimeBins to be summed
Returns:
:class:`TimeBins`: A new TimeBins object containing the summed TimeBins
"""
counts = np.zeros(histos[0].size)
for histo in histos:
assert histo.size == histos[0].size, \
"The histograms must all have the same size"
assert np.all(histo.lo_edges == histos[0].lo_edges), \
"The histograms must all have the same support"
assert np.all(histo.widths == histos[0].widths), \
"The histograms must all have the same exposure"
counts += histo.counts
# averaged exposure
exposure = np.mean([histo.exposure for histo in histos], axis=0)
sum_bins = cls(counts, histos[0].lo_edges, histos[0].hi_edges,
exposure)
return sum_bins
def rebin(self, method, *args, tstart=None, tstop=None):
"""Rebin the TimeBins object in given a binning function and return a
a new TimeBins object
The binning function should take as input an array of counts,
array of exposures, and an array of bin edges. Additional arguments
specific to the function are allowed. The function should return an
array of the new counts, new exposure, and new edges.
Args:
method (<function>): A binning function
*args: Arguments to be passed to the binning function
tstart (float, optional):
If set, defines the start time of the TimeBins to be binned,
otherwise binning will begin at the time of the first bin edge.
tstop (float, optional):
If set, defines the end time of the TimeBins to be binned,
otherwise binning will end at the time of the last bin edge.
Returns:
:class:`TimeBins`: The rebinned TimeBins object
"""
# set the start and stop of the rebinning segment
trange = self.range
if tstart is None:
tstart = trange[0]
if tstop is None:
tstop = trange[1]
if tstart < trange[0]:
tstart = trange[0]
if tstop > trange[1]:
tstop = trange[1]
bins = self.contiguous_bins()
new_histos = []
for bin in bins:
trange = bin.range
# split the histogram into pieces so that we only rebin the piece
# that needs to be rebinned
pre = None
post = None
if (tstop < trange[0]) or (tstart > trange[1]):
histo = None
elif tstop == trange[1]:
if tstart > trange[0]:
pre = bin.slice(trange[0], tstart)
histo = bin.slice(self.closest_edge(tstart, which='high'),
trange[1])
elif tstart == trange[0]:
histo = bin.slice(trange[0],
self.closest_edge(tstop, which='low'))
if tstop < trange[1]:
post = bin.slice(tstop, trange[1])
elif (tstart > trange[0]) and (tstop < trange[1]):
pre = bin.slice(trange[0], tstart)
histo = bin.slice(self.closest_edge(tstart, which='high'),
self.closest_edge(tstop, which='low'))
post = bin.slice(tstop, trange[1])
else:
histo = bin
# perform the rebinning and create a new histo with the rebinned rates
if histo is not None:
edges = np.append(histo.lo_edges, histo.hi_edges[-1])
new_counts, new_exposure, new_edges = method(histo.counts,
histo.exposure,
edges, *args)
new_histo = self.__class__(new_counts, new_edges[:-1],
new_edges[1:],
new_exposure)
else:
new_histo = None
# now merge the split histo back together again
histos_to_merge = [i for i in (pre, new_histo, post) if
i is not None]
new_histos.append(self.__class__.merge(histos_to_merge))
new_histo = self.__class__.merge(new_histos)
return new_histo
class EnergyBins(TimeBins):
"""A class defining a set of Energy (count spectra) bins.
Parameters:
counts (np.array): The array of counts in each bin
lo_edges (np.array): The low-value edges of the bins
hi_edges (np.array): The high-value edges of the bins
exposure (np.array): The exposure of each bin
Attributes:
centroids (np.array): The geometric centroids of the bins
counts (np.array): The counts in each bin
count_uncertainty (np.array): The count uncertainty in each bin
exposure (np.array): The exposure of each bin
hi_edges (np.array): The high-value edges of the bins
lo_edges (np.array): The low-value edges of the bins
range (float, float): The range of the bin edges
rates (np.array): The differential count rate of each bin:
counts/(exposure*widths)
rate_uncertainty (np.array): The count rate uncertainty of each bin
size (int): Number of bins
widths (np.array): The widths of the bins
"""
def __init__(self, counts, lo_edges, hi_edges, exposure):
super().__init__(counts, lo_edges, hi_edges, exposure)
@property
def centroids(self):
return np.sqrt(self.hi_edges * self.lo_edges)
@property
def rates(self):
return self.counts / (self.exposure * self.widths)
@property
def rate_uncertainty(self):
return self.count_uncertainty / (self.exposure * self.widths)
@classmethod
def from_fits_array(cls, arr, ebounds):
"""Create an EnergyBins object from a FITS counts array
Args:
arr (np.recarray or astropy.io.fits.fitsrec.FITS_rec):
The FITS counts array
ebounds (np.recarray or astropy.io.fits.fitsrec.FITS_rec):
The ebounds array, mapping channel numbers to energy bins
Returns:
:class:`EnergyBins`: The new EnergyBins object
"""
keys = arr.dtype.names
if 'COUNTS' not in keys or 'EXPOSURE' not in keys:
raise ValueError('Array has missing or invalid columns')
keys = ebounds.dtype.names
if 'E_MIN' not in keys or 'E_MAX' not in keys:
raise ValueError('ebounds has missing or invalid columns')
counts = arr['COUNTS']
if len(counts.shape) == 2:
counts = np.sum(arr['COUNTS'], axis=0)
exposure = np.full(counts.size, np.sum(arr['EXPOSURE']))
obj = EnergyBins(counts, ebounds['E_MIN'], ebounds['E_MAX'], exposure)
return obj
@classmethod
def sum(cls, histos):
"""Sum multiple EnergyBins together if they have the same energy range
(support). Example use would be summing two count spectra.
Args:
histos (list of :class:`EnergyBins`):
A list containing the EnergyBins to be summed
Returns:
:class:`EnergyBins`: A new EnergyBins object containing the \
summed EnergyBins
"""
counts = np.zeros(histos[0].size)
exposure = 0.0
for histo in histos:
assert histo.size == histos[0].size, \
"The histograms must all have the same size"
assert np.all(histo.lo_edges == histos[0].lo_edges), \
"The histograms must all have the same support"
counts += histo.counts
exposure += histo.exposure
sum_bins = cls(counts, histos[0].lo_edges, histos[0].hi_edges,
exposure)
return sum_bins
def rebin(self, method, *args, emin=None, emax=None):
"""Rebin the EnergyBins object in given a binning function and return a
a new EnergyBins object
The binning function should take as input an array of counts,
array of exposures, and an array of bin edges. Additional arguments
specific to the function are allowed. The function should return an
array of the new counts, new exposure, and new edges.
Args:
method (<function>): A binning function
*args: Arguments to be passed to the binning function
emin (float, optional):
If set, defines the starting energy of the EnergyBins to be
binned, otherwise binning will begin at the first bin edge.
emax (float, optional):
If set, defines the ending energy of the EnergyBins to be binned,
otherwise binning will end at the last bin edge.
Returns:
:class:`EnergyBins`: The rebinned EnergyBins object
"""
histo = super().rebin(method, *args, tstart=emin, tstop=emax)
histo._exposure = self.exposure[:histo.size]
return histo
class TimeEnergyBins():
"""A class defining a set of 2D Time-Energy bins.
Parameters:
counts (np.array): The array of counts in each bin
tstart (np.array): The low-value edges of the time bins
tstop (np.array): The high-value edges of the time bins
exposure (np.array): The exposure of each bin
emin (np.array): The low-value edges of the energy bins
emax (np.array): The high-value edges of the energy bins
Attributes:
chanwidths (np.array): The bin widths along the energy axis
count_uncertainty (np.array): The counts uncertainty in each bin
energy_centroids (np.array): The bin centroids along the energy axis
energy_range (float, float): The range of the data along the energy axis
numchans (int): The number of energy channels along the energy axis
numtimes (int): The number of bins along the time axis
rates (np.array): The rates in each Time-Energy Bin
rates_per_kev (np.array): The differential rates in units of counts/s/keV
rate_uncertainty (np.array): The rate uncertainty in each bin
rate_uncertainty_per_kev (np.array):
The differential rate uncertainty in units of counts/s/keV
size (int, int): The number of bins along both axes (numtimes, numchans)
time_centroids (np.array): The bin centroids along the time axis
time_range (float, float): The range of the data along the time axis
time_widths (np.array): The bin widths along the time axis
"""
def __init__(self, counts, tstart, tstop, exposure, emin, emax):
self.counts = counts
self.tstart = tstart
self.tstop = tstop
self.exposure = exposure
self.emin = emin
self.emax = emax
if self.numtimes > 0:
self._good_time_segments = self._calculate_good_segments(
self.tstart,
self.tstop)
else:
self._good_time_segments = [(None, None)]
if self.numchans > 0:
self._good_energy_segments = self._calculate_good_segments(
self.emin,
self.emax)
else:
self._good_energy_segments = [(None, None)]
@property
def numtimes(self):
return len(self.exposure)
@property
def numchans(self):
return len(self.emin)
@property
def size(self):
return (self.numtimes, self.numchans)
@property
def time_widths(self):
return (self.tstop - self.tstart)
@property
def chan_widths(self):
return (self.emax - self.emin)
@property
def time_centroids(self):
return (self.tstop + self.tstart) / 2.0
@property
def energy_centroids(self):
return np.sqrt(self.emin * self.emax)
@property
def time_range(self):
return (self.tstart[0], self.tstop[-1])
@property
def energy_range(self):
return (self.emin[0], self.emax[-1])
@property
def count_uncertainty(self):
return np.sqrt(self.counts)
@property
def rates(self):
return self.counts / (self.exposure[:, np.newaxis])
@property
def rate_uncertainty(self):
return self.count_uncertainty / (self.exposure[:, np.newaxis])
@property
def rates_per_kev(self):
return self.rates / self.chan_widths[:, np.newaxis]
@property
def rate_uncertainty_per_kev(self):
return self.rate_uncertainty / self.chan_widths[:, np.newaxis]
def _assert_range(self, valrange):
assert valrange[0] <= valrange[1], \
'Range must be in increasing order: (lo, hi)'
return valrange
def _slice_time_mask(self, tstart, tstop):
tstart_snap = self.closest_time_edge(tstart, which='low')
tstop_snap = self.closest_time_edge(tstop, which='high')
if tstart_snap == tstop_snap:
mask = (self.tstart < tstop_snap) & (self.tstop >= tstart_snap)
else:
mask = (self.tstart < tstop_snap) & (self.tstop > tstart_snap)
return mask
def _slice_energy_mask(self, emin, emax):
emin_snap = self.closest_energy_edge(emin, which='low')
emax_snap = self.closest_energy_edge(emax, which='high')
if emin_snap == emax_snap:
mask = (self.emin < emax_snap) & (self.emax >= emin_snap)
else:
mask = (self.emin < emax_snap) & (self.emax > emin_snap)
return mask
def _calculate_good_segments(self, lo_edges, hi_edges):
"""Calculates the ranges of data that are contiguous segments
Args:
lo_edges (np.array): The lower bin edges
hi_edges (np.array): The upper bin edges
Returns:
[(float, float), ...]: A list of tuples, each containing the edges \
of a contiguous segment
"""
mask = (lo_edges[1:] != hi_edges[:-1])
if mask.sum() == 0:
return [(lo_edges[0], hi_edges[-1])]
edges = np.concatenate(([lo_edges[0]], hi_edges[:-1][mask],
lo_edges[1:][mask], [hi_edges[-1]]))
edges.sort()
return edges.reshape(-1, 2).tolist()
def closest_time_edge(self, val, which='either'):
"""Return the closest time bin edge
Args:
val (float): Input value
which (str, optional): Options are:
* 'either' - closest edge to val;
* 'low' - closest edge lower than val;
* 'high' - closest edge higher than val. Default is 'either'
Returns:
float: The closest time bin edge to the input value
"""
edges = np.concatenate((self.tstart, [self.tstop[-1]]))
idx = np.argmin(np.abs(val - edges))
if which == 'low':
if (edges[idx] > val) and (idx - 1) >= 0:
idx -= 1
elif (which == 'high') and (idx + 1) < edges.size:
if edges[idx] < val:
idx += 1
else:
pass
return edges[idx]
def closest_energy_edge(self, val, which='either'):
"""Return the closest energy bin edge
Args:
val (float): Input value
which (str, optional): Options are:
* 'either' - closest edge to val;
* 'low' - closest edge lower than val;
* 'high' - closest edge higher than val. Default is 'either'
Returns:
float: The closest energy bin edge to the input value
"""
edges = np.concatenate((self.emin, [self.emax[-1]]))
idx = np.argmin(np.abs(val - edges))
if which == 'low':
if (edges[idx] > val) and (idx - 1) >= 0:
idx -= 1
elif (which == 'high') and (idx + 1) < edges.size:
if edges[idx] < val:
idx += 1
else:
pass
return edges[idx]
def slice_time(self, tstart, tstop):
"""Perform a slice over a time range and return a new TimeEnergyBins
object. Note that tstart and tstop values that fall inside a bin will
result in that bin being included.
Args:
tstart (float): The start of the slice
tstop (float): The end of the slice
Returns:
:class:`TimeEnergyBins`: A new TimeEnergyBins object containing \
the time slice
"""
mask = self._slice_time_mask(tstart, tstop)
cls = type(self)
obj = cls(self.counts[mask, :], self.tstart[mask], self.tstop[mask],
self.exposure[mask], self.emin, self.emax)
return obj
def slice_energy(self, emin, emax):
"""Perform a slice over an energy range and return a new TimeEnergyBins
object. Note that emin and emax values that fall inside a bin will
result in that bin being included.
Args:
emin (float): The start of the slice
emax (float): The end of the slice
Returns:
:class:`TimeEnergyBins`: A new TimeEnergyBins object containing \
the energy slice
"""
mask = self._slice_energy_mask(emin, emax)
obj = TimeEnergyBins(self.counts[:, mask], self.tstart, self.tstop,
self.exposure, self.emin[mask], self.emax[mask])
return obj
def integrate_energy(self, emin=None, emax=None):
"""Integrate the histogram over the energy axis (producing a lightcurve).
Limits on the integration smaller than the full range can be set.
Args:
emin (float, optional):
The low end of the integration range. If not set, uses the
lowest energy edge of the histogram
emax (float, optional):
The high end of the integration range. If not set, uses the
highest energy edge of the histogram
Returns:
:class:`TimeBins`: A TimeBins object containing the lightcurve histogram
"""
if emin is None:
emin = self.energy_range[0]
if emax is None:
emax = self.energy_range[1]
mask = self._slice_energy_mask(emin, emax)
counts = np.sum(self.counts[:, mask], axis=1)
obj = TimeBins(counts, self.tstart, self.tstop, self.exposure)
return obj
def integrate_time(self, tstart=None, tstop=None):
"""Integrate the histogram over the time axis (producing a count rate
spectrum). Limits on the integration smaller than the full range can
be set.
Args:
tstart (float, optional):
The low end of the integration range. If not set, uses the
lowest time edge of the histogram
tstop (float, optional):
The high end of the integration range. If not set, uses the
highest time edge of the histogram
Returns:
:class:`EnergyBins`: A EnergyBins object containing the count \
rate spectrum histogram
"""
if tstart is None:
tstart = self.time_range[0]
if tstop is None:
tstop = self.time_range[1]
mask = self._slice_time_mask(tstart, tstop)
counts = np.sum(self.counts[mask, :], axis=0)
exposure = np.sum(self.exposure[mask])
exposure = np.full(counts.size, exposure)
obj = EnergyBins(counts, self.emin, self.emax, exposure)
return obj
def contiguous_time_bins(self):
"""Return a list of TimeEnergyBins, each one containing a contiguous
time segment of data. This is done by comparing the edges of each time
bin, and if thereis a gap between edges, the data is split into
separate TimeEnergyBin objects, each containing a time-contiguous set
of data.
Returns:
list of :class:`TimeEnergyBins`: A list of TimeEnergyBins
"""
bins = [self.slice_time(seg[0], seg[1]) for seg in
self._good_time_segments]
return bins
def contiguous_energy_bins(self):
"""Return a list of TimeEnergyBins, each one containing a contiguous
energy segment of data. This is done by comparing the edges of each
energy bin, and if thereis a gap between edges, the data is split into
separate TimeEnergyBin objects, each containing an energy-contiguous set
of data.
Returns:
list of :class:`TimeEnergyBins`: A list of TimeEnergyBins
"""
bins = [self.slice_energy(seg[0], seg[1]) for seg in
self._good_energy_segments]
return bins
def rebin_time(self, method, *args, tstart=None, tstop=None):
"""Rebin the TimeEnergyBins object along the time axis given a binning
function and return a new TimeEnergyBins object
The binning function should take as input an array of counts,
array of exposures, and an array of bin edges. Additional arguments
specific to the function are allowed. The function should return an
array of the new counts, new exposure, and new edges.
Args:
method (<function>): A binning function
*args: Arguments to be passed to the binning function
tstart (float, optional):
If set, defines the start time of the TimeEnergyBins to be
binned, otherwise binning will begin at the time of the first
bin edge.
tstop (float, optional):
If set, defines the end time of the TimeEnergyBins to be
binned, otherwise binning will end at the time of the last
bin edge.
Returns:
:class:`TimeEnergyBins`: The rebinned TimeEnergyBins object
"""
# set the start and stop of the rebinning segment
trange = self.time_range
if tstart is None:
tstart = trange[0]
if tstop is None:
tstop = trange[1]
if tstart < trange[0]:
tstart = trange[0]
if tstop > trange[1]:
tstop = trange[1]
bins = self.contiguous_time_bins()
new_histos = []
for bin in bins:
trange = bin.time_range
# split the histogram into pieces so that we only rebin the piece
# that needs to be rebinned
pre = None
post = None
if (tstop < trange[0]) or (tstart > trange[1]):
histo = None
elif tstop == trange[1]:
if tstart > trange[0]:
pre = bin.slice_time(trange[0], tstart)
histo = bin.slice_time(self.closest_time_edge(tstart,
which='high'),
trange[1])
elif tstart == trange[0]:
histo = bin.slice_time(trange[0],
self.closest_time_edge(tstop,
which='low'))
if tstop < trange[1]:
post = bin.slice_time(tstop, trange[1])
elif (tstart > trange[0]) and (tstop < trange[1]):
pre = bin.slice_time(trange[0], tstart)
histo = bin.slice_time(
self.closest_time_edge(tstart, which='high'),
self.closest_time_edge(tstop, which='low'))
post = bin.slice_time(tstop, trange[1])
else:
histo = bin
# perform the rebinning and create a new histo with the rebinned rates
if histo is not None:
edges = np.append(histo.tstart, histo.tstop[-1])
new_counts = []
for i in range(bin.numchans):
new_cts, new_exposure, new_edges = method(
histo.counts[:, i],
histo.exposure,
edges, *args)
new_counts.append(new_cts)
new_counts = np.array(new_counts).T
new_histo = TimeEnergyBins(new_counts, new_edges[:-1],
new_edges[1:],
new_exposure, bin.emin, bin.emax)
else:
new_histo = bin
if len(new_histo.tstart) == 0:
new_histo = None
# now merge the split histo back together again
histos_to_merge = [i for i in (pre, new_histo, post) if
i is not None]
new_histos.append(TimeEnergyBins.merge_time(histos_to_merge))
new_histo = TimeEnergyBins.merge_time(new_histos)
return new_histo
def rebin_energy(self, method, *args, emin=None, emax=None):
"""Rebin the TimeEnergyBins object along the energy axis given a binning
function and return a new TimeEnergyBins object
The binning function should take as input an array of counts,
array of exposures, and an array of bin edges. Additional arguments
specific to the function are allowed. The function should return an
array of the new counts, new exposure, and new edges.
Args:
method (<function>): A binning function
*args: Arguments to be passed to the binning function
emin (float, optional):
If set, defines the starting edge of the TimeEnergyBins to be
binned, otherwise binning will begin at the the first bin edge.
emax (float, optional):
If set, defines the ending edge of the TimeEnergyBins to be
binned, otherwise binning will end at the last bin edge.
Returns:
:class:`TimeEnergyBins`: The rebinned TimeEnergyBins object
"""
# set the start and stop of the rebinning segment
erange = self.energy_range
if emin is None:
emin = erange[0]
if emax is None:
emax = erange[1]
if emin < erange[0]:
emin = erange[0]
if emax > erange[1]:
emax = erange[1]
bins = self.contiguous_energy_bins()
new_histos = []
for bin in bins:
# split the histogram into pieces so that we only rebin the piece
# that needs to be rebinned
pre = None
post = None
if (emax < erange[0]) or (emin > erange[1]):
histo = None
elif emax == erange[1]:
if emin > erange[0]:
pre = bin.slice_energy(erange[0], emin)
histo = bin.slice_energy(self.closest_energy_edge(emin,
which='high'),
erange[1])
elif emin == erange[0]:
histo = bin.slice_energy(erange[0],
self.closest_energy_edge(emax,
which='low'))
if emax < erange[1]:
post = bin.slice_energy(emax, erange[1])
elif (emin > erange[0]) and (emax < erange[1]):
pre = bin.slice_energy(erange[0], emin)
histo = bin.slice_energy(
self.closest_energy_edge(emin, which='high'),
self.closest_energy_edge(emax, which='low'))
post = bin.slice_energy(emax, erange[1])
else:
histo = bin
# perform the rebinning and create a new histo with the rebinned rates
if histo is not None:
edges = np.append(histo.emin, histo.emax[-1])
numtimes, numchans = histo.size
new_counts = []
for i in range(numtimes):
exposure = np.full(numchans, histo.exposure[i])
new_cts, _, new_edges = method(histo.counts[i, :],
exposure,
edges, *args)
new_counts.append(new_cts)
new_counts = np.array(new_counts)
new_histo = TimeEnergyBins(new_counts, bin.tstart, bin.tstop,
bin.exposure, new_edges[:-1],
new_edges[1:])
else:
new_histo = bin
# now merge the split histo back together again
histos_to_merge = [i for i in (pre, new_histo, post) if
i is not None]
new_histos.append(TimeEnergyBins.merge_energy(histos_to_merge))
new_histo = TimeEnergyBins.merge_energy(new_histos)
return new_histo
def get_exposure(self, time_ranges=None, scale=False):
"""Calculate the total exposure of a time range or time ranges of data
Args:
time_ranges ([(float, float), ...], optional):
The time range or time ranges over which to calculate the
exposure. If omitted, calculates the total exposure of the data
scale (bool, optional):
If True and the time ranges don't match up with the data binning,
will scale the exposure based on the requested time range.
Default is False.
Returns:
float: The exposure of the time selections
"""
if time_ranges is None:
time_ranges = [self.time_range]
try:
iter(time_ranges[0])
except:
time_ranges = [time_ranges]
exposure = 0.0
for i in range(len(time_ranges)):
mask = self._slice_time_mask(*self._assert_range(time_ranges[i]))
dt = (time_ranges[i][1] - time_ranges[i][0])
data_exp = np.sum(self.exposure[mask])
dts = np.sum(self.tstop[mask] - self.tstart[mask])
if dts > 0:
if scale:
exposure += data_exp * (dt / ds)
else:
exposure += data_exp
return exposure
@classmethod
def merge_time(cls, histos):
"""Merge multiple TimeEnergyBins together along the time axis.
Args:
histos (list of :class:`TimeEnergyBins`):
A list containing the TimeEnergyBins to be merged
Returns
:class:`TimeEnergyBins`: A new TimEnergyBins object containing the \
merged TimeEnergyBins
"""
num = len(histos)
# sort by start time
tstarts = np.array([histo.tstart[0] for histo in histos])
idx = np.argsort(tstarts)
# concatenate the histos in order
counts = histos[idx[0]].counts
tstart = histos[idx[0]].tstart
tstop = histos[idx[0]].tstop
exposure = histos[idx[0]].exposure
emin = histos[idx[0]].emin
emax = histos[idx[0]].emax
for i in range(1, num):
bin_starts = histos[idx[i]].tstart
# make sure there is no overlap
mask = (bin_starts >= tstop[-1])
counts = np.vstack((counts, histos[idx[i]].counts[mask, :]))
tstart = np.concatenate((tstart, histos[idx[i]].tstart[mask]))
tstop = np.concatenate((tstop, histos[idx[i]].tstop[mask]))
exposure = np.concatenate(
(exposure, histos[idx[i]].exposure[mask]))
# new TimeEnergyBins object
merged_bins = cls(counts, tstart, tstop, exposure, emin, emax)
return merged_bins
@classmethod
def merge_energy(cls, histos):
"""Merge multiple TimeEnergyBins together along the energy axis.
Args:
histos (list of :class:`TimeEnergyBins`):
A list containing the TimeEnergyBins to be merged
Returns:
:class:`TimeEnergyBins`: A new TimEnergyBins object containing \
the merged TimeEnergyBins
"""
num = len(histos)
# sort by channel edge
emins = np.concatenate([histo.emin for histo in histos])
idx = np.argsort(emins)
# concatenate the histos in order
counts = histos[idx[0]].counts
tstart = histos[idx[0]].tstart
tstop = histos[idx[0]].tstop
exposure = histos[idx[0]].exposure
emin = histos[idx[0]].emin
emax = histos[idx[0]].emax
for i in range(1, num):
bin_starts = histos[idx[i]].emin
# make sure there is no overlap
mask = (bin_starts >= emax[-1])
counts = np.hstack((counts, histos[idx[i]].counts[:, mask]))
emin = np.concatenate((emin, histos[idx[i]].emin[mask]))
emax = np.concatenate((emax, histos[idx[i]].emax[mask]))
# new TimeEnergyBins object
merged_bins = cls(counts, tstart, tstop, exposure, emin, emax)
return merged_bins
class TimeRange():
"""A primitive class defining a time range
Parameters:
tstart (float): The start time of the range
tstop (float): The end time of the range
Attributes:
center (float): The center of the time range
duration (float): The duration of the time range
tstart (float): The start time of the range
tstop (float): The end time of the range
"""
def __init__(self, tstart, tstop):
tstart = self._assert_time(tstart)
tstop = self._assert_time(tstop)
if tstop >= tstart:
self._tstart = tstart
self._tstop = tstop
else:
self._tstart = tstop
self._tstop = tstart
def __str__(self):
return '({0}, {1})'.format(self.tstart, self.tstop)
def _assert_time(self, atime):
try:
atime = float(atime)
except:
raise TypeError('time must be a float')
return atime
@property
def tstart(self):
return self._tstart
@property
def tstop(self):
return self._tstop
@property
def duration(self):
return self.tstop - self.tstart
@property
def center(self):
return (self.tstart + self.tstop) / 2.0
def as_tuple(self):
"""Return the time range as a tuple.
Returns:
(float, float): The starting and ending time of the time range
"""
return (self.tstart, self.tstop)
def contains(self, a_time, inclusive=True):
"""Determine if the time range contains a time.
Args:
a_time (float): The input time to check
inclusive (bool, optional):
If True, then includes the edges of the range for the check,
otherwise it is edge-exclusive. Default is True.
Returns:
bool: True if the time is in the time range, False otherwise
"""
a_time = self._assert_time(a_time)
if inclusive:
test = (a_time <= self.tstop) and (a_time >= self.tstart)
else:
test = (a_time < self.tstop) and (a_time > self.tstart)
if test:
return True
else:
return False
@classmethod
def union(cls, range1, range2):
"""Return a new TimeRange that is the union of two input TimeRanges
Args:
range1 (:class:`TimeRange`): A time range used to calculate the union
range2 (:class:`TimeRange`): Another time range used to calculate
the union
Returns:
:class:`TimeRange`: The unionized time range
"""
tstart = np.min((range1.tstart, range2.tstart))
tstop = np.max((range1.tstop, range2.tstop))
obj = cls(tstart, tstop)
return obj
@classmethod
def intersection(cls, range1, range2):
"""Return a new TimeRange that is the intersection of two input
TimeRanges. If the input TimeRanges do not intersect, then None is
returned.
Args:
range1 (:class:`TimeRange`): A time range used to calculate the
intersection
range2 (:class:`TimeRange`): Another time range used to calculate
the intersection
Returns:
:class:`TimeRange`: The intersected time range
"""
# test if one tstart is inside the other time range
if range1.contains(range2.tstart):
lower = range1
upper = range2
elif range2.contains(range1.tstart):
lower = range2
upper = range1
else:
return None
# do the merge
tstart = upper.tstart
tstop = np.min((lower.tstop, upper.tstop))
obj = cls(tstart, tstop)
return obj
class GTI:
"""A primitive class defining a set of Good Time Intervals (GTIs)
Parameters:
tstart (float): The start time of the GTI
tstop (float): The end time of the GTI
Attributes:
num_intervals (int): The number of intervals in the GTI
range (float, float): The full range of the GTI
"""
def __init__(self, tstart=None, tstop=None):
self._gti = None
if (tstart is not None) and (tstop is not None):
self._gti = [TimeRange(tstart, tstop)]
@property
def num_intervals(self):
return len(self._gti)
@property
def range(self):
"""The full range of the GTI
:type: (float, float)"""
return (self._gti[0].tstart, self._gti[-1].tstop)
def as_list(self):
"""Return the GTI as a list of tuples.
Returns:
[(float, float), ...]: The list of GTI interval tuples
"""
gti_list = [one_gti.as_tuple() for one_gti in self._gti]
return gti_list
def insert(self, tstart, tstop):
"""Insert a new interval into the GTI
Args:
tstart (float): The start time of the new interval
tstop (float): The end time of the new interval
"""
# where the new time range should be inserted
time_range = TimeRange(tstart, tstop)
idx = [i for i, j in enumerate(self._gti) if
j.tstart <= time_range.tstart]
# determine if there is overlap with the lower bounding range, and if so
# then merge
if len(idx) != 0:
idx = idx[-1]
if self._gti[idx].contains(time_range.tstart, inclusive=True):
the_range = self._gti.pop(idx)
time_range = TimeRange.union(the_range, time_range)
else:
idx += 1
else:
idx = 0
# determine if there is overlap with the upper bounding range, and if so
# then merge
if idx < len(self._gti):
if self._gti[idx].contains(time_range.tstop):
the_range = self._gti.pop(idx)
time_range = TimeRange.union(the_range, time_range)
self._gti.insert(idx, time_range)
def contains(self, a_time, inclusive=True):
"""Determine if the GTI contains a time.
Args:
a_time (float): The input time to check
inclusive (bool, optional):
If True, then includes the edges of the range for the check,
otherwise it is edge-exclusive. Default is True.
Returns:
bool: True if the time is in the GTI, False otherwise
"""
test = [gti.contains(a_time, inclusive=inclusive) for gti in self._gti]
return any(test)
@classmethod
def from_list(cls, gti_list):
"""Create a new GTI object from a list of tuples.
Args:
gti_list ([(float, float), ...]): A list of interval tuples
Returns:
:class:`GTI`: The new GTI object
"""
gtis = [TimeRange(*one_gti) for one_gti in gti_list]
obj = cls()
obj._gti = gtis
return obj
@classmethod
def from_boolean_mask(cls, times, mask):
"""Create a new GTI object from a list of times and a Boolean mask
Splits the boolean mask into segments of contiguous values and applies
to array of times to create a GTI object.
Args:
times (np.array): An array of times
mask (np.array(dtype=bool)): The boolean array. Must be the same
size as times.
Returns:
:class:`GTI`: The new GTI object
"""
# split a boolean mask array into segments based on True/False
indices = np.nonzero(mask[1:] != mask[:-1])[0] + 1
time_segs = np.split(times, indices)
mask_segs = np.split(mask, indices)
# retrieve the start and stop times for the "off" intervals
segs = []
numsegs = len(indices) + 1
for i in range(numsegs):
if mask_segs[i][0] == 0:
segs.append((time_segs[i][0], time_segs[i][-1]))
# mark FIXME: null if mask is all True or all False
# currently assuming that it must be all True
if len(segs) == 0:
segs = [self.time_range]
return cls.from_list(segs)
@classmethod
def merge(cls, gti1, gti2):
"""Return a new GTI object that is a merge of two existing GTI objects.
Args:
gti1 (:class:`GTI`): A GTI to be merged
gti2 (:class:`GTI`): A GTI to be merged
Returns:
:class:`GTI`: The new merged GTI object
"""
time_list = gti1.as_list()
time_list.extend(gti2.as_list())
time_list = sorted(time_list)
obj = cls(*time_list.pop(0))
[obj.insert(*interval) for interval in time_list]
return obj