Source code for swot_simulator.product_specification

# Copyright (c) 2020 CNES/JPL
#
# All rights reserved. Use of this source code is governed by a
# BSD-style license that can be found in the LICENSE file.
"""
Parse/Load the product specification
====================================
"""
from typing import Any, Dict, Iterator, List, Optional, Tuple, Union
import datetime
import copy
import collections
import logging
import os
import pathlib
import xml.etree.ElementTree as xt
#
import netCDF4
import numpy as np
import xarray as xr
from . import orbit_propagator
from . import math
from . import PRODUCT_TYPE

LOGGER = logging.getLogger(__name__)

REFERENCE = "Gaultier, L., C. Ubelmann, and L.-L. Fu, 2016: The " \
    "Challenge of Using Future SWOT Data for Oceanic Field Reconstruction." \
    " J. Atmos. Oceanic Technol., 33, 119-126, doi:10.1175/jtech-d-15-0160" \
    ".1. http://dx.doi.org/10.1175/JTECH-D-15-0160.1."


def _find(element: xt.Element, tag: str) -> xt.Element:
    """Find a tag in the xml format specifcation file"""
    result = element.find(tag)
    if result is None:
        raise RuntimeError("The XML tag '" + tag + "' doesn't exist")
    return result


def _parse_type(dtype, width, signed):
    """Parse type from xml format specification file. """
    if dtype == "real":
        return getattr(np, "float" + width)
    elif dtype == "integer":
        return getattr(np, ("u" if not signed else "") + "int" + width)
    elif dtype == "string":
        return np.str
    elif dtype == "char":
        return np.dtype(f"S{width}")
    raise ValueError("Data type '" + dtype + "' is not recognized.")


def _cast_to_dtype(attr_value: Union[int, float], properties: Dict[str, str]):
    """Cast the attribute value to the numpy data type required"""
    return getattr(np, properties["dtype"])(attr_value)


