# Copyright (c) 2021 CNES/JPL
#
# All rights reserved. Use of this source code is governed by a
# BSD-style license that can be found in the LICENSE file.
"""
Wet troposphere errors
----------------------
"""
from typing import Dict, List, Tuple
import logging
import numba as nb
import numba.typed
import numpy as np
import scipy.ndimage.filters
from .. import (
BASELINE,
CELERITY,
F_KA,
VOLUMETRIC_MEAN_RADIUS,
random_signal,
settings,
)
#: Logger of this module
LOGGER = logging.getLogger(__name__)
@nb.njit(cache=True, nogil=True)
def _meshgrid(x: np.ndarray, y: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
nx = x.size
ny = y.size
mx = np.empty((ny, nx))
my = np.empty((ny, nx))
for idx in range(ny):
mx[idx, :] = x
for idx in range(nx):
my[:, idx] = y
return mx, my
@nb.njit(cache=True, nogil=True)
def _calculate_path_delay_lr(beam_positions: List[float], sigma: float,
radio_r: np.ndarray, radio_l: np.ndarray,
x_al: np.ndarray, x_ac_large: np.ndarray,
wt_large: np.ndarray):
beam_r = np.empty((x_al.shape[0], ))
beam_l = np.empty((x_al.shape[0], ))
# Find righ and leftacross track indices in the gaussian
# footprint of 2.*p.sigma
ind_r = x_ac_large + beam_positions[1]
indac_r = np.where((ind_r < 2 * sigma) & (ind_r > -2 * sigma))[0]
ind_l = x_ac_large + beam_positions[0]
indac_l = np.where((ind_l < 2 * sigma) & (ind_l > -2 * sigma))[0]
factor = 1 / (2 * np.pi * sigma**2)
for idx, xal in enumerate(x_al):
# Find along track indices in the gaussian footprint
# of 2.*p.sigma
delta_x_al = x_al - xal
indal = np.where((delta_x_al <= (2 * sigma))
& (delta_x_al > (-2 * sigma)))[0]
slice_al = slice(indal[0], indal[-1] + 1)
slice_acr = slice(indac_r[0], indac_r[-1] + 1)
slice_acl = slice(indac_l[0], indac_l[-1] + 1)
x, y = _meshgrid(x_ac_large[slice_acr], x_al[slice_al] - xal)
# Compute path delay on left and right gaussian footprint
g = factor * np.exp(-(x**2 + y**2) / (2 * sigma**2))
beam_r[idx] = np.sum(
g * wt_large[slice_al, slice_acr]) / np.sum(g) + radio_r[idx]
x, y = _meshgrid(x_ac_large[slice_acl], x_al[slice_al] - xal)
g = factor * np.exp(-(x**2 + y**2) / (2 * sigma**2))
beam_l[idx] = np.sum(
g * wt_large[slice_al, slice_acl]) / np.sum(g) + radio_l[idx]
return beam_r, beam_l
@nb.njit(cache=True, nogil=True)
def _calculate_path_delay(sigma: float, radio: np.ndarray, x_al: np.ndarray,
x_ac_large: np.ndarray, wt_large: np.ndarray):
beam = np.empty((x_al.shape[0], ))
# Find across track indices in the gaussian footprint of
# 2. * sigma
indac = np.where((x_ac_large < 2 * sigma) & (x_ac_large > -2 * sigma))[0]
factor = 1 / (2 * np.pi * sigma**2)
for idx, xal in enumerate(x_al):
delta_x_al = x_al[:] - xal
indal = np.where((delta_x_al <= (2 * sigma))
& (delta_x_al > (-2 * sigma)))[0]
slice_al = slice(indal[0], indal[-1] + 1)
slice_ac = slice(indac[0], indac[-1] + 1)
x, y = _meshgrid(x_ac_large[slice_ac], x_al[slice_al] - xal)
# Compute path delay on gaussian footprint
g = factor * np.exp(-(x**2 + y**2) / (2 * sigma**2))
beam[idx] = np.sum(
g * wt_large[slice_al, slice_ac]) / np.sum(g) + radio[idx]
return beam
[docs]class WetTroposphere:
"""Wet troposphere errors.
Args:
parameters (settings.Parameters): Simulation settings
"""
ALPHA = 10
LC_MAX = 500
F_MAX = 0.05
[docs] def __init__(self, parameters: settings.Parameters) -> None:
LOGGER.info("Initializing WetTroposphere")
# Store the generation parameters of the random signal.
self.beam_positions = parameters.beam_position
self.delta_ac = parameters.delta_ac
self.delta_al = parameters.delta_al
self.nbeam = parameters.nbeam
self.sigma = parameters.sigma
assert parameters.height is not None, 'Height is not set'
# TODO
self.conversion_factor = (
1 / (F_KA * 2 * np.pi / CELERITY * BASELINE) *
(1 + (parameters.height * 1e-3) / VOLUMETRIC_MEAN_RADIUS) * np.pi /
180 * 1e3)
# Define power spectrum of error in path delay due to wet tropo
freq = np.arange(1 / 3000, 1 / (2 * self.delta_al), 1 / 3000)
# Global mean wet tropo power spectrum in cm**2/(cycle/km)
# for L >= 100 km
pswt = 3.156 * 1e-05 * freq**(-8 / 3)
# Wet tropo power spectrum in cm**2/(cycle/km) for L < 100 km
mask = freq > 1e-2
pswt[mask] = 1.4875 * 1e-4 * freq[mask]**(-2.33)
self.pswt = pswt
self.freq = freq
len_repeat = 10000.0
self.fminx = 1 / len_repeat
self.ps2d, self.f = random_signal.gen_ps2d(freq,
pswt,
fminx=self.fminx,
fminy=1 / self.LC_MAX,
fmax=self.F_MAX,
alpha=self.ALPHA,
lf_extpl=True,
hf_extpl=True)
# Define radiometer error power spectrum for a beam
# High frequencies are cut to filter the associated error:
# during the reconstruction of the wet trop signal
psradio = 9.5 * 1e-5 * self.freq**-1.79
psradio[self.freq < 1e-3] = 9.5 * 1e-5 * (1e-3)**-1.79
mask = (self.freq > 0.0023) & (self.freq <= 0.0683)
psradio[mask] = 0.036 * self.freq[mask]**-0.814
psradio[self.freq > 0.0683] = 0.32
self.radio_r = random_signal.Signal1D(self.freq,
psradio,
fmin=1 / len_repeat,
fmax=1 / (2 * self.delta_al),
alpha=10,
rng=parameters.rng(),
hf_extpl=True,
lf_extpl=True)
self.radio_l = random_signal.Signal1D(self.freq,
psradio,
fmin=1 / len_repeat,
fmax=1 / (2 * self.delta_al),
alpha=10,
rng=parameters.rng(),
hf_extpl=True,
lf_extpl=True)
self.wt = random_signal.Signal2D(self.ps2d,
self.f,
fminx=self.fminx,
fminy=1 / self.LC_MAX,
fmax=self.F_MAX,
alpha=self.ALPHA,
rng=parameters.rng())
self.wt_large = random_signal.Signal2D(self.ps2d,
self.f,
fminx=self.fminx,
fminy=1 / self.LC_MAX,
fmax=self.F_MAX,
alpha=self.ALPHA,
rng=parameters.rng())
[docs] def _radiometer_error(self,
x_al: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
# Compute random coefficients (1D) for the radiometer error
# power spectrum for right and left beams
radio_r = self.radio_r(x_al) * 1e-2
radio_l = self.radio_l(x_al) * 1e-2
return radio_r, radio_l
[docs] def generate(self, x_al: np.ndarray,
x_ac: np.ndarray) -> Dict[str, np.ndarray]:
"""Generate wet troposphere errors.
Args:
x_al (numpy.ndarray): Along track distance
x_ac (numpy.ndarray): Across track distance
Returns:
dict: variable name and errors simulated.
"""
num_lines = x_al.shape[0]
num_pixels = x_ac.shape[0]
# Initialization of radiometer error in right and left beam
radio_r, radio_l = self._radiometer_error(x_al)
# Initialization of swath matrices and large swath matrices (which
# include wet tropo data around the nadir and outside the swath)
start_x = -2 * self.sigma / self.delta_ac + x_ac[0]
stop_x = (2 * self.sigma / self.delta_ac + x_ac[-1] + self.delta_ac)
# x_ac_large and wt_large are necessary to compute the gaussian
# footprint of a beam on the nadir or near the edge of the swath
x_ac_large = np.arange(start_x, stop_x, self.delta_ac)
naclarge = np.shape(x_ac_large)[0]
# Compute path delay error due to wet tropo and radiometer error
# using random coefficient initialized with power spectrums
wt = self.wt(x_al, x_ac)
wt = wt.T * 1e-2
wt_large = self.wt_large(x_al, x_ac_large)
wt_large = wt_large.T * 1e-2
# Compute Residual path delay error after a 1-beam radiometer
# correction
if self.nbeam == 1:
beam = _calculate_path_delay(self.sigma, radio_l, x_al, x_ac_large,
wt_large)
beam = scipy.ndimage.filters.gaussian_filter(
beam, 30. / self.delta_al)
beam2d = np.vstack(num_pixels * (beam, )).T
# Compute residual path delay
wet_tropo = wt - beam2d
wet_tropo_nadir = wt_large[:, naclarge // 2] - beam
# Compute Residual path delay error after a 2-beams radiometer
# correction
elif self.nbeam == 2:
beam_r, beam_l = _calculate_path_delay_lr(
numba.typed.List(self.beam_positions), self.sigma, radio_r,
radio_l, x_al, x_ac_large, wt_large)
# Filtering beam signal to cut frequencies higher than 125 km
beam_r = scipy.ndimage.filters.gaussian_filter(
beam_r, 30 / self.delta_al)
beam_l = scipy.ndimage.filters.gaussian_filter(
beam_l, 30 / self.delta_al)
# Compute residual path delay (linear combination of left
# and right path delay)
polyfit = np.polynomial.polynomial.polyfit
pol = polyfit([self.beam_positions[0], self.beam_positions[1]],
[beam_l, beam_r], 1)
beam = (np.array(num_pixels * [pol[0]]).T +
np.array(num_lines * [x_ac]) *
np.array(num_pixels * [pol[1]]).T)
wet_tropo = wt - beam
wet_tropo_nadir = wt_large[:, naclarge //
2] - beam[:, num_pixels // 2]
else:
raise ValueError("nbeam must be in [1, 2]")
# wt_nadir = wt_large[:, naclarge // 2]
return {
"simulated_error_troposphere": wet_tropo,
"simulated_error_troposphere_nadir": wet_tropo_nadir
}