359 lines
10 KiB
Python
359 lines
10 KiB
Python
|
# file.py: Module containing GBM filenaming convention and operations
|
||
|
#
|
||
|
# Authors: William Cleveland (USRA),
|
||
|
# Adam Goldstein (USRA) and
|
||
|
# Daniel Kocevski (NASA)
|
||
|
#
|
||
|
# Portions of the code are Copyright 2020 William Cleveland and
|
||
|
# Adam Goldstein, Universities Space Research Association
|
||
|
# All rights reserved.
|
||
|
#
|
||
|
# Written for the Fermi Gamma-ray Burst Monitor (Fermi-GBM)
|
||
|
#
|
||
|
# This program is free software: you can redistribute it and/or modify
|
||
|
# it under the terms of the GNU General Public License as published by
|
||
|
# the Free Software Foundation, either version 3 of the License, or
|
||
|
# (at your option) any later version.
|
||
|
#
|
||
|
# This program is distributed in the hope that it will be useful,
|
||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||
|
# GNU General Public License for more details.
|
||
|
#
|
||
|
# You should have received a copy of the GNU General Public License
|
||
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||
|
#
|
||
|
|
||
|
import copy
|
||
|
import datetime
|
||
|
import os.path
|
||
|
import re
|
||
|
|
||
|
from .detectors import Detector
|
||
|
from .time import Met
|
||
|
|
||
|
|
||
|
class GbmFile:
|
||
|
"""Parse or construct a GBM standardized filename.
|
||
|
|
||
|
Attributes:
|
||
|
data_type (str): The datatype of the file
|
||
|
detector (str): The detector with which the file is associated
|
||
|
directory (str): The directory hosting the file
|
||
|
extension (str): The filename extension
|
||
|
meta (str): Additional metadata in the filename
|
||
|
trigger (bool): True if the file is from a trigger. False otherwise
|
||
|
uid (str): The unique id of the file
|
||
|
version (int): The version number of the file
|
||
|
"""
|
||
|
REGEX_PATTERN = r'^glg_(?P<data_type>.+)_(?P<detector>[bn][0-9ab]|all)_(?P<trigger>(?:bn)?)(?P<uid>(?:\d{9}|\d{6}' \
|
||
|
r'_\d\dz|\d{6}))(?P<meta>(?:_.+)?)_v(?P<version>\d\d)\.(?P<extension>.+)$'
|
||
|
|
||
|
def __init__(self):
|
||
|
self.directory = ''
|
||
|
self.trigger = False
|
||
|
self.data_type = None
|
||
|
self._detector = None
|
||
|
self.uid = None
|
||
|
self.meta = None
|
||
|
self.version = 0
|
||
|
self.extension = 'fit'
|
||
|
|
||
|
def _init_by_dict(self, values):
|
||
|
for key, val in values.items():
|
||
|
# handle properties differently
|
||
|
try:
|
||
|
p = getattr(self, key)
|
||
|
if isinstance(p, property):
|
||
|
p.__set__(self, val)
|
||
|
else:
|
||
|
self.__setattr__(key, val)
|
||
|
except AttributeError:
|
||
|
raise ValueError("{} is not a valid attribute.".format(key))
|
||
|
|
||
|
@property
|
||
|
def detector(self):
|
||
|
if not self._detector:
|
||
|
return 'all'
|
||
|
return self._detector
|
||
|
|
||
|
@detector.setter
|
||
|
def detector(self, value):
|
||
|
if value == 'all':
|
||
|
self._detector = None
|
||
|
elif isinstance(value, Detector):
|
||
|
self._detector = value
|
||
|
else:
|
||
|
if isinstance(value, str):
|
||
|
d = Detector.from_str(value)
|
||
|
self._detector = d if d else value
|
||
|
elif isinstance(value, int):
|
||
|
d = Detector.from_num(value)
|
||
|
if d:
|
||
|
self._detector = d
|
||
|
else:
|
||
|
raise ValueError("Invalid detector value")
|
||
|
|
||
|
def version_str(self):
|
||
|
"""Return the file version number as a string
|
||
|
|
||
|
Returns:
|
||
|
str: The file version
|
||
|
"""
|
||
|
if isinstance(self.version, int):
|
||
|
v = "{:02d}".format(self.version)
|
||
|
else:
|
||
|
v = self.version
|
||
|
return v
|
||
|
|
||
|
def basename(self):
|
||
|
"""The file basename
|
||
|
|
||
|
Returns:
|
||
|
str: The file basename
|
||
|
"""
|
||
|
if self.trigger:
|
||
|
u = 'bn' + self.uid
|
||
|
else:
|
||
|
u = self.uid
|
||
|
|
||
|
if self.meta:
|
||
|
return str.format("glg_{}_{}_{}{}_v{}.{}",
|
||
|
self.data_type, self.detector, u, self.meta,
|
||
|
self.version_str(), self.extension)
|
||
|
|
||
|
return str.format("glg_{}_{}_{}_v{}.{}",
|
||
|
self.data_type, self.detector, u, self.version_str(),
|
||
|
self.extension)
|
||
|
|
||
|
def path(self):
|
||
|
"""The file path
|
||
|
|
||
|
Returns:
|
||
|
str: The path
|
||
|
"""
|
||
|
return os.path.join(self.directory, self.basename())
|
||
|
|
||
|
def __str__(self):
|
||
|
return self.path()
|
||
|
|
||
|
def __repr__(self):
|
||
|
return self.path()
|
||
|
|
||
|
@classmethod
|
||
|
def create(cls, **kwargs):
|
||
|
"""Create a GbmFile from keywords
|
||
|
|
||
|
Args:
|
||
|
**kwargs: The properties of a GbmFile
|
||
|
|
||
|
Returns:
|
||
|
:class:`GbmFile`: The new filename object
|
||
|
"""
|
||
|
obj = cls()
|
||
|
obj._init_by_dict(kwargs)
|
||
|
return obj
|
||
|
|
||
|
@classmethod
|
||
|
def from_path(cls, path):
|
||
|
"""Create a GbmFile from parsing a filename
|
||
|
|
||
|
Args:
|
||
|
path (str): A filename path
|
||
|
|
||
|
Returns:
|
||
|
:class:`GbmFile`: The new filename object
|
||
|
"""
|
||
|
m = re.match(cls.REGEX_PATTERN, os.path.basename(path), re.I | re.S)
|
||
|
|
||
|
result = None
|
||
|
if m:
|
||
|
result = cls.create(**m.groupdict())
|
||
|
result.directory = os.path.dirname(path)
|
||
|
|
||
|
return result
|
||
|
|
||
|
def detector_list(self):
|
||
|
"""Generate a list of GbmFile objects, one for each GBM detector
|
||
|
|
||
|
Returns:
|
||
|
list of :class:`GbmFile`: The new filename objects
|
||
|
"""
|
||
|
result = []
|
||
|
for d in Detector:
|
||
|
x = copy.copy(self)
|
||
|
x.detector = d
|
||
|
result.append(x)
|
||
|
return result
|
||
|
|
||
|
@classmethod
|
||
|
def list_from_paths(cls, path_list, unknown=None):
|
||
|
"""Create a many GbmFiles from a list of filepaths
|
||
|
|
||
|
Args:
|
||
|
path_list (list of str): List of filepaths
|
||
|
|
||
|
Returns:
|
||
|
list of :class:`GbmFile`: The new filename object(s)
|
||
|
"""
|
||
|
result = []
|
||
|
for p in path_list:
|
||
|
f = GbmFile.from_path(p)
|
||
|
if f:
|
||
|
result.append(f)
|
||
|
else:
|
||
|
if unknown is not None:
|
||
|
unknown.append(p)
|
||
|
else:
|
||
|
raise ValueError('Unrecognized file name')
|
||
|
return result
|
||
|
|
||
|
|
||
|
def scan_dir(path, hidden=False, recursive=False, absolute=False, regex=None):
|
||
|
"""
|
||
|
Scans the given directory for files.
|
||
|
|
||
|
Args:
|
||
|
path (str): The root directory to scan.
|
||
|
hidden (bool, optional): Set True if you want to include hidden files.
|
||
|
recursive (bool, optional): Set True if you want to scan subdirectories
|
||
|
within the given path.
|
||
|
absolute (bool, optional): Set true if you want the absolute path of
|
||
|
each file returned.
|
||
|
regex (str): Set if you want to only return files matching the given
|
||
|
regular expression.
|
||
|
|
||
|
Yields:
|
||
|
str: Full path to a file for each iteration.
|
||
|
"""
|
||
|
for f in os.listdir(path):
|
||
|
if not hidden:
|
||
|
if f.startswith('.'):
|
||
|
continue
|
||
|
file_path = os.path.join(path, f)
|
||
|
if absolute:
|
||
|
file_path = os.path.abspath(file_path)
|
||
|
if os.path.isfile(file_path):
|
||
|
if regex and re.search(regex, f) is None:
|
||
|
continue
|
||
|
yield file_path
|
||
|
elif recursive:
|
||
|
yield from scan_dir(file_path, hidden, recursive, absolute, regex)
|
||
|
|
||
|
|
||
|
def all_exists(file_list, parent_dir=None):
|
||
|
"""
|
||
|
Do all the files in the list exist in the filesystem?
|
||
|
|
||
|
Args:
|
||
|
file_list (list of str): List of file names to check
|
||
|
parent_dir (str, optional): parent directory
|
||
|
|
||
|
Returns:
|
||
|
bool: True if all files exist
|
||
|
"""
|
||
|
if not file_list:
|
||
|
return False
|
||
|
for f in file_list:
|
||
|
if parent_dir is not None:
|
||
|
path = os.path.join(parent_dir, f.basename())
|
||
|
else:
|
||
|
path = str(f)
|
||
|
if not os.path.exists(path):
|
||
|
return False
|
||
|
return True
|
||
|
|
||
|
|
||
|
def has_detector(file_list, detector):
|
||
|
"""
|
||
|
Does the file list contain a file for the given detector?
|
||
|
|
||
|
Args:
|
||
|
file_list (list of str): List of file names
|
||
|
detector (str): Detector being searched
|
||
|
|
||
|
Returns:
|
||
|
bool: True if the list of file names includes the given detector
|
||
|
"""
|
||
|
for f in file_list:
|
||
|
if f.detector == detector:
|
||
|
return True
|
||
|
return False
|
||
|
|
||
|
|
||
|
def is_complete(file_list):
|
||
|
"""
|
||
|
Does the file list contain a file for every detector?
|
||
|
|
||
|
Args:
|
||
|
file_list (list of str): List of files that represent a detector set
|
||
|
|
||
|
Returns:
|
||
|
bool: True if the file list contains a file for every detector
|
||
|
"""
|
||
|
for d in Detector:
|
||
|
if not has_detector(file_list, d):
|
||
|
return False
|
||
|
return True
|
||
|
|
||
|
|
||
|
def max_version(file_list):
|
||
|
"""
|
||
|
Returns the maximum _version of file name in the given list
|
||
|
|
||
|
Args:
|
||
|
file_list (list of str): list of file names
|
||
|
|
||
|
Returns:
|
||
|
int: Largest _version number in the list
|
||
|
"""
|
||
|
result = None
|
||
|
for f in file_list:
|
||
|
try:
|
||
|
v = int(f.version)
|
||
|
if result is None or v > result:
|
||
|
result = v
|
||
|
except ValueError:
|
||
|
pass
|
||
|
|
||
|
return result
|
||
|
|
||
|
|
||
|
def min_version(file_list):
|
||
|
"""
|
||
|
Returns the minimum _version of file name in the given list
|
||
|
|
||
|
Args:
|
||
|
file_list (list of str): list of file names
|
||
|
|
||
|
Returns:
|
||
|
int: Smallest _version number in the list
|
||
|
"""
|
||
|
result = None
|
||
|
for f in file_list:
|
||
|
try:
|
||
|
v = int(f.version)
|
||
|
if result is None or v < result:
|
||
|
result = v
|
||
|
except ValueError:
|
||
|
pass
|
||
|
|
||
|
return result
|
||
|
|
||
|
|
||
|
def ymd_path(base, name):
|
||
|
if isinstance(name, str) or isinstance(name, GbmFile):
|
||
|
v = name if isinstance(name, str) else name.basename()
|
||
|
m = re.match(r'.*_(?:(?:bn)?)(\d{6})(?:(\d{3})|(_\d\d)?)_.*', v,
|
||
|
re.I | re.S)
|
||
|
if m:
|
||
|
d = datetime.datetime.strptime(m.group(1), "%y%m%d")
|
||
|
return os.path.join(base, d.strftime('%Y-%m-%d'),
|
||
|
os.path.basename(name))
|
||
|
elif isinstance(name, Met):
|
||
|
return os.path.join(base, name.datetime.strftime('%Y-%m-%d'))
|
||
|
elif isinstance(name, datetime.datetime) or isinstance(name,
|
||
|
datetime.date):
|
||
|
return os.path.join(base, name.strftime('%Y-%m-%d'))
|
||
|
raise ValueError("Can't parse a YMD value")
|