Source code for swot_simulator.orbit_propagator

# Copyright (c) 2021 CNES/JPL
#
# All rights reserved. Use of this source code is governed by a
# BSD-style license that can be found in the LICENSE file.
"""
Orbit Propagator
----------------
"""
from typing import Dict, Iterator, NamedTuple, Optional, TextIO, Tuple
import logging
import warnings

import numpy as np
import pyinterp

from . import VOLUMETRIC_MEAN_RADIUS, math, settings

LOGGER = logging.getLogger(__name__)


[docs]def load_ephemeris( stream: TextIO, cols: Optional[Tuple[int, int, int]] = None) -> Tuple[Dict[str, float], Tuple]: """Loads a tabular file describing a satellite orbit. Args: stream (typing.TextIO): Stream to the text file to be processed. cols (tuple): A tuple of 3 elements describing the columns representing the longitude, latitude and number of seconds elapsed since the beginning of the orbit. Returns: tuple: A tuple of 2 elements containing the properties of the orbit and the positions of the satellite. """ if cols is None: cols = (1, 2, 0) LOGGER.info("loading ephemeris: %s", settings.pretty_print(stream.name)) comments = [] data = [] for item in stream: item = item.strip() if item.startswith("#"): comments.append(item[1:]) else: data.append(list(float(value) for value in item.split())) data = np.asarray(data, dtype=np.float64) def to_dict(comments) -> Dict[str, float]: """Returns a dictionary describing the parameters of the orbit.""" result = dict() for item in comments: key, value = item.split("=") result[key.strip()] = float(value) return result return to_dict(comments), tuple(data[:, item] for item in cols)
[docs]def rearrange_orbit( cycle_duration: float, lon: np.ndarray, lat: np.ndarray, time: np.ndarray) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: """Rearrange orbit starting from pass 1. Detect the beginning of pass 1 in the ephemeris. By definition, it is the first passage at southernmost latitude. Args: cycle_duration (float): Cycle time in seconds. lon (numpy.ndarray): Longitudes (in degrees) lat (numpy.ndarray): Latitudes (in degrees) time (np.ndarray): Date of the positions (in seconds). Returns: tuple: lon, lat, and time rearranged. """ dy = np.roll(lat, 1) - lat indexes = np.where((dy < 0) & (np.roll(dy, 1) >= 0))[0] # type: ignore # Shift coordinates, so that the first point of the orbit is the beginning # of pass 1 shift = indexes[-1] lon = np.hstack([lon[shift:], lon[:shift]]) lat = np.hstack([lat[shift:], lat[:shift]]) time = np.hstack([time[shift:], time[:shift]]) time = (time - time[0]) % cycle_duration if np.any(time < 0): warnings.warn('there are negative times in your orbit', RuntimeWarning) return lon, lat, time
[docs]def calculate_pass_time(lat: np.ndarray, time: np.ndarray) -> np.ndarray: """Compute the initial time of each pass. Args: lat (numpy.ndarray): Latitudes (in degrees) time (np.ndarray): Date of the latitudes (in seconds). Returns: numpy.ndarray: Start date of half-orbits. """ dy = np.roll(lat, 1) - lat indexes = np.where(((dy < 0) & (np.roll(dy, 1) >= 0)) # type: ignore | ((dy > 0) # type: ignore & (np.roll(dy, 1) <= 0)))[0] # type: ignore # The duration of the first pass is zero. indexes[0] = 0 return time[indexes]
[docs]def select_box( box: math.Box, lon: np.ndarray, lat: np.ndarray, time: np.ndarray ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: """Selects the orbit in the defined box. Args: box (math.Box): Geographical selection lon (numpy.ndarray): Longitudes (in degrees) lat (numpy.ndarray): Latitudes (in degrees) time (np.ndarray): Date of the positions (in seconds). Returns: tuple: the selected positions and associated dates. """ mask = box.within(lon, lat) x_al = math.curvilinear_distance(lon, lat, VOLUMETRIC_MEAN_RADIUS) return lon[mask], lat[mask], time[mask], x_al[mask]
[docs]class Orbit: """Properties of one orbit. Args: height (float): Satellite height (in m) lat (numpy.ndarray): Latitudes (in degrees) lon (numpy.ndarray): Longitudes (in degrees) pass_time (np.ndarray): Start date of half-orbits. time (np.ndarray): Date of the positions (in seconds). x_al (np.ndarray): Along track distance curvilinear_distance (float): Distance covered by the satellite during a complete cycle. shift_time (float): Time shift to be applied. temporal_overlap (float, optional): Simulate products overlapping by this amount of time, in seconds, with the previous and next products. """
[docs] def __init__(self, height: float, lat: np.ndarray, lon: np.ndarray, pass_time: np.ndarray, time: np.ndarray, x_al: np.ndarray, curvilinear_distance: float, shift_time: Optional[np.timedelta64], temporal_overlap: Optional[float]): self.height = height self.lat = lat self.lon = lon self.pass_time = pass_time self.shift_time = shift_time self.time = time self.x_al = x_al self.curvilinear_distance = curvilinear_distance self.temporal_overlap = Orbit._temporal_overlap(time, temporal_overlap)
[docs] @staticmethod def _temporal_overlap(time: np.ndarray, temporal_overlap: Optional[float]) -> int: """Converts the temporal overlap to an integer number of samples. Args: time (numpy.ndarray): Date of the positions (in seconds). temporal_overlap (float, optional): Simulate products overlapping by this amount of time, in seconds, with the previous and next products. Returns: int: temporal overlap in number of samples """ if temporal_overlap is not None: dt = np.mean( np.diff(time)).astype("timedelta64[ms]").view("int64") * 1e-3 return round(temporal_overlap / dt) return 0
[docs] def cycle_duration(self) -> np.timedelta64: """Get the cycle duration.""" return self.time[-1]
[docs] def orbit_duration(self) -> np.timedelta64: """Get the orbit duration.""" duration = self.cycle_duration().astype( "timedelta64[us]") / np.timedelta64( int(self.passes_per_cycle() // 2), 'us') return np.timedelta64(int(duration), "us")
[docs] def passes_per_cycle(self) -> int: """Get the number of passes per cycle.""" return len(self.pass_time)
[docs] def pass_shift(self, number: int) -> np.timedelta64: """Get the time offset between the first measurement and the last measurement of the track. Args: number (int): track number (must be in [1, passes_per_cycle()]) Returns: numpy.datetime64: time offset """ passes_per_cycle = self.passes_per_cycle() if number < 1 or number > passes_per_cycle: raise ValueError(f"number must be in [1, {passes_per_cycle}]") if number == passes_per_cycle: indexes = np.where(self.time >= self.pass_time[-1])[0] else: indexes = np.where((self.time >= self.pass_time[number - 1]) & (self.time < self.pass_time[number]))[0] return np.sum(np.diff(self.time[indexes]))
[docs] def pass_duration(self, number: int) -> np.timedelta64: """Get the duration of a given pass. Args: number (int): track number (must be in [1, passes_per_cycle()]) Returns: numpy.datetime64: track duration """ passes_per_cycle = self.passes_per_cycle() if number < 1 or number > passes_per_cycle: raise ValueError(f"number must be in [1, {passes_per_cycle}]") if number == passes_per_cycle: return self.time[-1] - self.pass_time[-1] + self.time[ 1] - self.time[0] return self.pass_time[number] - self.pass_time[number - 1]
[docs] def decode_absolute_pass_number(self, number: int) -> Tuple[int, int]: """Calculate the cycle and pass number from a given absolute pass number. Args: number (int): absolute pass number Returns: tuple: cycle and pass number """ number -= 1 return (int(number / self.passes_per_cycle()) + 1, (number % self.passes_per_cycle()) + 1)
[docs] def encode_absolute_pass_number(self, cycle_number: int, pass_number: int) -> int: """Calculate the absolute pass number for a given half-orbit. Args: cycle_number (int): Cycle number pass_number (int): Pass number Returns: int: Absolute pass number """ passes_per_cycle = self.passes_per_cycle() if not 1 <= pass_number <= passes_per_cycle: raise ValueError(f"pass_number must be in [1, {passes_per_cycle}") return (cycle_number - 1) * self.passes_per_cycle() + pass_number
[docs] def delta_t(self) -> np.timedelta64: """Returns the average time difference between two measurements. Returns: int: average time difference """ return np.diff(self.time).mean()
[docs] def iterate( self, first_date: Optional[np.datetime64] = None, last_date: Optional[np.datetime64] = None, absolute_pass_number: int = 1 ) -> Iterator[Tuple[int, int, np.datetime64]]: """Obtain all half-orbits within the defined time interval. Args: first_date (numpy.datetime64): First date of the period to be considered. last_date (numpy.datetime64): Last date of the period to be considered. absolute_pass_number (int, optional): Absolute number of the first pass to be returned. Returns: iterator: An iterator for all passes in the interval pointing to the cycle number, pass number and start date of the half-orbit. """ date = first_date or np.datetime64("now") last_date = last_date or date + self.cycle_duration() while date <= last_date: cycle_number, pass_number = self.decode_absolute_pass_number( absolute_pass_number) yield cycle_number, pass_number, date # Shift the date of the duration of the generated pass date += self.pass_duration(pass_number) # Update of the number of the next pass to be generated absolute_pass_number += 1 return StopIteration
[docs]class EquatorCoordinates(NamedTuple): """Coordinates of the satellite at the equator.""" #: Longitude longitude: float #: Product dataset name time: np.datetime64
[docs]class Pass: """Handle one pass. Args: lat_nadir (np.ndarray): Latitudes at nadir lat (np.ndarray): Latitudes of the swath lon_nadir (np.ndarray): Longitudes at nadir lon (np.ndarray): Longitudes of the swath time (np.ndarray): Time of measurements x_ac (np.ndarray): Across track distance. x_al (np.ndarray): Along track distance. equator_coordinate (EquatorCoordinates): Coordinates of the satellite at the equator. offset (int): number of measurements to be skipped at the beginning and at the end of the pass to ignore the temporal overlap between consecutive passes. requirement_bounds: Limits of swath requirements. Measurements outside the span will be set to NaN. """
[docs] def __init__(self, lat_nadir: np.ndarray, lat: np.ndarray, lon_nadir: np.ndarray, lon: np.ndarray, time: np.ndarray, x_ac: np.ndarray, x_al: np.ndarray, equator_coordinates: EquatorCoordinates, offset: int = 0, requirement_bounds: Optional[Tuple[float, float]] = None): self._date = None self.lat = lat self.lat_nadir = lat_nadir self.lon = lon self.lon_nadir = lon_nadir self.offset = offset self.requirement_bounds = requirement_bounds self.timedelta = time self.x_ac = x_ac self.x_al = x_al self._equator_coordinates = equator_coordinates
@property def time(self): """Time of positions.""" assert self._date is not None return self._date + (self.timedelta - self.timedelta[self.offset]) @property def time_eq(self): """Time at the equator.""" assert self._date is not None return self._date + (self._equator_coordinates.time - self.timedelta[self.offset]) @property def lon_eq(self): """Equator longitude.""" return self._equator_coordinates.longitude
[docs] def set_simulated_date(self, date: np.datetime64) -> None: """Set time of positions.""" assert self._date is None self._date = date
[docs] def mask(self) -> np.ndarray: """Obtain a mask to set NaN values outside the mission requirements.""" if self.requirement_bounds is not None: valid = np.full_like(self.x_ac, np.nan) valid[(np.abs(self.x_ac) >= self.requirement_bounds[0]) & (np.abs(self.x_ac) <= self.requirement_bounds[1])] = 1 along_track = np.full(self.lon_nadir.shape, 1, dtype=np.float64) return along_track[:, np.newaxis] * valid return np.full(self.lon.shape, 1, dtype=np.float64)
[docs]def calculate_orbit(parameters: settings.Parameters, ephemeris: TextIO) -> Orbit: """Computes the orbit nadir on a subdomain. The path of the satellite is given by the orbit file and the subdomain corresponds to the one in the model. Note that a subdomain can be manually added in the parameters file. Args: parameters (settings.Parameters): Simulation parameters. ephemeris (typing.TextIO): Stream to the ephemeris CSV file to be processed. Returns: Orbit: The orbit's loaded into memory. """ properties, (lon, lat, time) = load_ephemeris(ephemeris, cols=parameters.ephemeris_cols) if np.any(lon > 360) or np.any(np.abs(lat) > 90): raise RuntimeError("The definition of ephemeris is incorrect. Is the " "'ephemeris_cols' parameter well defined?") # Put longitude in [-180, 180[ lon = math.normalize_longitude(lon) for key, value in properties.items(): if key == 'cycle_duration': parameters.cycle_duration = value elif key == 'height': parameters.height = value else: raise RuntimeError(f"The {key!r} parameter defined in the " "ephemeris is not handled.") if parameters.cycle_duration is None: raise RuntimeError("The cycle duration is not defined either in the " "ephemeris file or in the simulator parameters.") if parameters.height is None: raise RuntimeError( "The satellite altitude is not defined either in " "the ephemeris file or in the simulator parameters.") wgs = pyinterp.geodetic.Coordinates() # If orbit is at low resolution, interpolate the orbit provided if np.mean(np.diff(time)) > 0.5: time_hr = np.arange(time[0], time[-1], 0.5, dtype=time.dtype) lon, lat = pyinterp.orbit.interpolate(lon, lat, time, time_hr, wgs=wgs, half_window_size=50) time = time_hr # Cut orbit if more than an orbit cycle is provided indexes = np.where(time < parameters.cycle_duration * 86400)[0] lon = lon[indexes] lat = lat[indexes] time = time[indexes] # Get cycle period. cycle_duration = time[-1] + time[1] - time[0] # shift time if the user needs to shift the time of the orbit if parameters.shift_time is not None: indexes = np.where(time >= parameters.shift_time)[0] lon = np.hstack([lon[indexes[0]:], lon[:indexes[0]]]) lat = np.hstack([lat[indexes[0]:], lat[:indexes[0]]]) del indexes if parameters.shift_lon is not None: lon = math.normalize_longitude(lon + parameters.shift_lon) # Rearrange orbit starting from pass 1 lon, lat, time = rearrange_orbit(cycle_duration, lon, lat, time) # Calculates the along track distance distance = math.curvilinear_distance(lon, lat, VOLUMETRIC_MEAN_RADIUS) # Interpolate the final orbit according the given along track resolution x_al = np.arange(distance[0], distance[-2], parameters.delta_al, dtype=distance.dtype) # In order to avoid any problem of calculation of dates on real numbers, # we manipulate dates encoded on integers from this point. time = (np.interp(x_al, distance[:-1], time[:-1]) * 1e6).astype("timedelta64[us]") lon, lat = pyinterp.orbit.interpolate(lon, lat, distance, x_al, wgs=wgs, half_window_size=10) return Orbit(parameters.height, lat, lon, np.sort(calculate_pass_time(lat, time)), time, x_al, distance[-1], parameters.shift_time, parameters.temporal_overlap)
[docs]def equator_properties(lon_nadir: np.ndarray, lat_nadir: np.ndarray, time: np.ndarray) -> EquatorCoordinates: """Calculate the position of the satellite at the equator.""" # Search the nearest point to the equator i1 = (np.abs(lat_nadir)).argmin() i0 = i1 - 1 if lat_nadir[i0] * lat_nadir[i1] > 0: i0, i1 = i1, i1 + 1 lon1 = lon_nadir[i0:i1 + 1] lat1 = lat_nadir[i0:i1 + 1] time = time[i0:i1 + 1] # Calculate the intersection point p11, p12 = np.stack(math.spher2cart(lon1, lat1)).T p21, p22 = np.stack( math.spher2cart(lon1, np.zeros((2, ), dtype=np.float64))).T # Calculate normal of the normal vectors for arc 1 and 2 l = np.cross(np.cross(p11, p12), np.cross(p21, p22)) # Calculate one of the solution of the intersection point (the other is -i1) i1 = l / np.linalg.norm(l) lon_eq, _ = math.cart2spher(i1[:1], i1[1:2], i1[2:]) if not lon1.min() <= lon_eq <= lon1.max(): i1 = -i1 lon_eq, _ = math.cart2spher(i1[:1], i1[1:2], i1[2:]) # Calculate the distance along the track lon1 = np.insert(lon1, 1, lon_eq) lat1 = np.insert(lat1, 1, 0) x_al = math.curvilinear_distance(lon1, lat1, VOLUMETRIC_MEAN_RADIUS) # Pop the along track distance at the equator x_eq = x_al[1] x_al = np.delete(x_al, 1) # Finally interpolate the time of the equator time_eq = np.interp(x_eq, x_al, time.astype("int64")).astype(time.dtype) return EquatorCoordinates(lon_eq[0], time_eq)
[docs]def calculate_pass(pass_number: int, orbit: Orbit, parameters: settings.Parameters) -> Optional[Pass]: """Get the properties of an half-orbit. Args: pass_number (int): Pass number orbit (Orbit): Orbit describing the pass to be calculated. parameters (settings.Parameters): Simulation parameters. Returns: Pass, optional: The properties of the pass or None, if the pas is missing in the selected area. """ index = pass_number - 1 # Selected indexes corresponding to the current pass if index == len(orbit.pass_time) - 1: indexes = np.where(orbit.time >= orbit.pass_time[-1])[0] else: indexes = np.where((orbit.time >= orbit.pass_time[index]) & (orbit.time < orbit.pass_time[index + 1]))[0] if orbit.temporal_overlap: before = np.arange(indexes[0] - orbit.temporal_overlap, indexes[0], dtype=np.int64) after = np.arange(indexes[-1] + 1, indexes[-1] + orbit.temporal_overlap + 1, dtype=np.int64) after %= len(orbit.time) indexes = np.hstack([before, indexes, after]) if len(indexes) < 5: return None lon_nadir = orbit.lon[indexes] lat_nadir = orbit.lat[indexes] time = orbit.time[indexes] x_al = orbit.x_al[indexes] if orbit.temporal_overlap: # The orbit may not be ordered because of the overlap between the # passes. if indexes[0] < 0: time[:orbit.temporal_overlap] -= orbit.time[-1] + (orbit.time[1] - orbit.time[0]) elif indexes[-1] < indexes[0]: time[-orbit.temporal_overlap:] += orbit.time[-1] + ( orbit.time[-1] - orbit.time[-2]) equator_coordinates = equator_properties(lon_nadir, lat_nadir, time) # Selects the orbit in the defined box mask = parameters.box.within(lon_nadir, lat_nadir) if np.all(~mask): return None lon_nadir = lon_nadir[mask] lat_nadir = lat_nadir[mask] time = time[mask] x_al = x_al[mask] # Compute across track distances from nadir # Number of points in half of the swath half_swath = int((parameters.half_swath - parameters.half_gap) / parameters.delta_ac) + 1 x_ac = np.arange(half_swath) * parameters.delta_al + parameters.half_gap x_ac = np.hstack((-np.flip(x_ac), x_ac)) location = np.ascontiguousarray( np.vstack(math.spher2cart(lon_nadir, lat_nadir)).T) satellite_direction = math.satellite_direction(location) lon, lat = math.calculate_swath(parameters.delta_ac, parameters.half_gap, half_swath, VOLUMETRIC_MEAN_RADIUS, location, satellite_direction) return Pass(lat_nadir, lat, lon_nadir, lon, time, x_ac, x_al, equator_coordinates, offset=orbit.temporal_overlap, requirement_bounds=parameters.requirement_bounds)