# NOTE: web-page capture header (not part of the original source file):
# This repository has been archived on 2024-11-23. You can view files and clone it, but cannot push or open issues or pull requests.
# GBM-data-tools/lookup/lookup.py
# 2022-07-15 07:36:07 +00:00
#
# 698 lines
# 23 KiB
# Python
# lookup.py: GSpec and RMfit lookup classes
#
# Authors: William Cleveland (USRA),
# Adam Goldstein (USRA) and
# Daniel Kocevski (NASA)
#
# Portions of the code are Copyright 2020 William Cleveland and
# Adam Goldstein, Universities Space Research Association
# All rights reserved.
#
# Written for the Fermi Gamma-ray Burst Monitor (Fermi-GBM)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
import datetime as dt
import json
import os.path
import warnings
import numpy as np
from gbm.detectors import Detector
from gbm.file import GbmFile
from gbm.types import ListReader
class LookupMethod:
    """Defines the attributes of a method call"""

    def __init__(self):
        self.method = None  # name of the method to be called
        self.args = None    # positional arguments as a tuple, or None
        self.kwargs = {}    # keyword arguments

    @classmethod
    def from_dict(cls, d):
        """Create a LookupMethod from a dictionary

        Parameters:
        --------------
        d: dict
            Dictionary with optional 'method', 'args', and 'kwargs' keys

        Returns:
        -----------
        r: LookupMethod
        """
        r = cls()
        r.method = d.get('method', None)
        # A missing or null 'args' previously caused tuple(None) to raise
        # a TypeError; keep args as None in that case instead.
        args = d.get('args', None)
        r.args = tuple(args) if args is not None else None
        r.kwargs = d.get('kwargs', {})
        return r
class LookupBackground(LookupMethod):
    """Defines the attributes of a background binning method"""

    def __init__(self):
        super(LookupBackground, self).__init__()
        # data type the background applies to ('binned' or 'unbinned')
        self.datatype = None

    @classmethod
    def from_dict(cls, d):
        """Build a LookupBackground from a dictionary, reading the base
        method attributes and then the 'datatype' key."""
        obj = super(LookupBackground, cls).from_dict(d)
        obj.datatype = d.get('datatype', None)
        return obj
class LookupEnergyBinning(LookupMethod):
    """Defines the attributes of an energy binning method"""

    def __init__(self):
        super(LookupEnergyBinning, self).__init__()
        # optional (start, stop) bounds of the range the binning applies to
        self.start = None
        self.stop = None

    @classmethod
    def from_dict(cls, d):
        """Build a LookupEnergyBinning from a dictionary, reading the base
        method attributes and then the 'start'/'stop' keys."""
        obj = super(LookupEnergyBinning, cls).from_dict(d)
        obj.start = d.get('start', None)
        obj.stop = d.get('stop', None)
        return obj
class LookupTimeBinning(LookupEnergyBinning):
    """Defines the attributes of a time binning method"""

    def __init__(self):
        super(LookupTimeBinning, self).__init__()
        # data type the binning applies to ('binned' or 'unbinned')
        self.datatype = None

    @classmethod
    def from_dict(cls, d):
        """Build a LookupTimeBinning from a dictionary, reading the
        energy-binning attributes and then the 'datatype' key."""
        obj = super(LookupTimeBinning, cls).from_dict(d)
        obj.datatype = d.get('datatype', None)
        return obj
class View:
    """Defines the bounds of a view window"""

    def __init__(self, xmin=None, xmax=None, ymin=None, ymax=None):
        self.xmin = xmin
        self.xmax = xmax
        self.ymin = ymin
        self.ymax = ymax

    def __eq__(self, other):
        # Comparing against a non-View used to raise AttributeError;
        # return NotImplemented so Python falls back to its default.
        if not isinstance(other, View):
            return NotImplemented
        return (self.xmin == other.xmin and self.xmax == other.xmax
                and self.ymin == other.ymin and self.ymax == other.ymax)

    @classmethod
    def from_dict(cls, d):
        """Create a View from a dict with xmin/xmax/ymin/ymax keys.
        A falsy dict yields a View with all bounds None."""
        r = cls()
        if d:
            r.xmin = d.get('xmin', None)
            r.xmax = d.get('xmax', None)
            r.ymin = d.get('ymin', None)
            r.ymax = d.get('ymax', None)
        return r

    @classmethod
    def from_list(cls, l):
        """Create a View from a list [xmin, xmax, ymin, ymax]"""
        return cls(l[0], l[1], l[2], l[3])

    def to_list(self):
        """Return the bounds as [xmin, xmax, ymin, ymax]"""
        return [self.xmin, self.xmax, self.ymin, self.ymax]

    def xrange(self):
        """Return the (xmin, xmax) bounds"""
        return self.xmin, self.xmax

    def yrange(self):
        """Return the (ymin, ymax) bounds"""
        return self.ymin, self.ymax
