Source code for hermes_core.util.schema

"""
This module provides schema metadata derivations.

This code is based on that provided by SpacePy; see
    licenses/SPACEPY.rst
"""

from pathlib import Path
from collections import OrderedDict
from copy import deepcopy
from typing import Optional
import math
import yaml
import numpy as np
from astropy.table import Table
from astropy.time import Time
from astropy import units as u
from ndcube import NDCube
import hermes_core
from hermes_core import log
from hermes_core.util import util, const
from hermes_core.util.exceptions import warn_user

__all__ = ["HermesDataSchema"]

DEFAULT_GLOBAL_CDF_ATTRS_SCHEMA_FILE = "hermes_default_global_cdf_attrs_schema.yaml"
DEFAULT_GLOBAL_CDF_ATTRS_FILE = "hermes_default_global_cdf_attrs.yaml"
DEFAULT_VARIABLE_CDF_ATTRS_SCHEMA_FILE = "hermes_default_variable_cdf_attrs_schema.yaml"


class HermesDataSchema:
    """Class representing the schema of a file type."""

    def __init__(self):
        super().__init__()

        # Data Validation, Compliance, Derived Attributes
        self._global_attr_schema = HermesDataSchema._load_default_global_attr_schema()
        # Data Validation and Compliance for Variable Data
        self._variable_attr_schema = (
            HermesDataSchema._load_default_variable_attr_schema()
        )
        # Load Default Global Attributes
        self._default_global_attributes = HermesDataSchema._load_default_attributes()

        self.cdftypenames = {
            const.CDF_BYTE.value: "CDF_BYTE",
            const.CDF_CHAR.value: "CDF_CHAR",
            const.CDF_INT1.value: "CDF_INT1",
            const.CDF_UCHAR.value: "CDF_UCHAR",
            const.CDF_UINT1.value: "CDF_UINT1",
            const.CDF_INT2.value: "CDF_INT2",
            const.CDF_UINT2.value: "CDF_UINT2",
            const.CDF_INT4.value: "CDF_INT4",
            const.CDF_UINT4.value: "CDF_UINT4",
            const.CDF_INT8.value: "CDF_INT8",
            const.CDF_FLOAT.value: "CDF_FLOAT",
            const.CDF_REAL4.value: "CDF_REAL4",
            const.CDF_DOUBLE.value: "CDF_DOUBLE",
            const.CDF_REAL8.value: "CDF_REAL8",
            const.CDF_EPOCH.value: "CDF_EPOCH",
            const.CDF_EPOCH16.value: "CDF_EPOCH16",
            const.CDF_TIME_TT2000.value: "CDF_TIME_TT2000",
        }
        self.numpytypedict = {
            const.CDF_BYTE.value: np.int8,
            const.CDF_CHAR.value: np.int8,
            const.CDF_INT1.value: np.int8,
            const.CDF_UCHAR.value: np.uint8,
            const.CDF_UINT1.value: np.uint8,
            const.CDF_INT2.value: np.int16,
            const.CDF_UINT2.value: np.uint16,
            const.CDF_INT4.value: np.int32,
            const.CDF_UINT4.value: np.uint32,
            const.CDF_INT8.value: np.int64,
            const.CDF_FLOAT.value: np.float32,
            const.CDF_REAL4.value: np.float32,
            const.CDF_DOUBLE.value: np.float64,
            const.CDF_REAL8.value: np.float64,
            const.CDF_EPOCH.value: np.float64,
            const.CDF_EPOCH16.value: np.dtype((np.float64, 2)),
            const.CDF_TIME_TT2000.value: np.int64,
        }
        self.timetypes = [
            const.CDF_EPOCH.value,
            const.CDF_EPOCH16.value,
            const.CDF_TIME_TT2000.value,
        ]

        # List of tuples of (WCS Keyword, Astropy Property, Default Value).
        # There is one entry for each keyword/property along each dimension of
        # the spectra stored in the astropy.wcs.WCS object.
        self.wcs_keyword_to_astropy_property = [
            ("CNAME", "cname", "NoName"),
            ("CTYPE", "ctype", "TEST"),
            ("CUNIT", "cunit", u.dimensionless_unscaled.to_string()),
            ("CRPIX", "crpix", 0),
            ("CRVAL", "crval", 1),
            ("CDELT", "cdelt", 1),
        ]

    @property
    def global_attribute_schema(self):
        """(`dict`) Schema for global attributes of the file."""
        return self._global_attr_schema

    @property
    def variable_attribute_schema(self):
        """(`dict`) Schema for variable attributes of the file."""
        return self._variable_attr_schema

    @property
    def default_global_attributes(self):
        """(`dict`) Default global attributes applied for all HERMES data files."""
        return self._default_global_attributes

    @staticmethod
    def _load_default_global_attr_schema() -> dict:
        # The default schema file is contained in the `hermes_core/data` directory
        default_schema_path = str(
            Path(hermes_core.__file__).parent
            / "data"
            / DEFAULT_GLOBAL_CDF_ATTRS_SCHEMA_FILE
        )
        # Load the Schema
        return HermesDataSchema._load_yaml_data(yaml_file_path=default_schema_path)

    @staticmethod
    def _load_default_variable_attr_schema() -> dict:
        # The default schema file is contained in the `hermes_core/data` directory
        default_schema_path = str(
            Path(hermes_core.__file__).parent
            / "data"
            / DEFAULT_VARIABLE_CDF_ATTRS_SCHEMA_FILE
        )
        # Load the Schema
        return HermesDataSchema._load_yaml_data(yaml_file_path=default_schema_path)

    @staticmethod
    def _load_default_attributes() -> dict:
        # The default values are read from the global attribute schema file in
        # the `hermes_core/data` directory
        default_attributes_path = str(
            Path(hermes_core.__file__).parent
            / "data"
            / DEFAULT_GLOBAL_CDF_ATTRS_SCHEMA_FILE
        )
        global_schema = HermesDataSchema._load_yaml_data(
            yaml_file_path=default_attributes_path
        )
        return {
            attr_name: info["default"]
            for attr_name, info in global_schema.items()
            if info["default"] is not None
        }

    @staticmethod
    def _load_yaml_data(yaml_file_path: str) -> dict:
        """
        Function to load data from a YAML file.

        Parameters
        ----------
        yaml_file_path : `str`
            Path to the schema file to be used for CDF formatting.
        """
        assert isinstance(yaml_file_path, str)
        assert Path(yaml_file_path).exists()
        # Load the YAML file to a dict
        yaml_data = {}
        with open(yaml_file_path, "r") as f:
            try:
                yaml_data = yaml.safe_load(f)
            except yaml.YAMLError as exc:
                log.critical(exc)
        return yaml_data
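
    # Usage sketch (illustrative; not part of the upstream module). The loaders
    # above resolve the bundled YAML files relative to the installed package, so
    # the parsed schemas and type maps can be inspected directly:
    #
    #     >>> schema = HermesDataSchema()
    #     >>> isinstance(schema.global_attribute_schema, dict)
    #     True
    #     >>> schema.numpytypedict[const.CDF_INT2.value]  # CDF type -> numpy dtype
    #     <class 'numpy.int16'>
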
    @staticmethod
    def global_attribute_template() -> OrderedDict:
        """
        Function to generate a template of required global attributes
        that must be set for a valid CDF.

        Returns
        -------
        template : `OrderedDict`
            A template for required global attributes that must be provided.
        """
        template = OrderedDict()
        global_attribute_schema = HermesDataSchema._load_default_global_attr_schema()
        default_global_attributes = HermesDataSchema._load_default_attributes()
        for attr_name, attr_schema in global_attribute_schema.items():
            if (
                attr_schema["required"]
                and not attr_schema["derived"]
                and attr_name not in default_global_attributes
            ):
                template[attr_name] = None
        return template
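
    # Usage sketch (illustrative; the attribute names and values shown are
    # assumptions and depend on the bundled YAML schema): fill in the required,
    # non-derived attributes by hand before writing a CDF.
    #
    #     >>> template = HermesDataSchema.global_attribute_template()
    #     >>> template["Descriptor"] = "EEA>Electron Electrostatic Analyzer"
    #     >>> template["Data_level"] = "l1>Level 1"
    #     >>> template["Data_version"] = "0.0.1"
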
    @staticmethod
    def measurement_attribute_template() -> OrderedDict:
        """
        Function to generate a template of required measurement attributes
        that must be set for a valid CDF measurement variable.

        Returns
        -------
        template : `OrderedDict`
            A template for required variable attributes that must be provided.
        """
        template = OrderedDict()
        measurement_attribute_schema = (
            HermesDataSchema._load_default_variable_attr_schema()
        )
        for attr_name, attr_schema in measurement_attribute_schema[
            "attribute_key"
        ].items():
            if attr_schema["required"] and not attr_schema["derived"]:
                template[attr_name] = None
        return template
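
    # Usage sketch (illustrative; "CATDESC" is an ISTP variable attribute used
    # here as an assumed example of a required, non-derived key):
    #
    #     >>> template = HermesDataSchema.measurement_attribute_template()
    #     >>> template["CATDESC"] = "Electron counts per energy bin"
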
    @staticmethod
    def global_attribute_info(attribute_name: Optional[str] = None) -> Table:
        """
        Function to generate an `astropy.table.Table` of information about each global
        metadata attribute. The `astropy.table.Table` contains all information in the
        HERMES global attribute schema including:

        - description: (`str`) A brief description of the attribute
        - default: (`str`) The default value used if none is provided
        - derived: (`bool`) Whether the attribute can be derived by the HERMES
          :py:class:`~hermes_core.util.schema.HermesDataSchema` class
        - required: (`bool`) Whether the attribute is required by HERMES standards
        - validate: (`bool`) Whether the attribute is included in the
          :py:func:`~hermes_core.util.validation.validate` checks (Note, not all
          attributes that are required are validated)
        - overwrite: (`bool`) Whether the
          :py:class:`~hermes_core.util.schema.HermesDataSchema` attribute derivations
          will overwrite an existing attribute value with an updated attribute value
          from the derivation process

        Parameters
        ----------
        attribute_name : `str`, optional, default None
            The name of the attribute to get specific information for.

        Returns
        -------
        info : `astropy.table.Table`
            A table of information about global metadata.

        Raises
        ------
        KeyError : If attribute_name is not a recognized global attribute.
        """
        global_attribute_schema = HermesDataSchema._load_default_global_attr_schema()

        # Strip the descriptions of newlines
        for attr_name in global_attribute_schema.keys():
            global_attribute_schema[attr_name]["description"] = global_attribute_schema[
                attr_name
            ]["description"].strip()

        # Get all the attributes from the schema
        attribute_names = list(global_attribute_schema.keys())
        table_rows = [info for _, info in global_attribute_schema.items()]

        # Create the info table
        info = Table(rows=table_rows)
        info.add_column(col=attribute_names, name="Attribute", index=0)

        # Limit the info to the requested attribute
        if attribute_name and attribute_name in info["Attribute"]:
            info = info[info["Attribute"] == attribute_name]
        elif attribute_name and attribute_name not in info["Attribute"]:
            raise KeyError(
                f"Cannot find Global Metadata for attribute name: {attribute_name}"
            )

        return info
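
    # Usage sketch (illustrative): inspect the schema interactively, either the
    # full table or a single attribute ("Descriptor" is referenced elsewhere in
    # this module and serves as the example here).
    #
    #     >>> info = HermesDataSchema.global_attribute_info()
    #     >>> info_one = HermesDataSchema.global_attribute_info("Descriptor")
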
    @staticmethod
    def measurement_attribute_info(attribute_name: Optional[str] = None) -> Table:
        """
        Function to generate an `astropy.table.Table` of information about each variable
        metadata attribute. The `astropy.table.Table` contains all information in the
        HERMES variable attribute schema including:

        - description: (`str`) A brief description of the attribute
        - derived: (`bool`) Whether the attribute can be derived by the HERMES
          :py:class:`~hermes_core.util.schema.HermesDataSchema` class
        - required: (`bool`) Whether the attribute is required by HERMES standards
        - overwrite: (`bool`) Whether the
          :py:class:`~hermes_core.util.schema.HermesDataSchema` attribute derivations
          will overwrite an existing attribute value with an updated attribute value
          from the derivation process
        - valid_values: (`str`) List of allowed values the attribute can take for
          HERMES products, if applicable
        - alternate: (`str`) An additional attribute name that can be treated as an
          alternative of the given attribute. Not all attributes have an alternative,
          and only one of a given attribute or its alternate is required.
        - var_types: (`str`) A list of the variable types that require the given
          attribute to be present

        Parameters
        ----------
        attribute_name : `str`, optional, default None
            The name of the attribute to get specific information for.

        Returns
        -------
        info : `astropy.table.Table`
            A table of information about variable metadata.

        Raises
        ------
        KeyError : If attribute_name is not a recognized variable attribute.
        """
        measurement_attribute_schema = (
            HermesDataSchema._load_default_variable_attr_schema()
        )
        measurement_attribute_key = measurement_attribute_schema["attribute_key"]

        # Strip the descriptions of newlines
        for attr_name in measurement_attribute_key.keys():
            measurement_attribute_key[attr_name]["description"] = (
                measurement_attribute_key[attr_name]["description"].strip()
            )

        # Create a new column to describe which VAR_TYPEs require the given attribute
        for attr_name in measurement_attribute_key.keys():
            # Create a new list to store the var types
            measurement_attribute_key[attr_name]["var_types"] = []
            for var_type in ["data", "support_data", "metadata"]:
                # If the attribute is required for the given var type
                if attr_name in measurement_attribute_schema[var_type]:
                    measurement_attribute_key[attr_name]["var_types"].append(var_type)
            # Convert the list to a string that can be written to a CSV from the table
            measurement_attribute_key[attr_name]["var_types"] = " ".join(
                measurement_attribute_key[attr_name]["var_types"]
            )

        # Get all the attributes from the schema
        attribute_names = list(measurement_attribute_key.keys())
        table_rows = [info for _, info in measurement_attribute_key.items()]

        # Create the info table
        info = Table(rows=table_rows)
        info.add_column(col=attribute_names, name="Attribute", index=0)

        # Limit the info to the requested attribute
        if attribute_name and attribute_name in info["Attribute"]:
            info = info[info["Attribute"] == attribute_name]
        elif attribute_name and attribute_name not in info["Attribute"]:
            raise KeyError(
                f"Cannot find Variable Metadata for attribute name: {attribute_name}"
            )

        return info
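
    # Usage sketch (illustrative): the derived "var_types" column is a
    # space-separated string, so split it before testing membership to avoid
    # "data" matching the substring in "support_data".
    #
    #     >>> info = HermesDataSchema.measurement_attribute_info()
    #     >>> data_attrs = info[["data" in row["var_types"].split() for row in info]]
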
    @staticmethod
    def _check_well_formed(data):
        """Checks if input data is a well-formed, regular array

        Returns
        -------
        :class:`~numpy.ndarray`
            The input data as a well-formed array; may be the input data exactly.
        """
        msg = (
            "Data must be well-formed, regular array of number, string, or astropy.time"
        )
        try:
            d = np.asanyarray(data)
        except ValueError:
            raise ValueError(msg)
        # In a future numpy, the case tested below will raise ValueError,
        # so can remove entire if block.
        if d.dtype == object:  # this is probably going to be bad
            if d.shape != () and not len(d):
                # Completely empty, so "well-formed" enough
                return d
            if np.array(d.flat[0]).shape != ():
                # Sequence-like, so we know it's ragged
                raise ValueError(msg)
        return d

    def _types(self, data, backward=False, encoding="utf-8"):
        """
        Find dimensions and valid types of a nested list-of-lists

        Any given data may be representable by a range of CDF types; infer
        the CDF types which can represent this data. This breaks down to:
          1. Proper kind (numerical, string, time)
          2. Proper range (stores highest and lowest number)
          3. Sufficient resolution (EPOCH16 or TT2000 required if astropy.time
             has microseconds or below.)

        If more than one value satisfies the requirements, types are returned
        in preferred order:
          1. Type that matches precision of data first, then
          2. integer type before float type, then
          3. Smallest type first, then
          4. signed type first, then
          5. specifically-named (CDF_BYTE) vs. generically named (CDF_INT1)

        So for example, EPOCH16 is preferred over EPOCH if L{data} specifies
        below the millisecond level (rule 1), but otherwise EPOCH is preferred
        (rule 2). TIME_TT2000 is always preferred as of 0.3.0.

        For floats, four-byte is preferred unless eight-byte is required:
          1. absolute values between 0 and 3e-39
          2. absolute values greater than 1.7e38

        This will switch to an eight-byte double in some cases where four bytes
        would be sufficient for IEEE 754 encoding, but where DEC formats would
        require eight.

        @param data: data for which dimensions and CDF types are desired
        @type data: list (of lists)
        @param backward: limit to pre-CDF3 types
        @type backward: bool
        @param encoding: Encoding to use for Unicode input, default utf-8
        @type encoding: str
        @return: dimensions of L{data}, in order outside-in; CDF types which
                 can represent this data; number of elements required
                 (i.e. length of longest string)
        @rtype: 3-tuple of lists ([int], [ctypes.c_long], [int])
        @raise ValueError: if L{data} has irregular dimensions
        """
        d = HermesDataSchema._check_well_formed(data)
        dims = d.shape
        elements = 1
        types = []

        if d.dtype.kind in ("S", "U"):  # it's a string
            types = [const.CDF_CHAR, const.CDF_UCHAR]
            # Length of string from type (may be longer than contents)
            elements = d.dtype.itemsize
            if d.dtype.kind == "U":
                # Big enough for contents (bytes/char are encoding-specific)
                elements = max(
                    elements // 4,  # numpy stores as 4-byte
                    np.char.encode(d, encoding=encoding).dtype.itemsize,
                )
        elif isinstance(data, Time):
            types = [const.CDF_TIME_TT2000, const.CDF_EPOCH16, const.CDF_EPOCH]
        elif d is data or isinstance(data, np.generic):
            # np array came in, use its type (or byte-swapped)
            types = [
                k
                for k in self.numpytypedict
                if (
                    self.numpytypedict[k] == d.dtype
                    or self.numpytypedict[k] == d.dtype.newbyteorder()
                )
                and k not in self.timetypes
            ]
            # Maintain priority to match the ordered lists below:
            # float/double (44, 45) before real (21/22), and
            # byte (41) before int (1) before char (51). So hack.
            # Consider making typedict an ordered dict once 2.6 is dead.
            types.sort(key=lambda x: x % 50, reverse=True)

        if not types:  # not a numpy array, or can't parse its type
            if d.dtype.kind == "O":  # Object. Try to make it numeric
                if d.shape != () and not len(d):
                    raise ValueError("Cannot determine CDF type of empty object array.")
                # Can't do safe casting from Object, so try and compare
                # Basically try most restrictive to least restrictive
                trytypes = (np.uint64, np.int64, np.float64)
                for t in trytypes:
                    try:
                        newd = d.astype(dtype=t)
                    except TypeError:  # Failure to cast, try next type
                        continue
                    if (newd == d).all():  # Values preserved, use this type
                        d = newd
                        # Continue with normal guessing, as if a list
                        break
                else:
                    # fell through without a match
                    raise ValueError("Cannot convert generic objects to CDF type.")
            if d.dtype.kind in ("i", "u"):  # integer
                minval = np.min(d)
                maxval = np.max(d)
                if minval < 0:
                    types = [
                        const.CDF_BYTE,
                        const.CDF_INT1,
                        const.CDF_INT2,
                        const.CDF_INT4,
                        const.CDF_INT8,
                        const.CDF_FLOAT,
                        const.CDF_REAL4,
                        const.CDF_DOUBLE,
                        const.CDF_REAL8,
                    ]
                    cutoffs = [
                        2**7,
                        2**7,
                        2**15,
                        2**31,
                        2**63,
                        1.7e38,
                        1.7e38,
                        8e307,
                        8e307,
                    ]
                else:
                    types = [
                        const.CDF_BYTE,
                        const.CDF_INT1,
                        const.CDF_UINT1,
                        const.CDF_INT2,
                        const.CDF_UINT2,
                        const.CDF_INT4,
                        const.CDF_UINT4,
                        const.CDF_INT8,
                        const.CDF_FLOAT,
                        const.CDF_REAL4,
                        const.CDF_DOUBLE,
                        const.CDF_REAL8,
                    ]
                    cutoffs = [
                        2**7,
                        2**7,
                        2**8,
                        2**15,
                        2**16,
                        2**31,
                        2**32,
                        2**63,
                        1.7e38,
                        1.7e38,
                        8e307,
                        8e307,
                    ]
                types = [
                    t
                    for (t, c) in zip(types, cutoffs)
                    if c > maxval and (minval >= 0 or minval >= -c)
                ]
            else:  # float
                if dims == ():
                    if d != 0 and (abs(d) > 1.7e38 or abs(d) < 3e-39):
                        types = [const.CDF_DOUBLE, const.CDF_REAL8]
                    else:
                        types = [
                            const.CDF_FLOAT,
                            const.CDF_REAL4,
                            const.CDF_DOUBLE,
                            const.CDF_REAL8,
                        ]
                else:
                    absolutes = np.abs(d[d != 0])
                    if len(absolutes) > 0 and (
                        np.max(absolutes) > 1.7e38 or np.min(absolutes) < 3e-39
                    ):
                        types = [const.CDF_DOUBLE, const.CDF_REAL8]
                    else:
                        types = [
                            const.CDF_FLOAT,
                            const.CDF_REAL4,
                            const.CDF_DOUBLE,
                            const.CDF_REAL8,
                        ]
        types = [t.value if hasattr(t, "value") else t for t in types]

        # If data has a type, might be a VarCopy, prefer that type
        if hasattr(data, "type"):
            try:
                t = data.type()
            except AttributeError:
                t = None
            if t in types:
                types = [t]
            # If passed array, types prefers its dtype, so try for compatible
            # and let type() override
            elif d is data:
                try:
                    _ = data.astype(dtype=self.numpytypedict[t])
                except ValueError:
                    pass
                finally:
                    types = [t]
        # And if the VarCopy specifies a number of elements, use that
        # if compatible
        if hasattr(data, "nelems"):
            ne = data.nelems()
            if ne > elements:
                elements = ne
        return (dims, types, elements)

    def _get_minmax(self, cdftype):
        """Find minimum, maximum possible value based on CDF type.

        This returns the processed value (e.g. astropy.time for Epoch types)
        because comparisons to EPOCH16s are otherwise difficult.

        Parameters
        ==========
        cdftype : int
            CDF type number from :mod:`~const`

        Raises
        ======
        ValueError : if can't match the type

        Returns
        =======
        out : tuple
            minimum, maximum value supported by type (of type matching the
            CDF type).
        """
        if hasattr(cdftype, "value"):
            cdftype = cdftype.value
        if cdftype in [
            const.CDF_EPOCH.value,
            const.CDF_EPOCH16.value,
            const.CDF_TIME_TT2000.value,
        ]:
            return (
                Time("1900-1-1T00:00:00.000", format="isot"),
                Time("2250-1-1T00:00:00.000", format="isot"),
            )
        dtype = self.numpytypedict.get(cdftype, None)
        if dtype is None:
            raise ValueError("Unknown data type: {}".format(cdftype))
        if np.issubdtype(dtype, np.integer):
            inf = np.iinfo(dtype)
        elif np.issubdtype(dtype, np.floating):
            inf = np.finfo(dtype)
        else:
            raise ValueError("Unknown data type: {}".format(cdftype))
        return (inf.min, inf.max)
    def derive_measurement_attributes(
        self, data, var_name: str, guess_types: Optional[list[int]] = None
    ) -> OrderedDict:
        """
        Function to derive metadata for the given measurement.

        Parameters
        ----------
        data : `hermes_core.timedata.HermesData`
            An instance of `HermesData` to derive metadata from.
        var_name : `str`
            The name of the measurement to derive metadata for.
        guess_types : `list[int]`, optional
            Guessed CDF type of the variable.

        Returns
        -------
        attributes : `OrderedDict`
            A dict containing `key: value` pairs of derived metadata attributes.
        """
        measurement_attributes = OrderedDict()

        # Get the variable data
        var_data = data[var_name]
        if not guess_types:
            if var_name == "time":
                # Guess the const CDF Data Type
                (guess_dims, guess_types, guess_elements) = self._types(var_data)
            elif hasattr(var_data, "value"):
                # Support NDData use `.value`
                (guess_dims, guess_types, guess_elements) = self._types(var_data.value)
            else:
                # TimeSeries Quantity and Spectra NDCube use `.data`
                (guess_dims, guess_types, guess_elements) = self._types(var_data.data)

        # Check the attributes that can be derived
        var_type = self._get_var_type(data, var_name)
        if var_type == "data":
            # Derive attributes specific to the `data` VAR_TYPE
            if not var_name == "time":
                measurement_attributes["DEPEND_0"] = self._get_depend()
            measurement_attributes["DISPLAY_TYPE"] = self._get_display_type()
            measurement_attributes["FIELDNAM"] = self._get_fieldnam(var_name)
            measurement_attributes["FILLVAL"] = self._get_fillval(guess_types[0])
            measurement_attributes["FORMAT"] = self._get_format(
                var_data, guess_types[0]
            )
            measurement_attributes["LABLAXIS"] = self._get_lablaxis(data, var_name)
            measurement_attributes["SI_CONVERSION"] = self._get_si_conversion(
                data, var_name
            )
            measurement_attributes["UNITS"] = self._get_units(data, var_name)
            measurement_attributes["VALIDMIN"] = self._get_validmin(guess_types[0])
            measurement_attributes["VALIDMAX"] = self._get_validmax(guess_types[0])
            measurement_attributes["VAR_TYPE"] = self._get_var_type(data, var_name)
        elif var_type == "support_data":
            # Derive attributes specific to the `support_data` VAR_TYPE
            measurement_attributes["FIELDNAM"] = self._get_fieldnam(var_name)
            measurement_attributes["FILLVAL"] = self._get_fillval(guess_types[0])
            measurement_attributes["FORMAT"] = self._get_format(
                var_data, guess_types[0]
            )
            measurement_attributes["LABLAXIS"] = self._get_lablaxis(data, var_name)
            measurement_attributes["SI_CONVERSION"] = self._get_si_conversion(
                data, var_name
            )
            measurement_attributes["UNITS"] = self._get_units(data, var_name)
            measurement_attributes["VALIDMIN"] = self._get_validmin(guess_types[0])
            measurement_attributes["VALIDMAX"] = self._get_validmax(guess_types[0])
            measurement_attributes["VAR_TYPE"] = self._get_var_type(data, var_name)
        elif var_type == "metadata":
            # Derive attributes specific to the `metadata` VAR_TYPE
            measurement_attributes["FIELDNAM"] = self._get_fieldnam(var_name)
            measurement_attributes["FILLVAL"] = self._get_fillval(guess_types[0])
            measurement_attributes["FORMAT"] = self._get_format(
                var_data, guess_types[0]
            )
            measurement_attributes["VAR_TYPE"] = self._get_var_type(data, var_name)
        else:
            warn_user(
                f"Variable {var_name} has unrecognizable VAR_TYPE ({var_type}). Cannot Derive Metadata for Variable."
            )

        # Derive attributes specific to `spectra` data
        if hasattr(var_data, "wcs") and getattr(var_data, "wcs") is not None:
            spectra_attributes = self._derive_spectra_attributes(var_data)
            measurement_attributes.update(spectra_attributes)

        return measurement_attributes
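
    # Usage sketch (illustrative; `ts` stands in for a HermesData container with
    # a measurement named "counts" -- both are assumptions for the example):
    #
    #     >>> schema = HermesDataSchema()
    #     >>> attrs = schema.derive_measurement_attributes(ts, "counts")
    #     >>> attrs["VAR_TYPE"]  # defaults to 'data' when not set in the metadata
    #     'data'
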
    def derive_time_attributes(self, data) -> OrderedDict:
        """
        Function to derive metadata for the time measurement.

        Parameters
        ----------
        data : `hermes_core.timedata.HermesData`
            An instance of `HermesData` to derive metadata from.

        Returns
        -------
        attributes : `OrderedDict`
            A dict containing `key: value` pairs of time metadata attributes.
        """
        # Get the variable data
        var_data = data["time"]
        (guess_dims, guess_types, guess_elements) = self._types(var_data)

        time_attributes = self.derive_measurement_attributes(
            data, "time", guess_types=guess_types
        )
        # Check the attributes that can be derived
        time_attributes["REFERENCE_POSITION"] = self._get_reference_position(
            guess_types[0]
        )
        time_attributes["RESOLUTION"] = self._get_resolution(data)
        time_attributes["TIME_BASE"] = self._get_time_base(guess_types[0])
        time_attributes["TIME_SCALE"] = self._get_time_scale(guess_types[0])
        time_attributes["UNITS"] = self._get_time_units(guess_types[0])
        return time_attributes
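
    # Usage sketch (illustrative; `ts` is the same assumed HermesData container
    # as above). For `astropy.time.Time` data the guessed type is TT2000, so the
    # derived values follow directly from the helpers further down:
    #
    #     >>> time_attrs = schema.derive_time_attributes(ts)
    #     >>> time_attrs["TIME_BASE"], time_attrs["UNITS"]
    #     ('J2000', 'ns')
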
    def derive_global_attributes(self, data) -> OrderedDict:
        """
        Function to derive global attributes for the given measurement data.

        Parameters
        ----------
        data : `hermes_core.timedata.HermesData`
            An instance of `HermesData` to derive metadata from.

        Returns
        -------
        attributes : `OrderedDict`
            A dict containing `key: value` pairs of global metadata attributes.
        """
        global_attributes = OrderedDict()
        # Loop through the global attributes
        for attr_name, attr_schema in self.global_attribute_schema.items():
            if attr_schema["derived"]:
                derived_value = self._derive_global_attribute(data, attr_name=attr_name)
                global_attributes[attr_name] = derived_value
        return global_attributes
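
    # Usage sketch (illustrative): only attributes marked `derived: true` in the
    # YAML schema are produced; everything else must come from the template or
    # the container's existing metadata.
    #
    #     >>> global_attrs = schema.derive_global_attributes(ts)
    #     >>> "Generation_date" in global_attrs
    #     True
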
    def _derive_global_attribute(self, data, attr_name):
        """
        Function to derive global metadata attributes.
        """
        # SWITCH on the derivation attr_name
        if attr_name == "Generation_date":
            return self._get_generation_date(data)
        elif attr_name == "Start_time":
            return self._get_start_time(data)
        elif attr_name == "Data_type":
            return self._get_data_type(data)
        elif attr_name == "Logical_file_id":
            return self._get_logical_file_id(data)
        elif attr_name == "Logical_source":
            return self._get_logical_source(data)
        elif attr_name == "Logical_source_description":
            return self._get_logical_source_description(data)
        elif attr_name == "HERMES_version":
            return self._get_hermes_version(data)
        elif attr_name == "CDF_Lib_version":
            return self._get_cdf_lib_version(data)
        else:
            raise ValueError(f"Derivation for Attribute ({attr_name}) Not Recognized")

    def _derive_spectra_attributes(self, var_data):
        """
        Function to derive WCS-keyword metadata attributes for a given spectra
        variable based on the variable's `.wcs` member.
        """
        spectra_attributes = OrderedDict()

        # WCSAXES is a single attribute
        spectra_attributes["WCSAXES"] = self._get_wcs_naxis(var_data)

        # Get sets/collections of attributes
        for keyword, prop, _ in self.wcs_keyword_to_astropy_property:
            for dimension_i in range(spectra_attributes["WCSAXES"]):
                dimension_attr_name = (
                    f"{keyword}{dimension_i+1}"  # Keynames indexed 1-4 vs 0-3
                )
                # Add the property value for the given axis as a metadata attribute
                spectra_attributes[dimension_attr_name] = self._get_wcs_dimension_attr(
                    var_data=var_data, prop=prop, dimension=dimension_i
                )

        # Derive WCS time attributes
        spectra_attributes["MJDREF"] = self._get_wcs_timeref(var_data)
        spectra_attributes["TIMEUNIT"] = self._get_wcs_timeunit(var_data)
        spectra_attributes["TIMEDEL"] = self._get_wcs_timedel(var_data)

        return spectra_attributes

    # =============================================================================================
    # VARIABLE METADATA DERIVATIONS
    # =============================================================================================

    def _get_depend(self):
        return "Epoch"

    def _get_display_type(self):
        return "time_series"

    def _get_fieldnam(self, var_name):
        if var_name != "time":
            return deepcopy(var_name)
        else:
            return "Epoch"

    def _get_fillval(self, guess_type):
        if guess_type == const.CDF_TIME_TT2000.value:
            return Time("9999-12-31T23:59:59.999999", format="isot")
        else:
            # Get the FILLVAL for the guessed data type
            fillval = self._fillval_helper(cdf_type=guess_type)
            return fillval

    def _fillval_helper(self, cdf_type):
        # Fill value, indexed by the CDF type (numeric)
        fillvals = {}
        # Integers
        for i in (1, 2, 4, 8):
            fillvals[getattr(const, "CDF_INT{}".format(i)).value] = -(2 ** (8 * i - 1))
            if i == 8:
                continue
            fillvals[getattr(const, "CDF_UINT{}".format(i)).value] = 2 ** (8 * i) - 1
        fillvals[const.CDF_EPOCH16.value] = (-1e31, -1e31)
        fillvals[const.CDF_REAL8.value] = -1e31
        fillvals[const.CDF_REAL4.value] = -1e31
        fillvals[const.CDF_CHAR.value] = " "
        fillvals[const.CDF_UCHAR.value] = " "
        # Equivalent pairs
        for cdf_t, equiv in (
            (const.CDF_TIME_TT2000, const.CDF_INT8),
            (const.CDF_EPOCH, const.CDF_REAL8),
            (const.CDF_BYTE, const.CDF_INT1),
            (const.CDF_FLOAT, const.CDF_REAL4),
            (const.CDF_DOUBLE, const.CDF_REAL8),
        ):
            fillvals[cdf_t.value] = fillvals[equiv.value]
        value = fillvals[cdf_type]
        return value

    def _get_format(self, var_data, cdftype):
        """
        Format can be specified using either Fortran or C format codes.
        For instance, "F10.3" indicates that the data should be displayed
        across 10 characters where 3 of those characters are to the right of
        the decimal. For a description of FORTRAN formatting codes see the
        docs here:
        https://docs.oracle.com/cd/E19957-01/805-4939/z40007437a2e/index.html
        """
        minn = "VALIDMIN"
        maxx = "VALIDMAX"
        if cdftype in (
            const.CDF_INT1.value,
            const.CDF_INT2.value,
            const.CDF_INT4.value,
            const.CDF_INT8.value,
            const.CDF_UINT1.value,
            const.CDF_UINT2.value,
            const.CDF_UINT4.value,
            const.CDF_BYTE.value,
        ):
            if minn in var_data.meta:  # Just use validmin or scalemin
                minval = var_data.meta[minn]
            elif cdftype in (
                const.CDF_UINT1.value,
                const.CDF_UINT2.value,
                const.CDF_UINT4.value,
            ):  # unsigned, easy
                minval = 0
            elif cdftype == const.CDF_BYTE.value:
                minval = -(2**7)
            else:  # Signed, harder
                size = next(
                    (
                        i
                        for i in (1, 2, 4, 8)
                        if getattr(const, "CDF_INT{}".format(i)).value == cdftype
                    )
                )
                minval = -(2 ** (8 * size - 1))
            if maxx in var_data.meta:  # Just use max
                maxval = var_data.meta[maxx]
            elif cdftype == const.CDF_BYTE.value:
                maxval = 2**7 - 1
            else:
                size = next(
                    (
                        8 * i
                        for i in (1, 2, 4)
                        if getattr(const, "CDF_UINT{}".format(i)).value == cdftype
                    ),
                    None,
                )
                if size is None:
                    size = (
                        next(
                            (
                                8 * i
                                for i in (1, 2, 4, 8)
                                if getattr(const, "CDF_INT{}".format(i)).value
                                == cdftype
                            )
                        )
                        - 1
                    )
                maxval = 2**size - 1
            # Two tricks:
            # -Truncate and add 1 rather than ceil so get
            #  powers of 10 (log10(10) = 1 but needs two digits)
            # -Make sure not taking log of zero
            if minval < 0:  # Need an extra space for the negative sign
                fmt = "I{}".format(
                    int(math.log10(max(abs(maxval), abs(minval), 1))) + 2
                )
            else:
                fmt = "I{}".format(int(math.log10(maxval) if maxval != 0 else 1) + 1)
        elif cdftype == const.CDF_TIME_TT2000.value:
            fmt = "A{}".format(len("9999-12-31T23:59:59.999999999"))
        elif cdftype == const.CDF_EPOCH16.value:
            fmt = "A{}".format(len("31-Dec-9999 23:59:59.999.999.000.000"))
        elif cdftype == const.CDF_EPOCH.value:
            fmt = "A{}".format(len("31-Dec-9999 23:59:59.999"))
        elif cdftype in (
            const.CDF_REAL8.value,
            const.CDF_REAL4.value,
            const.CDF_FLOAT.value,
            const.CDF_DOUBLE.value,
        ):
            if "VALIDMIN" in var_data.meta and "VALIDMAX" in var_data.meta:
                range = var_data.meta["VALIDMAX"] - var_data.meta["VALIDMIN"]
            # If not, just use nothing.
            else:
                range = None
            # Find how many spaces we need for the 'integer' part of the number
            # (Use maxx-minn for this...effectively uses VALIDMIN/MAX for most
            # cases.)
            if range and (minn in var_data.meta and maxx in var_data.meta):
                if len(str(int(var_data.meta[maxx]))) >= len(
                    str(int(var_data.meta[minn]))
                ):
                    ln = str(int(var_data.meta[maxx]))
                else:
                    ln = str(int(var_data.meta[minn]))
            if range and ln and range < 0:  # Cover all our bases:
                range = None
            # Switch on Range
            if (
                range and ln and range <= 11
            ):  # If range <= 11, use 3 decimal places:
                # Need extra for '.', and 3 decimal places (4 extra)
                fmt = "F{}.3".format(len([i for i in ln]) + 4)
            elif range and ln and 11 < range <= 101:
                # Need extra for '.' (1 extra)
                fmt = "F{}.2".format(len([i for i in ln]) + 3)
            elif range and ln and 101 < range <= 1000:
                # Need extra for '.' (1 extra)
                fmt = "F{}.1".format(len([i for i in ln]) + 2)
            else:
                # No range, must not be populated, copied from REAL4/8(s) above
                # OR we don't care because it's a 'big' number:
                fmt = "G10.8E3"
        elif cdftype in (
            const.CDF_CHAR.value,
            const.CDF_UCHAR.value,
        ):
            if hasattr(var_data, "data"):
                var_data = var_data.data
            fmt = "A{}".format(len(var_data))
        else:
            raise ValueError(
                "Couldn't find FORMAT for type {}".format(
                    self.cdftypenames.get(cdftype, "UNKNOWN")
                )
            )
        return fmt

    def _get_lablaxis(self, data, var_name):
        return f"{var_name} [{self._get_units(data, var_name)}]"

    def _get_reference_position(self, guess_type):
        if guess_type == const.CDF_TIME_TT2000.value:
            return "rotating Earth geoid"
        else:
            msg = f"Reference Position for Time type ({guess_type}) not found."
            raise TypeError(msg)

    def _get_resolution(self, data):
        # Get the variable data
        times = data.time
        if len(times) < 2:
            raise ValueError(
                f"Can not derive Time Resolution, need 2 samples, found {times}."
            )
        # Calculate the timedelta between two time samples
        delta = times[1] - times[0]
        # Get the number of seconds between samples
        delta_seconds = delta.to_value("s")
        return f"{delta_seconds}s"

    def _get_si_conversion(self, data, var_name):
        # Get the variable data
        var_data = data[var_name]
        if var_name == "time":
            conversion_rate = u.ns.to(u.s)
            si_conversion = f"{conversion_rate:e}>{u.s}"
        else:
            # Get the units as a string
            if isinstance(var_data, u.Quantity):
                try:
                    conversion_rate = var_data.unit.to(var_data.si.unit)
                    si_conversion = f"{conversion_rate:e}>{var_data.si.unit}"
                except u.UnitConversionError:
                    si_conversion = f"1.0>{var_data.unit}"
            else:
                si_conversion = " > "
        return si_conversion

    def _get_time_base(self, guess_type):
        if guess_type == const.CDF_TIME_TT2000.value:
            return "J2000"
        else:
            raise TypeError(f"Time Base for Time type ({guess_type}) not found.")

    def _get_time_scale(self, guess_type):
        if guess_type == const.CDF_TIME_TT2000.value:
            return "Terrestrial Time (TT)"
        else:
            raise TypeError(f"Time Scale for Time type ({guess_type}) not found.")

    def _get_time_units(self, guess_type):
        if guess_type == const.CDF_TIME_TT2000.value:
            return "ns"
        else:
            raise TypeError(f"Time Units for Time type ({guess_type}) not found.")

    def _get_units(self, data, var_name):
        # Get the variable data
        var_data = data[var_name]
        unit = ""
        # Get the unit from the TimeSeries Quantity if it exists
        if hasattr(var_data, "unit") and var_data.unit is not None:
            unit = var_data.unit.to_string()
        # Try to get the UNITS from the metadata
        elif "UNITS" in var_data.meta and var_data.meta["UNITS"] is not None:
            unit = var_data.meta["UNITS"]
        return unit

    def _get_validmin(self, guess_type):
        # Get the min value
        minval, _ = self._get_minmax(guess_type)
        return minval

    def _get_validmax(self, guess_type):
        # Get the max value
        _, maxval = self._get_minmax(guess_type)
        return maxval

    def _get_var_type(self, data, var_name):
        # Get the variable data
        var_data = data[var_name]
        attr_name = "VAR_TYPE"
        if (attr_name not in var_data.meta) or (not var_data.meta[attr_name]):
            var_type = "data"
        else:
            var_type = var_data.meta[attr_name]
        return var_type

    # =============================================================================================
    # SPECTRA METADATA DERIVATIONS
    # =============================================================================================

    def _get_wcs_naxis(self, var_data):
        """
        Function to get the number of axes within a spectra WCS member.
        """
        attr_name = "WCSAXES"
        if (attr_name not in var_data.meta) or (not var_data.meta[attr_name]):
            attr_value = var_data.wcs.wcs.naxis
        else:
            attr_value = var_data.meta[attr_name]
        return int(attr_value)

    def _get_wcs_timeref(self, var_data):
        """
        Function to get the reference time within a spectra WCS member.
        """
        attr_name = "MJDREF"
        if (attr_name not in var_data.meta) or (not var_data.meta[attr_name]):
            attr_value = var_data.wcs.wcs.mjdref[0]
        else:
            attr_value = var_data.meta[attr_name]
        return attr_value

    def _get_wcs_timeunit(self, var_data):
        """
        Function to get the time units within a spectra WCS member.
        """
        attr_name = "TIMEUNIT"
        if (attr_name not in var_data.meta) or (not var_data.meta[attr_name]):
            attr_value = var_data.wcs.wcs.timeunit
        else:
            attr_value = var_data.meta[attr_name]
        return attr_value

    def _get_wcs_timedel(self, var_data):
        """
        Function to get the time delta (between points) within a spectra WCS member.
        """
        attr_name = "TIMEDEL"
        if (attr_name not in var_data.meta) or (not var_data.meta[attr_name]):
            attr_value = var_data.wcs.wcs.timedel
        else:
            attr_value = var_data.meta[attr_name]
        return attr_value

    def _get_wcs_dimension_attr(self, var_data, prop, dimension):
        """
        Function to get the spectra's WCS keyword property along the given axis.
        """
        # Get the property for the given WCS keyword for the given axis
        property_value = getattr(var_data.wcs.wcs, prop)[dimension]
        # Convert to a string as needed
        if isinstance(property_value, u.UnitBase):
            property_value = property_value.to_string()
        return property_value

    # =============================================================================================
    # GLOBAL METADATA DERIVATIONS
    # =============================================================================================

    def _get_logical_file_id(self, data):
        """
        Function to get the `Logical_file_id` required global attribute.

        The attribute stores the name of the CDF file without the file
        extension (e.g. '.cdf'). This attribute is required to avoid loss of
        the original source in case of renaming.
        """
        attr_name = "Logical_file_id"
        if (attr_name not in data.meta) or (not data.meta[attr_name]):
            # Get parts
            instrument_id = self._get_instrument_id(data)
            start_time = self._get_start_time(data)
            data_level = self._get_data_level(data)
            version = self._get_version(data)
            mode = self._get_instrument_mode(data)

            # Build the derivation
            science_filename = util.create_science_filename(
                instrument=instrument_id,
                time=start_time,
                level=data_level,
                version=version,
                mode=mode,
            )
            science_filename = science_filename.rstrip(util.FILENAME_EXTENSION)
        else:
            science_filename = data.meta[attr_name]
        return science_filename

    def _get_logical_source(self, data):
        """
        Function to get the `Logical_source` required global attribute.

        This attribute determines the file naming convention in the SKT Editor
        and is used by CDAWeb.
        """
        attr_name = "Logical_source"
        if (attr_name not in data.meta) or (not data.meta[attr_name]):
            # Get parts
            spacecraft_id = self._get_spacecraft_id(data)
            instrument_id = self._get_instrument_id(data)
            data_type = self._get_data_type(data)
            data_type_short_name, _ = data_type.split(">")

            # Build the derivation
            logical_source = f"{spacecraft_id}_{instrument_id}_{data_type_short_name}"
        else:
            logical_source = data.meta[attr_name]
        return logical_source

    def _get_logical_source_description(self, data):
        """
        Function to get the `Logical_source_description` required global attribute.

        This attribute writes out the full words associated with the encoded
        `Logical_source` attribute.
        """
        attr_name = "Logical_source_description"
        if (attr_name not in data.meta) or (not data.meta[attr_name]):
            # Get parts
            spacecraft_long_name = self._get_spacecraft_long_name(data)
            instrument_long_name = self._get_instrument_long_name(data)
            data_type = self._get_data_type(data)
            _, data_type_long_name = data_type.split(">")
            logical_source_description = (
                f"{spacecraft_long_name} {instrument_long_name} {data_type_long_name}"
            )
        else:
            logical_source_description = data.meta[attr_name]
        return logical_source_description

    def _get_data_type(self, data):
        """
        Function to get the `Data_type` required global attribute.

        This attribute is used by the CDF writing software to create the
        filename. It is a combination of the following components:
        - mode
        - data_level
        - optional_data_product_descriptor
        """
        attr_name = "Data_type"
        if (attr_name not in data.meta) or (not data.meta[attr_name]):
            short_parts = []
            long_parts = []

            # Get `mode`
            mode_short_name = self._get_instrument_mode(data)
            mode_long_name = self._get_instrument_mode(data)
            if bool(mode_short_name and mode_long_name):
                short_parts.append(mode_short_name)
                long_parts.append(mode_long_name)

            # Get `data level`
            data_level_short_name = self._get_data_level(data)
            data_level_long_name = self._get_data_level_long_name(data)
            if bool(data_level_short_name and data_level_long_name):
                short_parts.append(data_level_short_name)
                long_parts.append(data_level_long_name)

            # Get `data product descriptor`
            odpd_short_name = self._get_data_product_descriptor(data)
            odpd_long_name = self._get_data_product_descriptor(data)
            if bool(odpd_short_name and odpd_long_name):
                short_parts.append(odpd_short_name)
                long_parts.append(odpd_long_name)

            # Build the derivation
            data_type = "_".join(short_parts) + ">" + " ".join(long_parts)
        else:
            data_type = data.meta[attr_name]
        return data_type

    def _get_spacecraft_id(self, data):
        """Function to get the spacecraft ID from the `Source_name` global attribute."""
        attr_name = "Source_name"
        if (attr_name not in data.meta) or (not data.meta[attr_name]):
            # Get module default
            sc_id = hermes_core.MISSION_NAME
        else:
            sc_id = data.meta["Source_name"]
            # Formatting
            if ">" in sc_id:
                short_name, _ = sc_id.split(">")
                sc_id = short_name.lower()  # Make sure it's all lowercase
        return sc_id

    def _get_spacecraft_long_name(self, data):
        """Function to get the spacecraft long name from the `Source_name` global attribute."""
        attr_name = "Source_name"
        if (attr_name not in data.meta) or (not data.meta[attr_name]):
            # Get module default
            sc_id = hermes_core.MISSION_NAME
        else:
            sc_id = data.meta["Source_name"]
            # Formatting
            if ">" in sc_id:
                _, long_name = sc_id.split(">")
                sc_id = long_name
        return sc_id

    def _get_instrument_id(self, data):
        """
        Function to get the instrument ID from the `Descriptor` global attribute.

        Instrument of investigation identifier, shortened to a three-letter
        acronym.
        """
        attr_name = "Descriptor"
        if (attr_name not in data.meta) or (not data.meta[attr_name]):
            instr_id = None
        else:
            instr_id = data.meta["Descriptor"]
            # Formatting
            if ">" in instr_id:
                short_name, _ = instr_id.split(">")
                instr_id = short_name.lower()  # Make sure it's all lowercase
        return instr_id

    def _get_instrument_long_name(self, data):
        """
        Function to get the instrument long name from the `Descriptor` global attribute.
        """
        attr_name = "Descriptor"
        if (attr_name not in data.meta) or (not data.meta[attr_name]):
            instr_id = None
        else:
            instr_id = data.meta["Descriptor"]
            # Formatting
            if ">" in instr_id:
                _, long_name = instr_id.split(">")
                instr_id = long_name
        return instr_id

    def _get_data_level(self, data):
        """
        Function to get the data level of the CDF data.

        The level to which the data product has been processed.
        """
        attr_name = "Data_level"
        if (attr_name not in data.meta) or (not data.meta[attr_name]):
            data_level = None
        else:
            data_level = data.meta["Data_level"]
            # Formatting
            if ">" in data_level:
                short_name, _ = data_level.split(">")
                data_level = short_name.lower()  # Make sure it's all lowercase
        return data_level

    def _get_data_level_long_name(self, data):
        """
        Function to get the data level long name of the CDF data.

        The level to which the data product has been processed.
        """
        attr_name = "Data_level"
        if (attr_name not in data.meta) or (not data.meta[attr_name]):
            data_level = None
        else:
            data_level = data.meta["Data_level"]
            # Formatting
            if ">" in data_level:
                _, long_name = data_level.split(">")
                data_level = long_name
        return data_level

    def _get_data_product_descriptor(self, data):
        """
        Function to get the (optional) data product descriptor.

        This is an optional field that may not be needed for all products.
        Where it is used, identifiers should be short (3-8 character)
        descriptors that are helpful to end users. If a descriptor contains
        multiple components, underscores are used to separate those components.
        """
        attr_name = "Data_product_descriptor"
        if (attr_name not in data.meta) or (not data.meta[attr_name]):
            odpd = ""
        else:
            odpd = data.meta["Data_product_descriptor"]
        return odpd

    def _get_generation_date(self, data):
        """
        Function to get the date that the CDF was generated.
        """
        return Time.now().strftime("%Y-%m-%d")

    def _get_start_time(self, data):
        """
        Function to get the start time of the data contained in the CDF,
        given in the format `YYYYMMDDThhmmss`.
        """
        # Get the start time from the TimeSeries
        return data["time"][0].isot

    def _get_version(self, data):
        """
        Function to get the 3-part version number of the data product.
        """
        attr_name = "Data_version"
        if (attr_name not in data.meta) or (not data.meta[attr_name]):
            version = None
        else:
            version_str = data.meta["Data_version"].lower()
            if "v" in version_str:
                _, version = version_str.split("v")
            else:
                version = version_str
        return version

    def _get_instrument_mode(self, data):
        """Function to get the mode attribute (TBS)."""
        attr_name = "Instrument_mode"
        if (attr_name not in data.meta) or (not data.meta[attr_name]):
            instr_mode = ""
        else:
            instr_mode = data.meta["Instrument_mode"]
        return instr_mode.lower()  # Make sure it's all lowercase

    def _get_hermes_version(self, data):
        """Function to get the version of HERMES used to generate the data."""
        attr_name = "HERMES_version"
        if (attr_name not in data.meta) or (not data.meta[attr_name]):
            hermes_version = hermes_core.__version__
        else:
            hermes_version = data.meta[attr_name]
        return hermes_version

    def _get_cdf_lib_version(self, data):
        """Function to get the version of the CDF library used to generate the data."""
        attr_name = "CDF_Lib_version"
        if (attr_name not in data.meta) or (not data.meta[attr_name]):
            try:
                import spacepy.pycdf as pycdf

                cdf_lib_version = pycdf.lib.version
            except (ImportError, AttributeError):
                cdf_lib_version = "unknown version"
        else:
            cdf_lib_version = data.meta[attr_name]
        return cdf_lib_version