# Copyright (c) 2021 CNES/JPL
#
# All rights reserved. Use of this source code is governed by a
# BSD-style license that can be found in the LICENSE file.
"""
Generate instrumental errors
----------------------------
"""
from typing import Dict, Union
import dask.distributed
import numpy as np
from . import (
Altimeter,
BaselineDilation,
CorrectedRollPhase,
Karin,
Orbital,
RollPhase,
Timing,
WetTroposphere,
)
from .. import random_signal, settings
[docs]class Generator:
"""Instrumental error generator.
Args:
parameters (settings.Parameters): Simulation settings
first_date (numpy.datetime64): Date of the first simulated
measurement.
orbit_duration (numpy.timedelta64, optional): Orbit duration.
"""
[docs] def __init__(self, parameters: settings.Parameters,
first_date: np.datetime64, orbit_duration: np.timedelta64):
#: The list of user-defined error generators
self.generators = []
assert parameters.error_spectrum is not None
error_spectrum = random_signal.read_file_instr(
parameters.error_spectrum, parameters.delta_al)
for item in parameters.noise:
if item == Altimeter.__name__:
self.generators.append(Altimeter(parameters))
elif item == BaselineDilation.__name__:
self.generators.append(
BaselineDilation(parameters,
error_spectrum['dilationPSD'].data,
error_spectrum['spatial_frequency'].data))
elif item == CorrectedRollPhase.__name__:
self.generators.append(
CorrectedRollPhase(parameters, first_date))
elif item == Karin.__name__:
self.generators.append(Karin(parameters))
elif item == Orbital.__name__:
self.generators.append(Orbital(parameters, orbit_duration))
elif item == RollPhase.__name__:
self.generators.append(
RollPhase(parameters, error_spectrum['rollPSD'].data,
error_spectrum['gyroPSD'].data,
error_spectrum['phasePSD'].data,
error_spectrum['spatial_frequency'].data))
elif item == Timing.__name__:
self.generators.append(
Timing(parameters, error_spectrum['timingPSD'].data,
error_spectrum['spatial_frequency'].data))
elif item == WetTroposphere.__name__:
self.generators.append(WetTroposphere(parameters))
else:
# A new error generation class has been implemented but it is
# not handled by this object.
raise ValueError(f"unknown error generation class: {item}")
[docs] def generate(self, cycle_number: int, pass_number: int,
curvilinear_distance: float, time: np.ndarray,
x_al: np.ndarray, x_ac: np.ndarray,
swh: Union[np.ndarray, float]) -> Dict[str, np.ndarray]:
"""Generate errors.
Args:
cycle_number (int): Cycle number.
pass_number (int): Pass number.
curvilinear_distance (float): Curvilinear distance covered by the
satellite during a complete cycle.
time (numpy.ndarray): Date of measurements.
x_al (numpy.ndarray): Along track distance.
x_ac (numpy.ndarray): Across track distance.
swh (numpy.ndarray, float): Significant wave height. Used to
modulate instrumental noise as a function of sea state.
Returns:
dict: Associative array between error variables and simulated
values.
"""
# The variable "x_al" contains the distance a along track from the
# beginning of the cycle. In order not to reproduce the same error from
# one cycle to another, we transform it into the distance covered since
# the first cycle.
x_al += curvilinear_distance * (cycle_number - 1)
result = {}
if not self.generators or x_al.shape[0] == 0:
return result
def wrapped(generator, *args):
"""Wrapper to avoid parallelization issues."""
return generator.generate(*args)
futures = []
with dask.distributed.worker_client() as client:
for item in self.generators:
scatter = client.scatter(item)
if isinstance(item, Altimeter):
futures.append(client.submit(wrapped, scatter, x_al))
elif isinstance(item, BaselineDilation):
futures.append(client.submit(wrapped, scatter, x_al, x_ac))
elif isinstance(item, CorrectedRollPhase):
futures.append(client.submit(wrapped, scatter, time, x_ac))
elif isinstance(item, Karin):
if isinstance(swh, float):
swh = np.full((x_ac.size, ), swh)
futures.append(
client.submit(item.generate,
cycle_number * 10000 + pass_number, x_al,
x_ac, swh))
elif isinstance(item, Orbital):
futures.append(client.submit(wrapped, scatter, time, x_ac))
elif isinstance(item, RollPhase):
futures.append(client.submit(wrapped, scatter, x_al, x_ac))
elif isinstance(item, Timing):
futures.append(client.submit(wrapped, scatter, x_al, x_ac))
elif isinstance(item, WetTroposphere):
futures.append(client.submit(wrapped, scatter, x_al, x_ac))
for future in dask.distributed.as_completed(futures):
result.update(future.result())
return result