class Binnings:
    """Container for the energy and time binning definitions of a data
    file. Each attribute is None until a binning is first set."""

    def __init__(self):
        self.energy = self.time = None
class Selections:
    """Container for the background, energy, and source selections of a
    data file. Each attribute is None until a selection is added."""

    def __init__(self):
        self.background = None
        self.energy = None
        self.source = None

    # NOTE: the parameter name 'type' shadows the builtin, but it is kept
    # for interface compatibility with existing callers.
    def add(self, type, item):
        """Append an item to the named selection list ('background',
        'energy', or 'source'), creating the list on first use."""
        current = getattr(self, type)
        if current is None:
            current = []
            setattr(self, type, current)
        current.append(item)
class Views:
    """Container for the energy and time display views of a data file.
    Each attribute is None until a view is set."""

    def __init__(self):
        self.energy = self.time = None
class DataFileLookup:
    """Defines all the information associated with a datafile"""

    def __init__(self):
        self.filename = None    # base filename of the data file
        self.detector = None    # Detector for the file, if known
        self.response = None    # basename of the associated response file
        self.background = None  # LookupBackground model, if set
        self.binnings = Binnings()
        self.selections = Selections()
        self.views = Views()

    @classmethod
    def from_dict(cls, d):
        """Create a DataFileLookup from a dictionary (e.g. parsed JSON)

        Parameters:
        --------------
        d: dict
            The dictionary of datafile attributes

        Returns:
        -----------
        r: DataFileLookup
        """

        def set_attributes(obj, d):
            # bulk-assign dictionary entries as attributes
            for k, v in d.items():
                setattr(obj, k, v)

        r = cls()
        r.filename = d.get('filename', None)
        det = d.get('detector', None)
        if det:
            r.detector = Detector.from_str(det)
        r.response = d.get('response', None)
        bkg = d.get('background', None)
        if bkg:
            r.background = LookupBackground.from_dict(bkg)
        binnings = d.get('binnings', None)
        if binnings:
            energies = binnings.get('energy', None)
            if energies:
                r.binnings.energy = [LookupEnergyBinning.from_dict(e)
                                     for e in energies]
            times = binnings.get('time', None)
            if times:
                r.binnings.time = [LookupTimeBinning.from_dict(t)
                                   for t in times]
        if 'selections' in d:
            set_attributes(r.selections, d['selections'])
        views = d.get('views', None)
        if views:
            e = views.get('energy', None)
            if e:
                r.views.energy = View.from_dict(e)
            t = views.get('time', None)
            if t:
                r.views.time = View.from_dict(t)
        return r

    @staticmethod
    def assert_selections(selections):
        """Check to ensure the selections are of the correct form.

        Parameters:
        -----------
        selections: tuple or list of tuples
            The selection(s) to check

        Returns:
        --------
        selections: list
            A list of (lo, hi) ranges

        Raises:
        -------
        ValueError if the selections are malformed
        """
        # Either a homogeneous list of list-ranges or of tuple-ranges.
        # NOTE: use short-circuiting 'or', not bitwise '|'.
        if (all(isinstance(selection, list) for selection in selections) or
                all(isinstance(selection, tuple) for selection in
                    selections)):
            if any(len(selection) != 2 for selection in selections):
                raise ValueError('Each range in selections must be of the '
                                 'form (lo, hi)')
            else:
                return selections
        else:
            # a single (lo, hi) range: wrap it in a list
            if len(selections) != 2:
                raise ValueError('Selections must either be a range of '
                                 'the form (lo, hi) or a list of ranges')
            else:
                return [selections]

    def set_response(self, rsp_filename):
        """Add a response file for the data

        Parameters:
        --------------
        rsp_filename: str or None
            The filename of the response file; None clears the response
        """
        if rsp_filename is None:
            self.response = None
        else:
            # only the basename is stored in the lookup
            self.response = os.path.basename(rsp_filename)

    def set_background_model(self, background_name, datatype, *args, **kwargs):
        """Add a new background model for the data file

        Parameters:
        --------------
        background_name: str
            The background fitting/estimation name
        datatype: str
            The datatype the background is applied to. Either 'binned' or 'unbinned'
        *args:
            Additional arguments used by the background class
        **kwargs:
            Additional keywords used by the background class
        """
        bkg = LookupBackground()
        bkg.method = background_name
        bkg.datatype = datatype
        bkg.args = args
        bkg.kwargs = kwargs
        self.background = bkg

    def set_time_binning(self, binning_name, datatype, *args, start=None,
                         stop=None, **kwargs):
        """Add a new time binning function for the data file

        Parameters:
        --------------
        binning_name: str
            The binning function name
        datatype: str
            The datatype the binning is applied to. Either 'binned' or 'unbinned'
        *args:
            Additional arguments used by the binning function
        start: float, optional
            The start of the data range to be rebinned. The default is to start at the
            beginning of the histogram segment.
        stop: float, optional
            The end of the data range to be rebinned. The default is to stop at
            the end of the histogram segment.
        **kwargs:
            Additional keywords used by the binning function
        """
        time_bin = LookupTimeBinning()
        time_bin.method = binning_name
        time_bin.datatype = datatype
        time_bin.args = args
        time_bin.start = start
        time_bin.stop = stop
        time_bin.kwargs = kwargs
        # binnings accumulate; the list is created on first use
        if self.binnings.time is None:
            self.binnings.time = [time_bin]
        else:
            self.binnings.time.append(time_bin)

    def set_energy_binning(self, binning_function, *args, start=None,
                           stop=None, **kwargs):
        """Add a new energy binning function for the data file

        Parameters:
        --------------
        binning_function: str
            The binning function name
        *args:
            Additional arguments used by the binning function
        start: float, optional
            The start of the data range to be rebinned. The default is to start at the
            beginning of the histogram segment.
        stop: float, optional
            The end of the data range to be rebinned. The default is to stop at
            the end of the histogram segment.
        **kwargs:
            Additional keywords used by the binning function
        """
        energy_bin = LookupEnergyBinning()
        energy_bin.method = binning_function
        energy_bin.args = args
        energy_bin.start = start
        energy_bin.stop = stop
        energy_bin.kwargs = kwargs
        # binnings accumulate; the list is created on first use
        if self.binnings.energy is None:
            self.binnings.energy = [energy_bin]
        else:
            self.binnings.energy.append(energy_bin)

    def set_source_selection(self, source_intervals):
        """Add source selection(s) for the data file

        Parameters:
        --------------
        source_intervals: list
            A list of source selection intervals, each item of the list being a tuple
            of the format (low, high)
        """
        source_intervals = self.assert_selections(source_intervals)
        self.selections.source = source_intervals

    def set_energy_selection(self, energy_intervals):
        """Add energy selection(s) for the data file

        Parameters:
        --------------
        energy_intervals: list
            A list of energy selection intervals, each item of the list being a tuple
            of the format (low, high)
        """
        energy_intervals = self.assert_selections(energy_intervals)
        self.selections.energy = energy_intervals

    def set_background_selection(self, background_intervals):
        """Add background selection(s) for the data file

        Parameters:
        --------------
        background_intervals: list
            A list of background selection intervals, each item of the list being a tuple
            of the format (low, high)
        """
        # NOTE(review): unlike source/energy, this does not validate via
        # assert_selections — presumably intentional; confirm with callers
        self.selections.background = background_intervals

    def add_time_display_view(self, display_range):
        """Add the display range of the lightcurve for the data file

        Parameters:
        --------------
        display_range: list
            The values of the lightcurve display window in the format
            [xmin, xmax, ymin, ymax]
        """
        self.views.time = View(display_range[0], display_range[1],
                               display_range[2], display_range[3])

    def add_energy_display_view(self, display_range):
        """Add the display range of the count spectrum for the data file

        Parameters:
        --------------
        display_range: list
            The values of the count spectrum display window in the format
            [xmin, xmax, ymin, ymax]
        """
        self.views.energy = View(display_range[0], display_range[1],
                                 display_range[2], display_range[3])
