Source code for swot_simulator.random_signal

# Copyright (c) 2021 CNES/JPL
#
# All rights reserved. Use of this source code is governed by a
# BSD-style license that can be found in the LICENSE file.
"""
Random signal generation utilities
----------------------------------
"""
from typing import Optional, Tuple
import warnings

# import dask.array as da
import numba as nb
import numpy as np
import scipy.interpolate
import xarray as xr

try:
    import mkl_fft
    IFFT = mkl_fft.ifft
    IFFT2 = mkl_fft.ifft2
except ImportError:
    IFFT = np.fft.ifft
    IFFT2 = np.fft.ifft2


[docs]def read_file_instr(file_instr: str, delta_al: float) -> xr.Dataset: """Retrieve power spectrum from instrumental noise file provided by.""" dataset = xr.load_dataset(file_instr) # Set spatial frequency to spatial coordinate dataset = dataset.swap_dims(dict(nfreq="spatial_frequency")) # Extract spatial frequency relevant to our sampling # (Cut frequencies larger than Nyquist frequency) cut_min = 1 / (2 * delta_al) return dataset.where(dataset.spatial_frequency <= cut_min, drop=True)
[docs]def read_file_karin(path: str) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: """Retrieve power spectrum from instrumental noise file provided by.""" with xr.open_dataset(path) as dataset: height_sdt = dataset['height_sdt'].data cross_track = dataset['cross_track'].data swh = dataset['SWH'].data return height_sdt, cross_track, swh
@nb.njit(cache=True, nogil=True) def _interpolate_file_karin(swh_in: np.ndarray, x_ac_in: np.ndarray, height_sdt: np.ndarray, cross_track: np.ndarray, swh: np.ndarray) -> Tuple[float, np.ndarray]: """Interpolates the standard deviation of KaRIN instrumental noise as a function of SWH and across track distance.""" warning = 0 hsdt = np.zeros(swh_in.shape, dtype=np.float64) for jx in range(swh_in.shape[1]): xacj = abs(x_ac_in[jx]) indice_ac = np.argmin(np.abs(cross_track - xacj)) for ix in range(swh_in.shape[0]): threshold = swh_in[ix, jx] if np.isnan(threshold): threshold = 2.0 indices = np.argmin(np.abs(swh - threshold)) if swh[indices] > threshold: indices -= 1 if swh.max() <= threshold: hsdt[ix, jx] = height_sdt[-1, indice_ac] warning = threshold else: rswh = threshold - swh[indices] hsdt[ix, jx] = height_sdt[indices, indice_ac] * ( 1 - rswh) + rswh * height_sdt[indices + 1, indice_ac] return warning, hsdt
[docs]def interpolate_file_karin(swh_in: np.ndarray, x_ac_in: np.ndarray, height_sdt: np.ndarray, cross_track: np.ndarray, swh: np.ndarray) -> np.ndarray: """Interpolates the standard deviation of KaRIN instrumental noise as a function of SWH and across track distance.""" if len(swh_in.shape) == 1: swh_in = np.expand_dims(swh_in, axis=0) warning, hsdt = _interpolate_file_karin(swh_in, x_ac_in, height_sdt, cross_track, swh) if warning: warnings.warn( f'swh={warning} is greater than the maximum value, ' f'therefore swh is set to the file maximum ' 'value', RuntimeWarning) return hsdt
[docs]def gen_psd_1d(fi: np.ndarray, psi: np.ndarray, rng: np.random.Generator, fmin: Optional[float] = None, fmax: Optional[float] = None, alpha: int = 10, lf_extpl: bool = False, hf_extpl: bool = False) -> Tuple[np.ndarray, float]: """Generate 1d random signal using Fourier coefficient.""" # Make sure fi, PSi does not contain the zero frequency: psi = psi[fi > 0] fi = fi[fi > 0] # Adjust fmin and fmax to fi bounds if not specified. Values are bounded # with respect to the frequencies of the processed spectrum. fmin = fmin or fi[0] fmax = fmax or fi[-1] # Go alpha times further in frequency to avoid interpolation aliasing. fmaxr = alpha * fmax # Interpolation of the non-zero part of the spectrum f = np.arange(fmin, fmaxr + fmin, fmin) mask = psi > 0 ps = np.exp(np.interp(np.log(f), np.log(fi[mask]), np.log(psi[mask]))) # lf_extpl=True prolongates the PSi as a plateau below min(fi). # Otherwise, we consider zeros values. same for hf ps[f < fi[0]] = psi[0] if lf_extpl else 0 ps[f > fi[-1]] = psi[-1] if hf_extpl else 0 ps[f > fmax] = 0 # Detect the sections (if any) where PSi==0 and apply it to PS mask = np.interp(f, fi, psi) ps[mask == 0] = 0 f_size = f.size phase = np.empty((2 * f_size + 1)) phase[1:(f_size + 1)] = rng.random(f_size) * 2 * np.pi phase[0] = 0 phase[-f_size:] = -phase[1:(f_size + 1)][::-1] fft1a = np.concatenate((np.array([0]), 0.5 * ps, 0.5 * ps[::-1]), axis=0) fft1a = np.sqrt(fft1a) * np.exp(1j * phase) / np.sqrt(fmin) yg = 2 * fmaxr * np.real(IFFT(fft1a)) return yg, fmaxr
def _gen_signal_1d(fi: np.ndarray, psi: np.ndarray, rng: np.random.Generator, fmin: Optional[float] = None, fmax: Optional[float] = None, alpha: int = 10, lf_extpl: bool = False, hf_extpl: bool = False) -> Tuple[np.ndarray, np.ndarray]: """Generate 1d random signal using Fourier coefficient.""" yg, fmaxr = gen_psd_1d(fi, psi, rng, fmin, fmax, alpha, lf_extpl, hf_extpl) xg = np.linspace(0, 0.5 / fmaxr * yg.shape[0], yg.shape[0]) return xg, yg
[docs]class Signal1D:
[docs] def __init__(self, fi: np.ndarray, psi: np.ndarray, rng: np.random.Generator, fmin: Optional[float] = None, fmax: Optional[float] = None, alpha: int = 10, lf_extpl: bool = False, hf_extpl: bool = False): xg_lf, yg_lf = _gen_signal_1d(fi, psi, rng, 1 / 100000000, 1 / 1000000, alpha, True, hf_extpl) xg_hf, yg_hf = _gen_signal_1d(fi, psi, rng, fmin, fmax, alpha, lf_extpl, hf_extpl) self.xg_lf_max = xg_lf.max() self.xg_hf_max = xg_hf.max() self.xg_lf = xg_lf self.yg_lf = yg_lf self.xg_hf = xg_hf self.yg_hf = yg_hf
[docs] def __call__(self, x: np.ndarray) -> np.ndarray: """Returns the 1D random signal at the specified x.""" lf = np.interp(np.mod(x, self.xg_lf_max), self.xg_lf, self.yg_lf) hf = np.interp(np.mod(x, self.xg_hf_max), self.xg_hf, self.yg_hf) return lf + hf
@nb.njit("(float64[:, ::1])" "(float64[::1], float64[:, ::1], float64[::1], float64, float64)", cache=True, nogil=True) def _calculate_ps2d(f: np.ndarray, f2: np.ndarray, ps1d: np.ndarray, dfx: np.ndarray, dfy: np.ndarray) -> np.ndarray: result = np.zeros(f2.shape) view = result.ravel() dfx_2 = dfx * 0.5 dfx_y = dfx * dfy f2 = f2.ravel() for idx in range(-1, -f.size - 1, -1): item = f[idx] mask = (f2 >= (item - dfx_2)) & (f2 < (item + dfx_2)) amount = np.sum(result[:, idx]) * dfx_y miss = ps1d[idx] * dfx - amount view[mask] = 0 if miss <= 0 else miss * 0.5 / dfx_y return result @nb.njit("(float64[:, ::1])" "(float64[:, ::1], float64[::1], float64[::1], float64, float64)", cache=True, nogil=True) def _calculate_signal(rectangle, x, y, xgmax, ygmax): result = np.zeros((len(y), len(x))) xn = (x.max() - x[0]) // xgmax yn = (y.max() - y[0]) // ygmax dx = x - x[0] dy = y - y[0] for ix_n in range(int(xn + 1)): ix0 = np.where((dx >= (ix_n * xgmax)) & (dx < ((ix_n + 1) * xgmax)))[0] for iy_n in range(int(yn + 1)): iy0 = np.where((dy >= (iy_n * ygmax)) & (dy < ((iy_n + 1) * ygmax)))[0] result[iy0[0]:iy0[-1] + 1, ix0[0]:ix0[-1] + 1] = rectangle[:len(iy0), :len(ix0)] return result
[docs]def gen_ps2d(fi: np.ndarray, psi: np.ndarray, fminx: float, fminy: float, fmax: float, alpha: int = 10, lf_extpl: bool = False, hf_extpl: bool = False) -> Tuple[np.ndarray, np.ndarray]: """TODO(lgaultier)""" if fminy < fminx: fmin, fminy = fminy, fminx else: fmin = fminx # Go alpha times further in frequency to avoid interpolation aliasing. fmaxr = alpha * fmax # Make sure fi, PSi does not contain the zero frequency: psi = psi[fi > 0] fi = fi[fi > 0] # Interpolation function for the non-zero part of the spectrum f = np.arange(fmin, fmaxr + fmin, fmin) ps = np.exp(np.interp(np.log(f), np.log(fi[psi > 0]), np.log(psi[psi > 0]))) # lf_extpl=True prolongates the PSi as a plateau below min(fi). # Otherwise, we consider zeros values. same for hf ps[f < fi[0]] = psi[0] if lf_extpl else 0 ps[f > fi[-1]] = psi[-1] if hf_extpl else 0 ps[f > fmax] = 0 # Detect the sections (if any) where PSi==0 and apply it to PS psmask = np.interp(f, fi, psi) ps[psmask == 0] = 0 ps1d = ps # Build the 2D PSD following the given 1D PSD fx = np.concatenate(([0], f)) fy = np.concatenate(([0], np.arange(fminy, fmaxr + fminy, fminy))) fx2, fy2 = np.meshgrid(fx, fy) f2 = np.sqrt((fx2**2 + fy2**2)) dfx = fmin dfy = fminy ps2d = _calculate_ps2d(f, f2, ps1d, dfx, dfy) ps2d[f2 > fmax] = 0 return ps2d, f
[docs]class Signal2D:
[docs] def __init__(self, ps2d: np.ndarray, f: np.ndarray, rng: np.random.Generator, fminx: float, fminy: float, fmax: float, alpha: int = 10) -> None: revert = fminy < fminx if revert: fmin, fminy = fminy, fminx else: fmin = fminx # Go alpha times further in frequency to avoid interpolation aliasing. fmaxr = alpha * fmax # Build the 2D PSD following the given 1D PSD fx = np.concatenate(([0], f)) fy = np.concatenate(([0], np.arange(fminy, fmaxr + fminy, fminy))) dfx, dfy = fmin, fminy phase = rng.random((2 * len(fy) - 1, len(fx))) * 2 * np.pi phase[0, 0] = 0. phase[-len(fy) + 1:, 0] = -phase[1:len(fy), 0][::-1] fft2a = np.concatenate((0.25 * ps2d, 0.25 * ps2d[1:, :][::-1, :]), axis=0) fft2a = np.sqrt(fft2a) * np.exp(1j * phase) / np.sqrt((dfx * dfy)) fft2 = np.zeros((2 * len(fy) - 1, 2 * len(fx) - 1), dtype=complex) fft2[:, :len(fx)] = fft2a fft2[1:, -len(fx) + 1:] = fft2a[1:, 1:].conj()[::-1, ::-1] fft2[0, -len(fx) + 1:] = fft2a[0, 1:].conj()[::-1] sg = (4 * fy[-1] * fx[-1]) * np.real(IFFT2(fft2)) xg = np.linspace(0, 1 / fmin, sg.shape[1]) yg = np.linspace(0, 1 / fminy, sg.shape[0]) self.xgmax = xg.max() self.ygmax = yg.max() self.reverse = revert self.xg = xg self.yg = yg self.sg = sg
[docs] def __call__(self, x: np.ndarray, y: np.ndarray) -> np.ndarray: if self.reverse: x, y = y, x yl = y - y[0] yl = yl[yl < self.ygmax] xl = x - x[0] xl = xl[xl < self.xgmax] rectangle = np.ascontiguousarray( scipy.interpolate.interp2d(self.xg, self.yg, self.sg)(xl, yl)) signal = _calculate_signal(rectangle, x, y, self.xgmax, self.ygmax) return signal.transpose() if self.reverse else signal