[docs]def global_attributes(attributes: Dict[str, Dict[str, Any]], cycle_number: int, pass_number: int, date: np.ndarray, lng: np.ndarray, lat: np.ndarray) -> Dict[str, Any]: """Calculates the global attributes of the pass""" def _iso_date(date: np.datetime64) -> str: """Return the time formatted according to ISO.""" return datetime.datetime.utcfromtimestamp( date.astype("datetime64[us]").astype("int64") * 1e-6).isoformat() + "Z" now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S : Creation") ellipsoid_semi_major_axis = _cast_to_dtype( 6378137, attributes["ellipsoid_semi_major_axis"]) ellipsoid_flattening = _cast_to_dtype(1 / 298.25722356, attributes["ellipsoid_flattening"]) result = collections.OrderedDict({ "Conventions": "CF-1.7", "title": attributes["title"]["attrs"]["description"], "institution": "CNES/JPL", "source": "Simulate product", "history": now, "platform": "SWOT", "references": REFERENCE, "reference_document": "D-56407_SWOT_Product_Description_L2_LR_SSH", "contact": "CNES aviso@altimetry.fr, JPL podaac@podaac.jpl.nasa.gov", "cycle_number": _cast_to_dtype(cycle_number, attributes["cycle_number"]), "pass_number": _cast_to_dtype(pass_number, attributes["pass_number"]), "time_coverage_start": _iso_date(date[0]), "time_coverage_end": _iso_date(date[-1]), "geospatial_lon_min": lng.min(), "geospatial_lon_max": lng.max(), "geospatial_lat_min": lat.min(), "geospatial_lat_max": lat.max() }) if len(lng.shape) == 2: result.update({ "left_first_longitude": lng[0, 0], "left_first_latitude": lat[0, 0], "left_last_longitude": lng[-1, 0], "left_last_latitude": lat[-1, 0], "right_first_longitude": lng[0, -1], "right_first_latitude": lat[0, -1], "right_last_longitude": lng[-1, -1], "right_last_latitude": lat[-1, -1], }) result.update({ "wavelength": _cast_to_dtype(0.008385803020979, attributes["wavelength"]), "orbit_solution": "POE", }) for item in attributes: if item.startswith("xref_input"): result[item] = "N/A" result.update({ "ellipsoid_semi_major_axis": ellipsoid_semi_major_axis, "ellipsoid_flattening": ellipsoid_flattening }) return result
def _strtobool(value: str) -> bool: """Return the boolean value encoded in the string""" value = value.lower() if value == "true": return True elif value == "false": return False raise ValueError(f"invalid truth value {value!r}") def _parser(tree: xt.ElementTree): """Parse variables, attributes and shapes from xml format specification file""" variables = dict() attributes = dict() shapes = dict() for item in tree.getroot().findall("shape"): dims = [dim.attrib["name"] for dim in item.findall("dimension")] if dims: shapes[item.attrib["name"]] = tuple(dims) for item in _find(_find(tree.getroot(), 'science'), 'nodes'): dtype = _parse_type( item.tag, item.attrib["width"], _strtobool(item.attrib["signed"]) if "signed" in item.attrib else None) if not isinstance(dtype, np.dtype): dtype = dtype.__name__ annotation = item.find("annotation") if annotation is None: continue varname = item.attrib["name"] if varname.startswith("/@"): attributes[varname[2:]] = dict(attrs=annotation.attrib, dtype=dtype) else: del annotation.attrib["app"] variables[varname[1:]] = dict(attrs=annotation.attrib, dtype=dtype, shape=shapes[item.attrib["shape"]]) return variables, attributes def _create_variable_args(encoding: Dict[str, Dict], name: str, variable: xr.Variable) -> Tuple[str, Dict[str, Any]]: """Initiation of netCDF4.Dataset.createVariable method parameters from user-defined encoding information. """ kwargs = dict() keywords = encoding[name] if name in encoding else dict() if "_FillValue" in keywords: keywords["fill_value"] = keywords.pop("_FillValue") dtype = keywords.pop("dtype", variable.dtype) for key, value in dict(zlib=True, complevel=4, shuffle=True, fletcher32=False, contiguous=False, chunksizes=None, endian='native', least_significant_digit=None, fill_value=None).items(): kwargs[key] = keywords.pop(key, value) return dtype, kwargs def _create_variable(xr_dataset: xr.Dataset, nc_dataset: netCDF4.Dataset, encoding: Dict[str, Dict[str, Dict[str, Any]]], name: str, unlimited_dims: Optional[List[str]], variable: xr.Variable) -> None: """Creation and writing of the NetCDF variable""" unlimited_dims = unlimited_dims or list() variable.attrs.pop("_FillValue", None) # Encode datetime64 to float64 if np.issubdtype(variable.dtype, np.datetime64): # 946684800000000 number of microseconds between 2000-01-01 and # 1970-01-01 variable.values = ( variable.values.astype("datetime64[us]").astype("int64") - 946684800000000) * 1e-6 assert ( variable.attrs["units"] == "seconds since 2000-01-01 00:00:00.0") dtype, kwargs = _create_variable_args(encoding, name, variable) # If the dimensions doesn't exist then we have to create them. if not nc_dataset.dimensions: for dim_name, size in xr_dataset.dims.items(): nc_dataset.createDimension( dim_name, None if dim_name in unlimited_dims else size) ncvar = nc_dataset.createVariable(name, dtype, variable.dims, **kwargs) ncvar.setncatts(variable.attrs) values = variable.values if kwargs['fill_value'] is not None: if values.dtype.kind == "f" and np.any(np.isnan(values)): values[np.isnan(values)] = kwargs['fill_value'] values = np.ma.array(values, mask=values == kwargs['fill_value']) nc_dataset[name][:] = values
[docs]def to_netcdf(dataset: xr.Dataset, path: Union[str, pathlib.Path], encoding: Optional[Dict[str, Dict]] = None, unlimited_dims: Optional[List[str]] = None, **kwargs): """Write dataset contents to a netCDF file""" encoding = encoding or dict() if isinstance(path, str): path = pathlib.Path(path) path.parent.mkdir(parents=True, exist_ok=True) with netCDF4.Dataset(path, **kwargs) as stream: stream.setncatts(dataset.attrs) for name, variable in dataset.coords.items(): _create_variable(dataset, stream, encoding, name, unlimited_dims, variable) for name, variable in dataset.data_vars.items(): _create_variable(dataset, stream, encoding, name, unlimited_dims, variable)
[docs]class ProductSpecification: """Parse and load into memory the product specification@@""" SPECIFICATION = os.path.join(os.path.dirname(os.path.abspath(__file__)), "l2b-ssh.xml")
[docs] def __init__(self, product_type: Optional[str]): product_type = product_type or "expert" self.specification = os.path.join( os.path.dirname(os.path.abspath(__file__)), PRODUCT_TYPE[product_type]) self.variables, self.attributes = _parser(xt.parse(self.specification))
[docs] def __contains__(self, item): return item in self.variables
[docs] @staticmethod def fill_value(properties: Dict[str, Any]) -> np.ndarray: """Returns the fill value encoded with the data type specified""" dtype = properties["dtype"] if isinstance(dtype, str): return getattr(np, dtype)(properties["attrs"]["_FillValue"]) return np.array(properties["attrs"]["_FillValue"], dtype)
[docs] def time(self, time: np.ndarray) -> Tuple[Dict, List[xr.DataArray]]: """Gets the time axis""" encoding, data_array = self._data_array("time", time) # type: ignore return {"time": encoding}, [data_array]
[docs] def _data_array(self, name: str, data: np.ndarray) -> Optional[Tuple[Dict, xr.DataArray]]: """Returns a tuple containing the netCDF encoding of the variable and the data array.""" if name not in self.variables: return None properties = self.variables[name] attrs = copy.deepcopy(properties["attrs"]) # The fill value is casted to the target value of the variable fill_value = self.fill_value(properties) del attrs["_FillValue"] # Reading the storage properties of the variable () encoding: Dict[str, Any] = dict(_FillValue=fill_value, dtype=properties["dtype"]) # Some values read from the XML files must be decoded # TODO(fbriol): The type of these attributes should be determined from # their type, but at the moment this is not possible. for item in ["add_offset", "scale_factor"]: if item in attrs: attrs[item] = float(attrs[item]) for item in ["valid_range", "valid_min", "valid_max"]: if item in attrs: attrs[item] = _cast_to_dtype(attrs[item], properties) if "flag_values" in attrs: items = attrs["flag_values"].split() attrs["flag_values"] = np.array( [_cast_to_dtype(item, properties) for item in items], properties["dtype"]) if len(items) != 1 else _cast_to_dtype( float(attrs["flag_values"]), properties) # if "scale_factor" in attrs and "add_offset" not in attrs: # attrs["add_offset"] = 0.0 # if "add_offset" in attrs and "scale_factor" not in attrs: # attrs["scale_factor"] = 1.0 return encoding, xr.DataArray(data=data, dims=properties["shape"], name=name, attrs=attrs)
[docs] def x_ac(self, x_ac: np.ndarray) -> Optional[Tuple[Dict, xr.DataArray]]: """Returns the properties of the variable describing the cross track distance""" return self._data_array("cross_track_distance", x_ac)
[docs] def lon(self, lon: np.ndarray) -> Tuple[Dict, xr.DataArray]: """Returns the properties of the variable describing the longitudes of the swath""" # Longitude must be in [0, 360.0[ result = self._data_array("longitude", math.normalize_longitude(lon, 0)) assert result is not None return result
[docs] def lon_nadir( self, lon_nadir: np.ndarray) -> Optional[Tuple[Dict, xr.DataArray]]: """Returns the properties of the variable describing the longitudes of the reference ground track""" # Longitude must be in [0, 360.0[ return self._data_array("longitude_nadir", math.normalize_longitude(lon_nadir, 0))
[docs] def lat(self, lat: np.ndarray) -> Tuple[Dict, xr.DataArray]: """Returns the properties of the variable describing the latitudes of the swath""" result = self._data_array("latitude", lat) assert result is not None return result
[docs] def lat_nadir( self, lat_nadir: np.ndarray) -> Optional[Tuple[Dict, xr.DataArray]]: """Returns the properties of the variable describing the latitudes of the reference ground track""" return self._data_array("latitude_nadir", lat_nadir)
[docs] def ssh_karin(self, ssh: np.ndarray) -> Tuple[Dict, xr.DataArray]: """Returns the properties of the variable describing the SSH measured by KaRIn""" result = self._data_array("ssh_karin", ssh) assert result is not None return result
[docs] def ssh_nadir(self, array: np.ndarray) -> Tuple[Dict, xr.DataArray]: """Returns the properties of the variable describing the SSH to nadir.""" return { '_FillValue': 2147483647, 'dtype': 'int32' }, xr.DataArray(data=array, dims=self.variables["time"]["shape"], name="ssh_nadir", attrs={ 'coordinates': 'longitude latitude', 'long_name': 'sea surface height', 'scale_factor': 0.0001, 'standard_name': 'sea surface height above reference ellipsoid', 'units': 'm', 'valid_min': np.int32(-15000000), 'valid_max': np.int32(150000000) })
[docs] def simulated_true_ssh_nadir( self, array: np.ndarray) -> Tuple[Dict, xr.DataArray]: """Returns the properties of the variable describing the SSH to nadir free of measurement errors.""" return { '_FillValue': 2147483647, 'dtype': 'int32' }, xr.DataArray( data=array, dims=self.variables["time"]["shape"], name="simulated_true_ssh_nadir", attrs={ 'coordinates': 'time', 'long_name': 'sea surface height', 'scale_factor': 0.0001, 'standard_name': 'sea surface height above reference ellipsoid', 'units': 'm', 'valid_min': np.int32(-15000000), 'valid_max': np.int32(150000000), 'comment': 'Height of the sea surface free of measurement errors.' })
[docs] def swh_karin(self, swh: np.ndarray) -> Tuple[Dict, xr.DataArray]: """Returns the properties of the variable describing the SSH measured by KaRIn""" result = self._data_array("swh_karin", swh) assert result is not None return result
[docs] def swh_nadir(self, array: np.ndarray) -> Tuple[Dict, xr.DataArray]: """Returns the properties of the variable describing the SSH to nadir free of measurement errors.""" return { '_FillValue': 2147483647, 'dtype': 'int32' }, xr.DataArray(data=array, dims=self.variables["time"]["shape"], name="swh_nadir", attrs={ 'coordinates': 'time', 'long_name': 'Significant Wave Height', 'scale_factor': 0.0001, 'standard_name': 'Sigificant Wave Height', 'units': 'm', 'valid_min': np.int32(-15000000), 'valid_max': np.int32(150000000), })
[docs] def simulated_true_ssh_karin( self, array: np.ndarray) -> Tuple[Dict, xr.DataArray]: """Returns the properties of the variable describing the SSH KaRIn free of measurement errors.""" return { '_FillValue': 2147483647, 'dtype': 'int32' }, xr.DataArray( data=array, dims=self.variables["ssh_karin"]["shape"], name="simulated_true_ssh_karin", attrs={ 'coordinates': 'longitude latitude', 'long_name': 'sea surface height', 'scale_factor': 0.0001, 'standard_name': 'sea surface height above reference ellipsoid', 'units': 'm', 'valid_min': np.int32(-15000000), 'valid_max': np.int32(150000000), 'comment': 'Height of the sea surface free of measurement errors.' })
[docs] def simulated_error_baseline_dilation( self, array: np.ndarray) -> Tuple[Dict, xr.DataArray]: """Returns the properties of the variable describing the error due to baseline mast dilation""" return { '_FillValue': 2147483647, 'dtype': 'int32' }, xr.DataArray(data=array, dims=self.variables["ssh_karin"]["shape"], name="simulated_error_baseline_dilation", attrs={ 'long_name': 'Error due to baseline mast dilation', 'scale_factor': 0.0001, '_FillValue': 2147483647, 'units': 'm', 'coordinates': 'longitude latitude' })
[docs] def simulated_error_roll(self, array: np.ndarray) -> Tuple[Dict, xr.DataArray]: """Returns the properties of the variable describing the error due to roll""" return { '_FillValue': 2147483647, 'dtype': 'int32' }, xr.DataArray(data=array, dims=self.variables["ssh_karin"]["shape"], name="simulated_error_roll", attrs={ 'long_name': 'Error due to roll', 'units': 'm', 'scale_factor': 0.0001, 'coordinates': 'longitude latitude' })
[docs] def simulated_error_phase(self, array: np.ndarray) -> Tuple[Dict, xr.DataArray]: """Returns the properties of the variable describing the error due to phase""" return { '_FillValue': 2147483647, 'dtype': 'int32' }, xr.DataArray(data=array, dims=self.variables["ssh_karin"]["shape"], name="simulated_error_phase", attrs={ 'long_name': 'Error due to phase', 'units': 'm', 'scale_factor': 0.0001, 'coordinates': 'longitude latitude' })
[docs] def simulated_roll_phase_estimate( self, array: np.ndarray) -> Tuple[Dict, xr.DataArray]: """Returns the properties of the variable describing the roll phase correction estimated""" return { '_FillValue': 2147483647, 'dtype': 'int32' }, xr.DataArray(data=array, dims=self.variables["ssh_karin"]["shape"], name="simulated_roll_phase_estimate", attrs={ 'long_name': 'Error after estimation of roll phase', 'units': 'm', 'scale_factor': 0.0001, 'coordinates': 'longitude latitude' })
[docs] def simulated_error_karin(self, array: np.ndarray) -> Tuple[Dict, xr.DataArray]: """Returns the properties of the variable describing the KaRIn error""" return { '_FillValue': 2147483647, 'dtype': 'int32' }, xr.DataArray(data=array, dims=self.variables["ssh_karin"]["shape"], name="simulated_error_karin", attrs={ 'long_name': 'KaRIn error', 'units': 'm', 'scale_factor': 0.0001, 'coordinates': 'longitude latitude' })
[docs] def simulated_error_timing(self, array: np.ndarray) -> Tuple[Dict, xr.DataArray]: """Returns the properties of the variable describing the timing error""" return { '_FillValue': 2147483647, 'dtype': 'int32' }, xr.DataArray(data=array, dims=self.variables["ssh_karin"]["shape"], name="simulated_error_timing", attrs={ 'long_name': 'Timing error', 'units': 'm', 'scale_factor': 0.0001, 'coordinates': 'longitude latitude' })
[docs] def simulated_error_troposphere( self, array: np.ndarray) -> Tuple[Dict, xr.DataArray]: """Returns the properties of the variable describing the error due to wet troposphere path delay""" return { '_FillValue': 2147483647, 'dtype': 'int32', 'scale_factor': 0.0001 }, xr.DataArray(data=array, dims=self.variables["ssh_karin"]["shape"], name="simulated_error_troposphere", attrs={ 'long_name': 'Error due to wet troposphere path delay', 'units': 'm', 'scale_factor': 0.0001, 'coordinates': 'longitude latitude' })
[docs] def simulated_error_troposphere_nadir( self, array: np.ndarray) -> Tuple[Dict, xr.DataArray]: """Returns the properties of the variable describing the error due to wet troposphere path delay to nadir.""" return { '_FillValue': 2147483647, 'dtype': 'int32' }, xr.DataArray(data=array, dims=self.variables["time"]["shape"], name="simulated_error_troposphere_nadir", attrs={ 'long_name': 'Error due to wet troposphere path delay', 'units': 'm', 'scale_factor': 0.0001, 'coordinates': 'longitude latitude' })
[docs] def simulated_error_altimeter( self, array: np.ndarray) -> Tuple[Dict, xr.DataArray]: """Returns the properties of the variable describing the altimeter error""" return { '_FillValue': 2147483647, 'dtype': 'int32' }, xr.DataArray(data=array, dims=self.variables["time"]["shape"], name="simulated_error_altimeter", attrs={ 'long_name': 'Altimeter error', 'standard_name': '', 'units': 'm', 'scale_factor': 0.0001, 'coordinates': 'longitude latitude' })
[docs] def fill_variables(self, variables, shape) -> Iterator[Tuple[Dict, xr.DataArray]]: """Returns the properties of variables present in the official format of the SWOT product, but not calculated by this software.""" for item in self.variables: if item in variables: continue properties = self.variables[item] fill_value = self.fill_value(properties) data = np.full(tuple(shape[dim] for dim in properties["shape"]), fill_value, properties["dtype"]) variable = self._data_array(item, data) if variable is not None: yield variable return StopIteration
[docs]class Nadir: """Handle the nadir measurements dataset. Args: track (orbit_propagator.Pass): Properties of the half-orbit to write standalone (bool): True if this dataset is independent of the KaRIn dataset. """
[docs] def __init__(self, track: orbit_propagator.Pass, standalone: bool = True, product_type: Optional[str] = None): self.standalone = standalone self.product_spec = ProductSpecification(product_type) self.num_lines = track.time.size self.num_pixels = 1 self.encoding, self.data_vars = self.product_spec.time(track.time) self._data_array("lon_nadir", track.lon_nadir) self._data_array("lat_nadir", track.lat_nadir)
[docs] def _data_array(self, attr: str, data: np.ndarray, fill_value: Optional[np.ndarray] = None) -> None: """Get the properties of a data array to be inserted in the dataset. Args: attr (dict): Name of the method of the `ProductSpec` class defining the variable to be inserted. data (np.array): data to be recorded fill_value (np.array, optional): Fill value of the reference track vector to be inserted in the centre of the swath. """ # Is it necessary to insert a central pixel? if len(data.shape) == 2 and self.num_pixels % 2 == 1: # If the data is a grid, then the defined fill values are inserted # or a Nan vector if the fill values are undefined. if fill_value is None: fill_value = np.full((data.shape[0], ), np.nan, dtype=np.float64) middle = data.shape[1] // 2 data = np.c_[data[:, :middle], fill_value[:, np.newaxis], data[:, middle:]] variable = getattr(self.product_spec, attr)(data) if variable is not None: encoding, array = variable if self.standalone: array.name = array.name.replace("_nadir", "") self.encoding[array.name] = encoding self.data_vars.append(array)
[docs] def ssh(self, array: np.ndarray) -> None: """Sets the variable describing the SSH to nadir. Args: array (np.ndarray): Data to be recorded """ self._data_array("ssh_nadir", array)
[docs] def swh(self, array: np.ndarray) -> None: """Sets the variable describing the SSH to nadir. Args: array (np.ndarray): Data to be recorded """ self._data_array("swh_nadir", array)
[docs] def simulated_true_ssh(self, array: np.ndarray) -> None: """Sets the variable describing the SSH to nadir free of measurement errors. Args: array (np.ndarray): Data to be recorded """ self._data_array("simulated_true_ssh_nadir", array)
[docs] def update_noise_errors(self, noise_errors: Dict[str, np.ndarray]) -> None: """Sets the values of the simulated errors. Args: noise_errors (dict): Simulated errors to be recorded """ for k, v in noise_errors.items(): if v.ndim == 1: self._data_array(k, v)
[docs] def to_xarray(self, cycle_number: int, pass_number: int, complete_product: bool) -> xr.Dataset: """Converts this instance into a xarray dataset. Args: cycle_number (int): Cycle number. pass_number (int): Pass number. complete_product (bool): True if you want to obtain a complete SWOT dataset, i.e. containing all the variables of the official dataset, even those not calculated by the simulator. Returns: xr.Dataset: xarray dataset """ data_vars = dict((item.name, item) for item in self.data_vars) if "longitude" in data_vars: lng = data_vars["longitude"] lat = data_vars["latitude"] else: lng = data_vars["longitude_nadir"] lat = data_vars["latitude_nadir"] # Variables that are not calculated are filled in in order to have a # product compatible with the PDD SWOT. Longitude is used as a # template. if complete_product and len(lng.shape) == 2: shape = dict(zip(lng.dims, lng.shape)) shape["num_sides"] = 2 for encoding, array in self.product_spec.fill_variables( data_vars.keys(), shape): self.encoding[array.name] = encoding self.data_vars.append(array) # Variables must be written in the declaration order of the XML file unordered_vars = dict((item.name, item) for item in self.data_vars) data_vars = collections.OrderedDict() for item in self.product_spec.variables: if item in unordered_vars: data_vars[item] = unordered_vars[item] # Add variables that are not defined in the XML file for item in set(unordered_vars.keys()) - set(data_vars.keys()): data_vars[item] = unordered_vars[item] return xr.Dataset(data_vars=data_vars, attrs=global_attributes(self.product_spec.attributes, cycle_number, pass_number, self.data_vars[0].values, lng, lat))
[docs] def to_netcdf(self, cycle_number: int, pass_number: int, path: str, complete_product: bool) -> None: """Writes the dataset in a netCDF file. Args: cycle_number (int): Cycle number. pass_number (int): Pass number. complete_product (bool): True if you want to obtain a complete SWOT dataset, i.e. containing all the variables of the official dataset, even those not calculated by the simulator. """ LOGGER.info("write %s", path) dataset = self.to_xarray(cycle_number, pass_number, complete_product) to_netcdf(dataset, path, self.encoding, mode="w")
[docs]class Swath(Nadir): """Handle the KaRIn measurements dataset. Args: track (orbit_propagator.Pass): Properties of the half-orbit to write central_pixel (bool): Inserts or not in the swath, a central pixel divided in two by the reference ground track. """
[docs] def __init__(self, track: orbit_propagator.Pass, central_pixel: bool = False, product_type: Optional[str] = None) -> None: super().__init__(track, False, product_type) self.num_pixels = track.x_ac.size + int(central_pixel) _x_ac = np.full((track.time.size, track.x_ac.size), track.x_ac, dtype=track.x_ac.dtype) self._data_array("x_ac", _x_ac, np.full((track.time.size, ), 0, dtype=np.float64)) self._data_array("lon", track.lon, track.lon_nadir) self._data_array("lat", track.lat, track.lat_nadir)
[docs] def ssh(self, array: np.ndarray) -> None: """Sets the variable describing the KaRIn SSH. Args: array (np.ndarray): Data to be recorded """ self._data_array("ssh_karin", array)
[docs] def swh(self, array: np.ndarray) -> None: """Sets the variable describing the KaRIn SWH. Args: array (np.ndarray): Data to be recorded """ self._data_array("swh_karin", array)
[docs] def simulated_true_ssh(self, array: np.ndarray) -> None: """Sets the variable describing the KaRIn SSH free of measurement errors. Args: array (np.ndarray): Data to be recorded """ self._data_array("simulated_true_ssh_karin", array)
[docs] def update_noise_errors(self, noise_errors: Dict[str, np.ndarray]) -> None: """Sets the values of the simulated errors. Args: noise_errors (dict): Simulated errors to be recorded """ if "ssh_karin" in self.product_spec: for k, v in noise_errors.items(): if v.ndim == 2: self._data_array(k, v)