class LookupFile:
    """Class for a Gspec lookup file

    The lookup file contains one or more data files, keyed by filename.
    """

    def __init__(self, *args, **kwargs):
        self.file_date = None     # ISO timestamp set when the file is written
        self.datafiles = dict()   # filename -> DataFileLookup

    def __getitem__(self, name):
        return self.datafiles[name]

    def __delitem__(self, key):
        del self.datafiles[key]

    def __setitem__(self, key, value):
        # only DataFileLookup objects may be stored
        if isinstance(value, DataFileLookup):
            self.datafiles[key] = value
        else:
            raise ValueError("not a DataFile")

    def files(self):
        """Return the data filenames contained within the lookup"""
        return self.datafiles.keys()

    def assert_has_datafile(self, dataname):
        """Check to see if the data file has been added to the lookup

        Parameters:
        --------------
        dataname: str
            The data file name

        Raises:
        -------
        KeyError if the data file is not tracked
        """
        if dataname not in self.datafiles.keys():
            raise KeyError('File {0} not currently tracked. Add this file to '
                           'the lookup and try again.'.format(dataname))

    def add_data_file(self, filepath):
        """Create a DataFileLookup for the given path and track it,
        keyed by its basename."""
        df = DataFileLookup()
        fn = GbmFile.from_path(filepath)
        df.filename = fn.basename()
        df.detector = fn.detector
        self.datafiles[df.filename] = df

    @classmethod
    def from_dict(cls, d):
        """Create a LookupFile from a dictionary (e.g. parsed JSON)

        Parameters:
        --------------
        d: dict
            Dictionary with optional 'file_date' and 'datafiles' keys

        Returns:
        -----------
        r: LookupFile
        """
        r = cls()
        r.file_date = d.get('file_date', None)
        datafiles = d.get('datafiles', None)
        if datafiles:
            for k, v in datafiles.items():
                df = DataFileLookup.from_dict(v)
                # the mapping key is authoritative for the filename
                df.filename = k
                r.datafiles[k] = df
        return r

    def write_to(self, fpath):
        """
        Write contents of LookupFile to the given file path as a JSON file.
        :param fpath: full pathname for JSON file
        :return: None
        """
        # NOTE(review): datetime.utcnow() is deprecated in Python 3.12+;
        # kept for compatibility with the existing naive-ISO file_date format.
        self.file_date = dt.datetime.utcnow().isoformat()
        with open(fpath, "w") as fp:
            json.dump(self, fp, cls=LookupEncoder, indent=4)

    @classmethod
    def read_from(cls, fpath):
        """
        Load values to LookupFile from the JSON file at the given path.
        :param fpath: full pathname for JSON file
        :return: new LookupFile object
        """
        with open(fpath, "r") as fp:
            j = json.load(fp)
            return cls.from_dict(j)

    @classmethod
    def read_from_rmfit(cls, fpath, ti_file=None, dataname=None):
        """
        Load values to LookupFile from the RMFIT created lookup file at the given path.
        :param fpath: full pathname for RMFIT created lookup file
        :param ti_file: full pathname for RMFIT created ti file.
        :param dataname: the name of the datafile to associate this lookup file with
        :return: new LookupFile object
        """

        # RMFit selections are a 2xN array where the first element is the start values and the second element
        # are the end values. It needs to be transposed into a Nx2 array. drop_first is used to drop the convex
        # hull if the selections contain one.
        def transform_selections(x, drop_first=False):
            result = None
            if x:
                result = np.array(x).reshape(2, -1).transpose().tolist()
                if drop_first:
                    result = result[1:]
            return result

        # Begin loading RMFit lookup file making the contents a list of tokens.
        tokens = []
        with open(fpath, 'r') as contents:
            for line in contents:
                x = line.strip().split()
                if x:
                    try:
                        # Is the first element a number? Then add the array to the tokens.
                        float(x[0])
                        tokens += x
                    except ValueError:
                        # Otherwise, it's a string and we will append the entire line as a token.
                        tokens.append(line)

        # Let's create the DataFile object
        data_file = DataFileLookup()

        # The input data file is based on the lookup filename
        f = GbmFile.from_path(fpath)
        if f.extension == 'lu':
            if f.data_type == 'ctime' or f.data_type == 'cspec':
                f.extension = 'pha'
            elif f.data_type == 'tte':
                f.extension = 'fit'
            else:
                raise ValueError('Not a valid lookup filename')
        else:
            raise ValueError("Not a valid lookup filename")
        if dataname:
            data_file.filename = os.path.basename(dataname)
        else:
            data_file.filename = f.basename()

        lr = ListReader(tokens)

        # energy edges, if None is returned we need to read the next value anyway which should be zero.
        energy_edges = lr.get_n(int, rmfit=True)
        if energy_edges:
            data_file.set_energy_binning('By Edge Index',
                                         np.array(energy_edges))

        # energy selections, if None is returned we need to read the next value anyway which should be zero.
        data_file.selections.energy = transform_selections(
            lr.get_n(float, rmfit=True), drop_first=True)

        # rebinned time edges, if None is returned we need to read the next value anyway which should be zero.
        time_edges = lr.get_n(int, rmfit=True)
        if time_edges:
            if f.data_type == 'ctime' or f.data_type == 'cspec':
                data_file.set_time_binning('By Edge Index', 'binned',
                                           np.array(time_edges))
            elif f.data_type == 'tte':
                # TTE time edges live in a separate '.ti' file
                if ti_file:
                    with open(ti_file, 'r') as fp:
                        txt = list(fp)
                    # the first line is a header; the rest are edge values
                    txt = txt[1:]
                    tte_edges = np.array([t.strip() for t in txt], dtype=float)
                    data_file.set_time_binning('By Time Edge', 'unbinned',
                                               np.array(tte_edges))
                else:
                    warnings.warn("No TTE edges found. Need '.ti' file")

        # time selections, if None is returned we need to read the next value anyway which should be zero.
        data_file.selections.source = transform_selections(
            lr.get_n(float, rmfit=True), drop_first=True)

        # background selections, if None is returned we need to read the next value anyway which should be zero.
        data_file.selections.background = transform_selections(
            lr.get_n(float, rmfit=True))

        # TODO: For now skip over binning names
        lr.skip(3)  # Assuming 'STACKED SPECTRA', 'LOG', 'LOG'

        # time and energy window ranges: (xmin, xmax, ymin, ymax)
        v = lr.get(4, float)
        data_file.views.time = View(v[0], v[1], v[2], v[3])
        v = lr.get(4, float)
        data_file.views.energy = View(v[0], v[1], v[2], v[3])

        # polynomial background order
        poly_order = lr.get(cls=int)
        data_file.set_background_model('Polynomial', 'binned', poly_order)

        # Add the data file to a newly created lookup file
        lu = cls()
        lu.datafiles[data_file.filename] = data_file
        return lu

    def merge_lookup(self, lookup, overwrite=False):
        """Merge an existing lookup into this lookup

        Parameters:
        --------------
        lookup: LookupFile
            The lookup object to be merged into this lookup
        overwrite: bool, optional
            If set to True, then any datanames in the current lookup will be overwritten
            if those same datanames are in the input lookup. Default is False
        """
        # get datanames of the input lookup
        datanames = lookup.datafiles.keys()
        for dataname in datanames:
            # skip datanames already tracked here unless overwriting;
            # NOTE: use short-circuiting 'and', not bitwise '&'
            if (dataname in self.datafiles) and (not overwrite):
                continue
            self.datafiles[dataname] = lookup.datafiles[dataname]

    # TODO: Remove?
    def split_off_dataname(self, dataname):
        """Return a new lookup object containing only the requested data file

        Parameters:
        --------------
        dataname: str
            The requested data filename

        Returns:
        -----------
        new_lookup: LookupFile
            The new lookup object
        """
        self.assert_has_datafile(dataname)
        new_lookup = LookupFile()
        new_lookup[dataname] = self.datafiles[dataname]
        return new_lookup

    def display_lookup(self):
        """Pretty print a lookup for display (in json format)
        """
        lu = json.dumps(self.datafiles, indent=4, separators=(',', ': '),
                        cls=LookupEncoder)
        return lu
