Source code for hermes_core.util.validation

from pathlib import Path
from abc import ABC, abstractmethod
import numpy as np
from typing import Union
from spacepy.pycdf import CDF, CDFError
from spacepy.pycdf.istp import FileChecks, VariableChecks
from hermes_core.util.schema import HermesDataSchema

__all__ = ["validate", "CDFValidator"]


def validate(filepath: str) -> list[str]:
    """
    Validate a data file such as a CDF.

    Parameters
    ----------
    filepath : `str`
        A fully specified file path.

    Returns
    -------
    errors : `list[str]`
        A list of validation errors returned. A valid file will result in an
        empty list being returned.
    """
    # Determine the file type
    file_extension = Path(filepath).suffix

    # Create the appropriate validator object based on file type
    if file_extension == ".cdf":
        validator = CDFValidator()
    else:
        raise ValueError(f"Unsupported file type: {file_extension}")

    # Call the validate method of the validator object
    return validator.validate(filepath)
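
# Usage sketch for the top-level entry point (the file name below is hypothetical):
#
#     >>> from hermes_core.util.validation import validate
#     >>> validation_errors = validate("hermes_eea_l1_example.cdf")  # hypothetical path
#     >>> if not validation_errors:
#     ...     print("File is valid")
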
class HermesDataValidator(ABC):
    """
    Abstract base class for heliophysics data validators.
    """

    @abstractmethod
    def validate(self, file_path: str) -> list[str]:
        """
        Validate the heliophysics data file.

        Parameters
        ----------
        file_path : `str`
            The path to the data file.

        Returns
        -------
        errors : `list[str]`
            A list of validation errors returned. A valid file will result in an
            empty list being returned.
        """
        pass
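
# Support for a new file format would be added by subclassing `HermesDataValidator`
# and wiring the new class into the dispatch in `validate()`. A minimal sketch; the
# `NetCDFValidator` name and its body are hypothetical, not part of this module:
#
#     class NetCDFValidator(HermesDataValidator):
#         """Illustrative validator for netCDF files."""
#
#         def validate(self, file_path: str) -> list[str]:
#             errors = []
#             # Hypothetical check: only the file extension is inspected here
#             if Path(file_path).suffix != ".nc":
#                 errors.append(f"Not a netCDF file: {file_path}")
#             return errors
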
class CDFValidator(HermesDataValidator):
    """
    Validator for CDF files.
    """

    def __init__(self):
        super().__init__()

        # CDF Schema
        self.schema = HermesDataSchema()
    def validate(self, file_path: str) -> list[str]:
        """
        Validate the CDF file.

        Parameters
        ----------
        file_path : `str`
            The path to the CDF file.

        Returns
        -------
        errors : `list[str]`
            A list of validation errors returned. A valid file will result in an
            empty list being returned.
        """
        # Initialize Validation Errors
        validation_errors = []

        try:
            # Open CDF file with context manager
            with CDF(file_path, readonly=True) as cdf_file:
                # Verify that all `required` global attributes in the schema are present
                global_attr_validation_errors = self._validate_global_attr_schema(
                    cdf_file=cdf_file
                )
                validation_errors.extend(global_attr_validation_errors)

                # Verify that all `required` variable attributes in the schema are present
                variable_attr_validation_errors = self._validate_variable_attr_schema(
                    cdf_file=cdf_file
                )
                validation_errors.extend(variable_attr_validation_errors)

                # Validate the CDF Using ISTP Module `FileChecks` Class
                file_checks_errors = self._file_checks(cdf_file=cdf_file)
                validation_errors.extend(file_checks_errors)

        except CDFError:
            validation_errors.append(f"Could not open CDF File at path: {file_path}")

        return validation_errors
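
    # `CDFValidator` can also be used directly, bypassing the file-extension dispatch
    # in the module-level `validate()`. A minimal sketch (the path is hypothetical):
    #
    #     >>> validator = CDFValidator()
    #     >>> errors = validator.validate("hermes_example.cdf")  # hypothetical path
    #     >>> print(f"{len(errors)} validation error(s) found")
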
    def _validate_global_attr_schema(self, cdf_file: CDF) -> list[str]:
        """
        Function to ensure all required global attributes in the schema are present
        in the generated CDF File.
        """
        global_attr_validation_errors = []
        # Loop for each attribute in the schema
        for attr_name, attr_schema in self.schema.global_attribute_schema.items():
            # If it is a required attribute and not present
            if attr_schema["validate"] and (attr_name not in cdf_file.attrs):
                global_attr_validation_errors.append(
                    f"Required attribute ({attr_name}) not present in global attributes.",
                )
            # If it is a required attribute but null
            if (
                attr_schema["validate"]
                and (attr_name in cdf_file.attrs)
                and (
                    (cdf_file.attrs[attr_name][0] == "")
                    or (cdf_file.attrs[attr_name][0] is None)
                )
            ):
                global_attr_validation_errors.append(
                    f"Required attribute ({attr_name}) not present in global attributes.",
                )

        return global_attr_validation_errors

    def _validate_variable_attr_schema(self, cdf_file: CDF) -> list[str]:
        """
        Function to ensure all required variable attributes in the schema are present
        in the generated CDF file.
        """
        variable_attr_validation_errors = []
        # Loop for each Variable in the CDF File
        for var_name in cdf_file:
            # Get the `Var()` Class for the Variable
            var_data = cdf_file[var_name]

            # Get the Variable Type to compare the required attributes
            var_type = ""
            if "VAR_TYPE" in var_data.attrs:
                var_type = var_data.attrs["VAR_TYPE"]
                variable_errors = self._validate_variable(cdf_file, var_name, var_type)
                variable_attr_validation_errors.extend(variable_errors)
            else:
                variable_attr_validation_errors.append(
                    f"Variable: {var_name} missing 'VAR_TYPE' attribute. Cannot Validate Variable."
                )

        return variable_attr_validation_errors

    def _validate_variable(
        self, cdf_file: CDF, var_name: str, var_type: str
    ) -> list[str]:
        """
        Function to Validate an individual Variable.
        """
        variable_errors = []
        # Get the Expected Attributes for the Variable Type
        var_type_attrs = self.schema.variable_attribute_schema[var_type]

        # Get the `Var()` Class for the Variable
        var_data = cdf_file[var_name]

        # Loop for each Variable Attribute in the schema
        for attr_name in var_type_attrs:
            attr_schema = self.schema.variable_attribute_schema["attribute_key"][
                attr_name
            ]
            # If it is a required attribute and not present
            if attr_schema["required"] and attr_name not in var_data.attrs:
                # Check to see if there is an "alternate" attribute
                if attr_schema["alternate"] is None:
                    variable_errors.append(
                        f"Variable: {var_name} missing '{attr_name}' attribute."
                    )
                # If there is an alternate, and the alternate is not in the metadata
                if (
                    "alternate" in attr_schema
                    and attr_schema["alternate"] is not None
                    and attr_schema["alternate"] not in var_data.attrs
                ):
                    variable_errors.append(
                        f"Variable: {var_name} missing '{attr_name}' attribute. "
                        f"Alternative: {attr_schema['alternate']} not found."
                    )
            # Assume that the Attribute is Present in the metadata for the Variable
            else:
                # If the Var Data can be Validated
                if (
                    "valid_values" in attr_schema
                    and attr_schema["valid_values"] is not None
                ):
                    attr_valid_values = attr_schema["valid_values"]
                    attr_value = var_data.attrs[attr_name]
                    if attr_value not in attr_valid_values:
                        variable_errors.append(
                            f"Variable: {var_name} Attribute '{attr_name}' not one of valid options. "
                            f"Was {attr_value}, expected one of {attr_valid_values}"
                        )

        # Validate Variable using ISTP Module `VariableChecks` class
        variable_checks_errors = self._variable_checks(
            cdf_file=cdf_file, var_name=var_name
        )
        variable_errors.extend(variable_checks_errors)

        return variable_errors

    def _file_checks(self, cdf_file: CDF):
        """
        Function to call individual pieces of the `spacepy.pycdf.istp.FileChecks`
        class. We do not want to run all validation checks from this class using
        the `all()` function, so we break up the individual function calls here.
        """
        file_checks_errors = []

        check_fns = [
            FileChecks.empty_entry,
            FileChecks.filename,
            FileChecks.time_monoton,
            FileChecks.times,
        ]

        # Loop through the Functions we want to check
        for func in check_fns:
            # Try to call the given function and report errors
            try:
                file_checks_errors.extend(func(cdf_file))
            # If the function errors out or does not complete, report this as an error itself.
            except:  # noqa: E722
                file_checks_errors.append(
                    "Test {} did not complete.".format(func.__name__)
                )

        return file_checks_errors

    def _variable_checks(self, cdf_file: CDF, var_name: str) -> list[str]:
        """
        Function to call individual pieces of the `spacepy.pycdf.istp.VariableChecks`
        class. We do not want to run all validation checks from this class using
        the `all()` function, so we break up the individual function calls here.
        """
        variable_checks_errors = []

        check_fns = [
            # This function makes incorrect assumptions about the UNITS that must be placed on
            # DELTA_PLUS_VAR and DELTA_MINUS_VAR metadata attributes.
            # VariableChecks.deltas,
            VariableChecks.depends,
            VariableChecks.depsize,
            VariableChecks.empty_entry,
            # This function makes incorrect assumptions that the variable name must exactly
            # match the FIELDNAM metadata attribute.
            # VariableChecks.fieldnam,
            # This function makes incorrect assumptions that the FILLVAL must be derived from
            # the CDF data type of the variable. A FILLVAL should be allowed to be set as
            # needed by instrument team developers.
            # VariableChecks.fillval,
            VariableChecks.recordcount,
            # This function makes incorrect assumptions about the valid DISPLAY_TYPE options
            # based on the shape of the variable data.
            # VariableChecks.validdisplaytype,
            # This function makes incorrect assumptions that the VALIDMIN and VALIDMAX must be
            # derived from the CDF data type of the variable. A VALIDMIN and VALIDMAX should be
            # allowed to be set as needed by instrument team developers.
            self._validrange,
            self._validscale,
        ]

        # Loop through the Functions we want to check
        for func in check_fns:
            # Try to call the given function and report errors
            try:
                variable_checks_errors.extend(
                    ("{}: {}".format(var_name, e) for e in func(cdf_file[var_name]))
                )
            # If the function errors out or does not complete, report this as an error itself.
            except:  # noqa: E722
                variable_checks_errors.append(
                    "{}: Test {} did not complete.".format(var_name, func.__name__)
                )

        return variable_checks_errors

    def _validrange(self, v):
        """Check that all values are within VALIDMIN/VALIDMAX, or FILLVAL

        Compare all values of this variable to
        `VALIDMIN <https://spdf.gsfc.nasa.gov/istp_guide/vattributes.html#VALIDMIN>`_
        and ``VALIDMAX``; fails validation if any values are below VALIDMIN or
        above ``VALIDMAX`` unless equal to
        `FILLVAL <https://spdf.gsfc.nasa.gov/istp_guide/vattributes.html#FILLVAL>`_.

        Parameters
        ----------
        v : :class:`~spacepy.pycdf.Var`
            Variable to check

        Returns
        -------
        list of str
            Description of each validation failure.
        """
        return self._validhelper(v)

    def _validscale(self, v):
        """Check SCALEMIN<=SCALEMAX, and both in range for CDF datatype.

        Compares `SCALEMIN <https://spdf.gsfc.nasa.gov/istp_guide/vattributes.html#SCALEMIN>`_
        to ``SCALEMAX`` to make sure it isn't larger and both are within range
        of the variable CDF datatype.

        Parameters
        ----------
        v : :class:`~spacepy.pycdf.Var`
            Variable to check

        Returns
        -------
        list of str
            Description of each validation failure.
        """
        return self._validhelper(v, False)

    def _validhelper(self, v, rng=True):
        """Helper function for checking SCALEMIN/MAX, VALIDMIN/MAX

        Parameters
        ----------
        v : :class:`~spacepy.pycdf.Var`
            Variable to check
        rng : bool
            Do range check (True, default) or scale check (False)

        Returns
        -------
        list of str
            Description of each validation failure.
        """
        validscale = "VALID" if rng else "SCALE"
        whichmin, whichmax = (
            ("VALIDMIN", "VALIDMAX") if rng else ("SCALEMIN", "SCALEMAX")
        )
        errs = []
        vshape = v.shape
        minval, maxval = self.schema._get_minmax(v.type())
        if rng:
            data = v[...]
            is_fill = False
            if "FILLVAL" in v.attrs:
                filldtype = self.schema.numpytypedict.get(
                    v.attrs.type("FILLVAL"), object
                )
                if np.issubdtype(v.dtype, np.floating) and np.issubdtype(
                    filldtype, np.floating
                ):
                    is_fill = np.isclose(data, v.attrs["FILLVAL"])
                elif np.can_cast(np.asanyarray(v.attrs["FILLVAL"]), v.dtype):
                    is_fill = data == v.attrs["FILLVAL"]
        for which in (whichmin, whichmax):
            if which not in v.attrs:
                continue
            attrval = v.attrs[which]
            multidim = bool(np.shape(attrval))  # multi-dimensional
            if multidim:  # Compare shapes, require only 1D var
                # Match attribute dim to first non-record var dim
                firstdim = int(v.rv())
                if vshape[firstdim] != np.shape(attrval)[0]:
                    errs.append(
                        (
                            "{} element count {} does not match first data"
                            " dimension size {}."
                        ).format(which, np.shape(attrval)[0], v.shape[firstdim])
                    )
                    continue
                if len(vshape) != firstdim + 1:  # only one non-record dim
                    errs.append(
                        "Multi-element {} only valid with 1D variable.".format(which)
                    )
                    continue
                if firstdim:  # Add pseudo-record dim
                    attrval = np.reshape(attrval, (1, -1))
            # min, max, variable data all same dtype
            if not np.can_cast(np.asanyarray(attrval), np.asanyarray(minval).dtype):
                errs.append(
                    "{} type {} not comparable to variable type {}.".format(
                        which,
                        self.schema.cdftypenames[v.attrs.type(which)],
                        self.schema.cdftypenames[v.type()],
                    )
                )
                continue  # Cannot do comparisons
            if np.any((minval > attrval)) or np.any((maxval < attrval)):
                errs.append(
                    "{} ({}) outside valid data range ({},{}).".format(
                        which, attrval[0, :] if multidim else attrval, minval, maxval
                    )
                )
            if not rng or not len(v):  # nothing to compare
                continue
            # Always put numpy array on the left so knows to do element compare
            idx = (data < attrval) if which == whichmin else (data > attrval)
            idx = np.logical_and(idx, np.logical_not(is_fill))
            if idx.any():
                direction = "under" if which == whichmin else "over"
                if len(vshape) == 0:  # Scalar
                    errs.append(
                        "Value {} {} {} {}.".format(
                            data,
                            direction,
                            which,
                            attrval[0, :] if multidim else attrval,
                        )
                    )
                    continue
                badidx = np.nonzero(idx)
                badvals = data[badidx]
                if len(badidx) > 1:  # Multi-dimensional data
                    badidx = np.transpose(badidx)  # Group by value not axis
                else:
                    badidx = badidx[0]  # Just recover the index value
                if len(badvals) < 10:
                    badvalstr = ", ".join(str(d) for d in badvals)
                    badidxstr = ", ".join(str(d) for d in badidx)
                    errs.append(
                        "Value {} at index {} {} {} {}.".format(
                            badvalstr,
                            badidxstr,
                            direction,
                            which,
                            attrval[0, :] if multidim else attrval,
                        )
                    )
                else:
                    errs.append(
                        "{} values {} {} {}".format(
                            len(badvals),
                            direction,
                            which,
                            attrval[0, :] if multidim else attrval,
                        )
                    )
        if (whichmin in v.attrs) and (whichmax in v.attrs):
            if np.any(v.attrs[whichmin] > v.attrs[whichmax]):
                errs.append("{} > {}.".format(whichmin, whichmax))

        return errs
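
# Illustration of the fill-aware masking performed in `_validhelper` above: values
# outside VALIDMIN/VALIDMAX are only flagged when they are not equal to FILLVAL.
# A minimal standalone sketch with made-up numbers:
#
#     import numpy as np
#
#     data = np.array([1.0, 5.0, -1e31, 12.0])
#     validmin, validmax, fillval = 0.0, 10.0, -1e31
#     is_fill = np.isclose(data, fillval)
#     over = np.logical_and(data > validmax, np.logical_not(is_fill))
#     under = np.logical_and(data < validmin, np.logical_not(is_fill))
#     # over  -> [False, False, False, True]   (12.0 exceeds VALIDMAX)
#     # under -> [False, False, False, False]  (-1e31 matches FILLVAL, so it is skipped)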