# Copyright (c) 2021 CNES/JPL
#
# All rights reserved. Use of this source code is governed by a
# BSD-style license that can be found in the LICENSE file.
"""
Orbit Propagator
----------------
"""
from typing import Dict, Iterator, NamedTuple, Optional, TextIO, Tuple
import logging
import warnings
import numpy as np
import pyinterp
from . import VOLUMETRIC_MEAN_RADIUS, math, settings
LOGGER = logging.getLogger(__name__)
[docs]def load_ephemeris(
stream: TextIO,
cols: Optional[Tuple[int, int,
int]] = None) -> Tuple[Dict[str, float], Tuple]:
"""Loads a tabular file describing a satellite orbit.
Args:
stream (typing.TextIO): Stream to the text file to be processed.
cols (tuple): A tuple of 3 elements describing the columns
representing the longitude, latitude and number of seconds elapsed
since the beginning of the orbit.
Returns:
tuple: A tuple of 2 elements containing the properties of the orbit
and the positions of the satellite.
"""
if cols is None:
cols = (1, 2, 0)
LOGGER.info("loading ephemeris: %s", settings.pretty_print(stream.name))
comments = []
data = []
for item in stream:
item = item.strip()
if item.startswith("#"):
comments.append(item[1:])
else:
data.append(list(float(value) for value in item.split()))
data = np.asarray(data, dtype=np.float64)
def to_dict(comments) -> Dict[str, float]:
"""Returns a dictionary describing the parameters of the orbit."""
result = dict()
for item in comments:
key, value = item.split("=")
result[key.strip()] = float(value)
return result
return to_dict(comments), tuple(data[:, item] for item in cols)
[docs]def rearrange_orbit(
cycle_duration: float, lon: np.ndarray, lat: np.ndarray,
time: np.ndarray) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""Rearrange orbit starting from pass 1.
Detect the beginning of pass 1 in the ephemeris. By definition, it is
the first passage at southernmost latitude.
Args:
cycle_duration (float): Cycle time in seconds.
lon (numpy.ndarray): Longitudes (in degrees)
lat (numpy.ndarray): Latitudes (in degrees)
time (np.ndarray): Date of the positions (in seconds).
Returns:
tuple: lon, lat, and time rearranged.
"""
dy = np.roll(lat, 1) - lat
indexes = np.where((dy < 0) & (np.roll(dy, 1) >= 0))[0] # type: ignore
# Shift coordinates, so that the first point of the orbit is the beginning
# of pass 1
shift = indexes[-1]
lon = np.hstack([lon[shift:], lon[:shift]])
lat = np.hstack([lat[shift:], lat[:shift]])
time = np.hstack([time[shift:], time[:shift]])
time = (time - time[0]) % cycle_duration
if np.any(time < 0):
warnings.warn('there are negative times in your orbit', RuntimeWarning)
return lon, lat, time
[docs]def calculate_pass_time(lat: np.ndarray, time: np.ndarray) -> np.ndarray:
"""Compute the initial time of each pass.
Args:
lat (numpy.ndarray): Latitudes (in degrees)
time (np.ndarray): Date of the latitudes (in seconds).
Returns:
numpy.ndarray: Start date of half-orbits.
"""
dy = np.roll(lat, 1) - lat
indexes = np.where(((dy < 0) & (np.roll(dy, 1) >= 0)) # type: ignore
| ((dy > 0) # type: ignore
& (np.roll(dy, 1) <= 0)))[0] # type: ignore
# The duration of the first pass is zero.
indexes[0] = 0
return time[indexes]
[docs]def select_box(
box: math.Box, lon: np.ndarray, lat: np.ndarray, time: np.ndarray
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
"""Selects the orbit in the defined box.
Args:
box (math.Box): Geographical selection
lon (numpy.ndarray): Longitudes (in degrees)
lat (numpy.ndarray): Latitudes (in degrees)
time (np.ndarray): Date of the positions (in seconds).
Returns:
tuple: the selected positions and associated dates.
"""
mask = box.within(lon, lat)
x_al = math.curvilinear_distance(lon, lat, VOLUMETRIC_MEAN_RADIUS)
return lon[mask], lat[mask], time[mask], x_al[mask]
[docs]class Orbit:
"""Properties of one orbit.
Args:
height (float): Satellite height (in m)
lat (numpy.ndarray): Latitudes (in degrees)
lon (numpy.ndarray): Longitudes (in degrees)
pass_time (np.ndarray): Start date of half-orbits.
time (np.ndarray): Date of the positions (in seconds).
x_al (np.ndarray): Along track distance
curvilinear_distance (float): Distance covered by the satellite during
a complete cycle.
shift_time (float): Time shift to be applied.
temporal_overlap (float, optional): Simulate products overlapping by
this amount of time, in seconds, with the previous and next
products.
"""
[docs] def __init__(self, height: float, lat: np.ndarray, lon: np.ndarray,
pass_time: np.ndarray, time: np.ndarray, x_al: np.ndarray,
curvilinear_distance: float,
shift_time: Optional[np.timedelta64],
temporal_overlap: Optional[float]):
self.height = height
self.lat = lat
self.lon = lon
self.pass_time = pass_time
self.shift_time = shift_time
self.time = time
self.x_al = x_al
self.curvilinear_distance = curvilinear_distance
self.temporal_overlap = Orbit._temporal_overlap(time, temporal_overlap)
[docs] @staticmethod
def _temporal_overlap(time: np.ndarray,
temporal_overlap: Optional[float]) -> int:
"""Converts the temporal overlap to an integer number of samples.
Args:
time (numpy.ndarray): Date of the positions (in seconds).
temporal_overlap (float, optional): Simulate products overlapping
by this amount of time, in seconds, with the previous and next
products.
Returns:
int: temporal overlap in number of samples
"""
if temporal_overlap is not None:
dt = np.mean(
np.diff(time)).astype("timedelta64[ms]").view("int64") * 1e-3
return round(temporal_overlap / dt)
return 0
[docs] def cycle_duration(self) -> np.timedelta64:
"""Get the cycle duration."""
return self.time[-1]
[docs] def orbit_duration(self) -> np.timedelta64:
"""Get the orbit duration."""
duration = self.cycle_duration().astype(
"timedelta64[us]") / np.timedelta64(
int(self.passes_per_cycle() // 2), 'us')
return np.timedelta64(int(duration), "us")
[docs] def passes_per_cycle(self) -> int:
"""Get the number of passes per cycle."""
return len(self.pass_time)
[docs] def pass_shift(self, number: int) -> np.timedelta64:
"""Get the time offset between the first measurement and the last
measurement of the track.
Args:
number (int): track number (must be in [1, passes_per_cycle()])
Returns:
numpy.datetime64: time offset
"""
passes_per_cycle = self.passes_per_cycle()
if number < 1 or number > passes_per_cycle:
raise ValueError(f"number must be in [1, {passes_per_cycle}]")
if number == passes_per_cycle:
indexes = np.where(self.time >= self.pass_time[-1])[0]
else:
indexes = np.where((self.time >= self.pass_time[number - 1])
& (self.time < self.pass_time[number]))[0]
return np.sum(np.diff(self.time[indexes]))
[docs] def pass_duration(self, number: int) -> np.timedelta64:
"""Get the duration of a given pass.
Args:
number (int): track number (must be in [1, passes_per_cycle()])
Returns:
numpy.datetime64: track duration
"""
passes_per_cycle = self.passes_per_cycle()
if number < 1 or number > passes_per_cycle:
raise ValueError(f"number must be in [1, {passes_per_cycle}]")
if number == passes_per_cycle:
return self.time[-1] - self.pass_time[-1] + self.time[
1] - self.time[0]
return self.pass_time[number] - self.pass_time[number - 1]
[docs] def decode_absolute_pass_number(self, number: int) -> Tuple[int, int]:
"""Calculate the cycle and pass number from a given absolute pass
number.
Args:
number (int): absolute pass number
Returns:
tuple: cycle and pass number
"""
number -= 1
return (int(number / self.passes_per_cycle()) + 1,
(number % self.passes_per_cycle()) + 1)
[docs] def encode_absolute_pass_number(self, cycle_number: int,
pass_number: int) -> int:
"""Calculate the absolute pass number for a given half-orbit.
Args:
cycle_number (int): Cycle number
pass_number (int): Pass number
Returns:
int: Absolute pass number
"""
passes_per_cycle = self.passes_per_cycle()
if not 1 <= pass_number <= passes_per_cycle:
raise ValueError(f"pass_number must be in [1, {passes_per_cycle}")
return (cycle_number - 1) * self.passes_per_cycle() + pass_number
[docs] def delta_t(self) -> np.timedelta64:
"""Returns the average time difference between two measurements.
Returns:
int: average time difference
"""
return np.diff(self.time).mean()
[docs] def iterate(
self,
first_date: Optional[np.datetime64] = None,
last_date: Optional[np.datetime64] = None,
absolute_pass_number: int = 1
) -> Iterator[Tuple[int, int, np.datetime64]]:
"""Obtain all half-orbits within the defined time interval.
Args:
first_date (numpy.datetime64): First date of the period to be
considered.
last_date (numpy.datetime64): Last date of the period to be
considered.
absolute_pass_number (int, optional): Absolute number of the first
pass to be returned.
Returns:
iterator: An iterator for all passes in the interval pointing to
the cycle number, pass number and start date of the half-orbit.
"""
date = first_date or np.datetime64("now")
last_date = last_date or date + self.cycle_duration()
while date <= last_date:
cycle_number, pass_number = self.decode_absolute_pass_number(
absolute_pass_number)
yield cycle_number, pass_number, date
# Shift the date of the duration of the generated pass
date += self.pass_duration(pass_number)
# Update of the number of the next pass to be generated
absolute_pass_number += 1
return StopIteration
[docs]class EquatorCoordinates(NamedTuple):
"""Coordinates of the satellite at the equator."""
#: Longitude
longitude: float
#: Product dataset name
time: np.datetime64
[docs]class Pass:
"""Handle one pass.
Args:
lat_nadir (np.ndarray): Latitudes at nadir
lat (np.ndarray): Latitudes of the swath
lon_nadir (np.ndarray): Longitudes at nadir
lon (np.ndarray): Longitudes of the swath
time (np.ndarray): Time of measurements
x_ac (np.ndarray): Across track distance.
x_al (np.ndarray): Along track distance.
equator_coordinate (EquatorCoordinates): Coordinates of the satellite at
the equator.
offset (int): number of measurements to be skipped at the beginning and
at the end of the pass to ignore the temporal overlap between
consecutive passes.
requirement_bounds: Limits of swath requirements. Measurements outside
the span will be set to NaN.
"""
[docs] def __init__(self,
lat_nadir: np.ndarray,
lat: np.ndarray,
lon_nadir: np.ndarray,
lon: np.ndarray,
time: np.ndarray,
x_ac: np.ndarray,
x_al: np.ndarray,
equator_coordinates: EquatorCoordinates,
offset: int = 0,
requirement_bounds: Optional[Tuple[float, float]] = None):
self._date = None
self.lat = lat
self.lat_nadir = lat_nadir
self.lon = lon
self.lon_nadir = lon_nadir
self.offset = offset
self.requirement_bounds = requirement_bounds
self.timedelta = time
self.x_ac = x_ac
self.x_al = x_al
self._equator_coordinates = equator_coordinates
@property
def time(self):
"""Time of positions."""
assert self._date is not None
return self._date + (self.timedelta - self.timedelta[self.offset])
@property
def time_eq(self):
"""Time at the equator."""
assert self._date is not None
return self._date + (self._equator_coordinates.time -
self.timedelta[self.offset])
@property
def lon_eq(self):
"""Equator longitude."""
return self._equator_coordinates.longitude
[docs] def set_simulated_date(self, date: np.datetime64) -> None:
"""Set time of positions."""
assert self._date is None
self._date = date
[docs] def mask(self) -> np.ndarray:
"""Obtain a mask to set NaN values outside the mission requirements."""
if self.requirement_bounds is not None:
valid = np.full_like(self.x_ac, np.nan)
valid[(np.abs(self.x_ac) >= self.requirement_bounds[0])
& (np.abs(self.x_ac) <= self.requirement_bounds[1])] = 1
along_track = np.full(self.lon_nadir.shape, 1, dtype=np.float64)
return along_track[:, np.newaxis] * valid
return np.full(self.lon.shape, 1, dtype=np.float64)
[docs]def calculate_orbit(parameters: settings.Parameters,
ephemeris: TextIO) -> Orbit:
"""Computes the orbit nadir on a subdomain.
The path of the satellite is given by the orbit file and the subdomain
corresponds to the one in the model. Note that a subdomain can be manually
added in the parameters file.
Args:
parameters (settings.Parameters): Simulation parameters.
ephemeris (typing.TextIO): Stream to the ephemeris CSV file to be
processed.
Returns:
Orbit: The orbit's loaded into memory.
"""
properties, (lon, lat,
time) = load_ephemeris(ephemeris,
cols=parameters.ephemeris_cols)
if np.any(lon > 360) or np.any(np.abs(lat) > 90):
raise RuntimeError("The definition of ephemeris is incorrect. Is the "
"'ephemeris_cols' parameter well defined?")
# Put longitude in [-180, 180[
lon = math.normalize_longitude(lon)
for key, value in properties.items():
if key == 'cycle_duration':
parameters.cycle_duration = value
elif key == 'height':
parameters.height = value
else:
raise RuntimeError(f"The {key!r} parameter defined in the "
"ephemeris is not handled.")
if parameters.cycle_duration is None:
raise RuntimeError("The cycle duration is not defined either in the "
"ephemeris file or in the simulator parameters.")
if parameters.height is None:
raise RuntimeError(
"The satellite altitude is not defined either in "
"the ephemeris file or in the simulator parameters.")
wgs = pyinterp.geodetic.Coordinates()
# If orbit is at low resolution, interpolate the orbit provided
if np.mean(np.diff(time)) > 0.5:
time_hr = np.arange(time[0], time[-1], 0.5, dtype=time.dtype)
lon, lat = pyinterp.orbit.interpolate(lon,
lat,
time,
time_hr,
wgs=wgs,
half_window_size=50)
time = time_hr
# Cut orbit if more than an orbit cycle is provided
indexes = np.where(time < parameters.cycle_duration * 86400)[0]
lon = lon[indexes]
lat = lat[indexes]
time = time[indexes]
# Get cycle period.
cycle_duration = time[-1] + time[1] - time[0]
# shift time if the user needs to shift the time of the orbit
if parameters.shift_time is not None:
indexes = np.where(time >= parameters.shift_time)[0]
lon = np.hstack([lon[indexes[0]:], lon[:indexes[0]]])
lat = np.hstack([lat[indexes[0]:], lat[:indexes[0]]])
del indexes
if parameters.shift_lon is not None:
lon = math.normalize_longitude(lon + parameters.shift_lon)
# Rearrange orbit starting from pass 1
lon, lat, time = rearrange_orbit(cycle_duration, lon, lat, time)
# Calculates the along track distance
distance = math.curvilinear_distance(lon, lat, VOLUMETRIC_MEAN_RADIUS)
# Interpolate the final orbit according the given along track resolution
x_al = np.arange(distance[0],
distance[-2],
parameters.delta_al,
dtype=distance.dtype)
# In order to avoid any problem of calculation of dates on real numbers,
# we manipulate dates encoded on integers from this point.
time = (np.interp(x_al, distance[:-1], time[:-1]) *
1e6).astype("timedelta64[us]")
lon, lat = pyinterp.orbit.interpolate(lon,
lat,
distance,
x_al,
wgs=wgs,
half_window_size=10)
return Orbit(parameters.height, lat, lon,
np.sort(calculate_pass_time(lat,
time)), time, x_al, distance[-1],
parameters.shift_time, parameters.temporal_overlap)
[docs]def equator_properties(lon_nadir: np.ndarray, lat_nadir: np.ndarray,
time: np.ndarray) -> EquatorCoordinates:
"""Calculate the position of the satellite at the equator."""
# Search the nearest point to the equator
i1 = (np.abs(lat_nadir)).argmin()
i0 = i1 - 1
if lat_nadir[i0] * lat_nadir[i1] > 0:
i0, i1 = i1, i1 + 1
lon1 = lon_nadir[i0:i1 + 1]
lat1 = lat_nadir[i0:i1 + 1]
time = time[i0:i1 + 1]
# Calculate the intersection point
p11, p12 = np.stack(math.spher2cart(lon1, lat1)).T
p21, p22 = np.stack(
math.spher2cart(lon1, np.zeros((2, ), dtype=np.float64))).T
# Calculate normal of the normal vectors for arc 1 and 2
l = np.cross(np.cross(p11, p12), np.cross(p21, p22))
# Calculate one of the solution of the intersection point (the other is -i1)
i1 = l / np.linalg.norm(l)
lon_eq, _ = math.cart2spher(i1[:1], i1[1:2], i1[2:])
if not lon1.min() <= lon_eq <= lon1.max():
i1 = -i1
lon_eq, _ = math.cart2spher(i1[:1], i1[1:2], i1[2:])
# Calculate the distance along the track
lon1 = np.insert(lon1, 1, lon_eq)
lat1 = np.insert(lat1, 1, 0)
x_al = math.curvilinear_distance(lon1, lat1, VOLUMETRIC_MEAN_RADIUS)
# Pop the along track distance at the equator
x_eq = x_al[1]
x_al = np.delete(x_al, 1)
# Finally interpolate the time of the equator
time_eq = np.interp(x_eq, x_al, time.astype("int64")).astype(time.dtype)
return EquatorCoordinates(lon_eq[0], time_eq)
[docs]def calculate_pass(pass_number: int, orbit: Orbit,
parameters: settings.Parameters) -> Optional[Pass]:
"""Get the properties of an half-orbit.
Args:
pass_number (int): Pass number
orbit (Orbit): Orbit describing the pass to be calculated.
parameters (settings.Parameters): Simulation parameters.
Returns:
Pass, optional: The properties of the pass or None, if the pas is
missing in the selected area.
"""
index = pass_number - 1
# Selected indexes corresponding to the current pass
if index == len(orbit.pass_time) - 1:
indexes = np.where(orbit.time >= orbit.pass_time[-1])[0]
else:
indexes = np.where((orbit.time >= orbit.pass_time[index])
& (orbit.time < orbit.pass_time[index + 1]))[0]
if orbit.temporal_overlap:
before = np.arange(indexes[0] - orbit.temporal_overlap,
indexes[0],
dtype=np.int64)
after = np.arange(indexes[-1] + 1,
indexes[-1] + orbit.temporal_overlap + 1,
dtype=np.int64)
after %= len(orbit.time)
indexes = np.hstack([before, indexes, after])
if len(indexes) < 5:
return None
lon_nadir = orbit.lon[indexes]
lat_nadir = orbit.lat[indexes]
time = orbit.time[indexes]
x_al = orbit.x_al[indexes]
if orbit.temporal_overlap:
# The orbit may not be ordered because of the overlap between the
# passes.
if indexes[0] < 0:
time[:orbit.temporal_overlap] -= orbit.time[-1] + (orbit.time[1] -
orbit.time[0])
elif indexes[-1] < indexes[0]:
time[-orbit.temporal_overlap:] += orbit.time[-1] + (
orbit.time[-1] - orbit.time[-2])
equator_coordinates = equator_properties(lon_nadir, lat_nadir, time)
# Selects the orbit in the defined box
mask = parameters.box.within(lon_nadir, lat_nadir)
if np.all(~mask):
return None
lon_nadir = lon_nadir[mask]
lat_nadir = lat_nadir[mask]
time = time[mask]
x_al = x_al[mask]
# Compute across track distances from nadir
# Number of points in half of the swath
half_swath = int((parameters.half_swath - parameters.half_gap) /
parameters.delta_ac) + 1
x_ac = np.arange(half_swath) * parameters.delta_al + parameters.half_gap
x_ac = np.hstack((-np.flip(x_ac), x_ac))
location = np.ascontiguousarray(
np.vstack(math.spher2cart(lon_nadir, lat_nadir)).T)
satellite_direction = math.satellite_direction(location)
lon, lat = math.calculate_swath(parameters.delta_ac, parameters.half_gap,
half_swath, VOLUMETRIC_MEAN_RADIUS,
location, satellite_direction)
return Pass(lat_nadir,
lat,
lon_nadir,
lon,
time,
x_ac,
x_al,
equator_coordinates,
offset=orbit.temporal_overlap,
requirement_bounds=parameters.requirement_bounds)