class LookupEncoder(json.JSONEncoder):
    """Custom JSON encoder for lookup objects.

    Serializes DataFileLookup (dropping the redundant 'filename'
    attribute), numpy arrays (as lists), Detector (via short_name), and
    any object exposing to_dict() or a plain __dict__.
    """

    def default(self, obj):
        # Ordering matters: the specific types must be checked before the
        # generic to_dict()/__dict__ fallbacks.
        if isinstance(obj, DataFileLookup):
            attrs = dict(obj.__dict__)
            # the filename is already the key in the parent mapping
            del attrs['filename']
            return attrs
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        if isinstance(obj, Detector):
            return obj.short_name
        if hasattr(obj, 'to_dict'):
            return obj.to_dict()
        if hasattr(obj, '__dict__'):
            return obj.__dict__
        return super().default(obj)
class LookupDecoder(json.JSONDecoder):
    """Custom JSON decoder to turn JSON lists into numpy arrays
    """

    def __init__(self, *args, **kwargs):
        json.JSONDecoder.__init__(self, object_hook=self.object_hook,
                                  *args, **kwargs)

    def object_hook(self, obj):
        # object_hook always receives a dict; the guard is kept for safety
        if isinstance(obj, dict):
            for key in obj.keys():
                # Convert list values to numpy arrays with an inferred
                # dtype. The original passed dtype=type(obj[key]) — i.e.
                # dtype=list — which produced object-dtype arrays.
                if isinstance(obj[key], list):
                    obj[key] = np.array(obj[key])
        return obj