Source code for timescale.time

#!/usr/bin/env python
"""
time.py
Written by Tyler Sutterley (05/2026)
Utilities for calculating time operations

PYTHON DEPENDENCIES:
    numpy: Scientific Computing Tools For Python
        https://numpy.org
    dateutil: powerful extensions to datetime
        https://dateutil.readthedocs.io/en/stable/
    lxml: processing XML and HTML in Python
        https://pypi.python.org/pypi/lxml

PROGRAM DEPENDENCIES:
    utilities.py: download and management utilities for syncing files

UPDATE HISTORY:
    Updated 05/2026: added functions to update delta time files from project
        updated CDDIS ftp login options for encrypted connections
    Updated 04/2026: added endpoint option (defaults to True) to date_range
        added default day and month values to from_calendar function
    Updated 12/2025: allow conversion to datetime be in different units
    Updated 08/2025: add nominal years (365.25 days long) to Timescale class
    Updated 07/2025: verify Bulletin-A entries are not already in merged file
        add Besselian year conversion to Timescale class
    Updated 03/2025: added attributes for ut1_utc and gps_utc
    Updated 02/2025: added GLONASS as delta time option
        update GPS seconds calculation for output from timescale object
    Updated 10/2024: split is_leap from calendar_days function
    Updated 09/2024: make Timescale and Calendar objects subscriptable
    Updated 06/2024: assert that year, month, day, etc are float64
        added conversions between common epochs and MJD
    Updated 05/2024: add Calendar class to mimick datetime functions
    Updated 04/2024: added quarter year approximate conversions
        added _from_sec dictionary for named time units
        replaced deprecated datetime.datetime.utcnow
        updated urls and ftp links for updating the leap seconds list
    Updated 02/2024: move the immutable parameters in timescale class
    Updated 10/2023: add function to convert from calendar dates
        add min, max and mean functions to Timescale class
    Forked 08/2023: forked from pyTMD time utility functions
    Updated 06/2023: improve conversion of timescale to datetime arrays
    Updated 05/2023: add timescale class for converting between time scales
        added timescale to_datetime function to create datetime arrays
        allow epoch arguments to be numpy datetime64 variables or strings
        function to convert a string with time zone information to datetime
    Updated 04/2023: using pathlib to define and expand paths
    Updated 03/2023: add basic variable typing to function inputs
    Updated 12/2022: added interpolation for delta time (TT - UT1)
        output variables for some standard epochs used within tide programs
    Updated 11/2022: use IERS https server as default for Bulletin-A files
        added download function for latest Bulletin-A file from IERS
        added function to append from existing merged delta time file
        use f-strings for formatting verbose or ascii output
    Updated 10/2022: added encoding for reading leap seconds ascii files
    Updated 08/2022: output variables to unit conversion to seconds
        and the number of days per month for both leap and standard years
    Updated 05/2022: changed keyword arguments to camel case
    Updated 04/2022: updated docstrings to numpy documentation format
    Updated 04/2021: updated NIST ftp server url for leap-seconds.list
    Updated 03/2021: replaced numpy bool/int to prevent deprecation warnings
    Updated 02/2021: NASA CDDIS anonymous ftp access discontinued
    Updated 01/2021: added ftp connection checks
        add date parser for cases when only a calendar date with no units
    Updated 12/2020: merged with convert_julian and convert_calendar_decimal
        added calendar_days routine to get number of days per month
    Updated 09/2020: added wrapper function for merging Bulletin-A files
        can parse date strings in form "time-units since yyyy-mm-dd hh:mm:ss"
    Updated 08/2020: added NASA Earthdata routines for downloading from CDDIS
    Written 07/2020
"""

from __future__ import annotations

import re
import copy
import logging
import pathlib
import warnings
import datetime
import traceback
import numpy as np
import dateutil.parser
import scipy.interpolate
import timescale.utilities

# conversion factors between time units and seconds
_to_sec = {
    "nanoseconds": 1e-9,
    "nanosecond": 1e-9,
    "nanosec": 1e-9,
    "nanosecs": 1e-9,
    "nsec": 1e-9,
    "nsecs": 1e-9,
    "ns": 1e-9,
    "microseconds": 1e-6,
    "microsecond": 1e-6,
    "microsec": 1e-6,
    "microsecs": 1e-6,
    "us": 1e-6,
    "milliseconds": 1e-3,
    "millisecond": 1e-3,
    "millisec": 1e-3,
    "millisecs": 1e-3,
    "msec": 1e-3,
    "msecs": 1e-3,
    "ms": 1e-3,
    "seconds": 1.0,
    "second": 1.0,
    "sec": 1.0,
    "secs": 1.0,
    "s": 1.0,
    "minutes": 60.0,
    "minute": 60.0,
    "min": 60.0,
    "mins": 60.0,
    "hours": 3600.0,
    "hour": 3600.0,
    "hr": 3600.0,
    "hrs": 3600.0,
    "h": 3600.0,
    "day": 86400.0,
    "days": 86400.0,
    "d": 86400.0,
    "D": 86400.0,
}
# approximate conversions for longer periods
_to_sec["mon"] = 30.0 * 86400.0
_to_sec["month"] = 30.0 * 86400.0
_to_sec["months"] = 30.0 * 86400.0
_to_sec["common_year"] = 365.0 * 86400.0
_to_sec["common_years"] = 365.0 * 86400.0
_to_sec["year"] = 365.25 * 86400.0
_to_sec["years"] = 365.25 * 86400.0
_to_sec["quarter"] = 365.25 * 86400.0 / 4.0
_to_sec["quarters"] = 365.25 * 86400.0 / 4.0
# conversion factors from seconds to named time units
_from_sec = {k: 1.0 / v for k, v in _to_sec.items()}

# standard (common) epochs
_mjd_epoch = (1858, 11, 17, 0, 0, 0)
_serial_epoch = (0, 1, 1, 0, 0, 0)
_ntp_epoch = (1900, 1, 1, 0, 0, 0)
_cnes_epoch = (1950, 1, 1, 0, 0, 0)
_unix_epoch = (1970, 1, 1, 0, 0, 0)
_gps_epoch = (1980, 1, 6, 0, 0, 0)
_tide_epoch = (1992, 1, 1, 0, 0, 0)
_j2000_epoch = (2000, 1, 1, 12, 0, 0)
_atlas_sdp_epoch = (2018, 1, 1, 0, 0, 0)
# number of days between the Julian day epoch and standard epochs
_jd_mjd = 2400000.5
_jd_serial = 1721058.5
_jd_gps = 2444244.5
# number of days between MJD and the standard (common) epochs
_mjd_ntp = 15020
_mjd_cnes = 33282
_mjd_unix = 40587
_mjd_gps = _jd_gps - _jd_mjd
_mjd_tide = 48622
_mjd_j2000 = 51544.5
_mjd_atlas_sdp = 58119
_mjd_serial = _jd_serial - _jd_mjd


# PURPOSE: parse a date string and convert to a datetime object in UTC
[docs] def parse(date_string: str): """ Parse a date string and convert to a naive ``datetime`` object in UTC Parameters ---------- date_string: str formatted time string Returns ------- date: obj output ``datetime`` object """ # parse the date string date = dateutil.parser.parse(date_string) # convert to UTC if containing time-zone information # then drop the timezone information to prevent unsupported errors if date.tzinfo: date = date.astimezone(dateutil.tz.UTC).replace(tzinfo=None) # return the datetime object return date
# PURPOSE: parse a date string into epoch and units scale
[docs] def parse_date_string(date_string: str): """ Parse a date string of the form - time-units since ``yyyy-mm-dd hh:mm:ss`` - ``yyyy-mm-dd hh:mm:ss`` for exact calendar dates Parameters ---------- date_string: str time-units since yyyy-mm-dd hh:mm:ss Returns ------- epoch: list epoch of ``delta_time`` conversion_factor: float multiplication factor to convert to seconds """ # try parsing the original date string as a date try: epoch = parse(date_string) except ValueError: pass else: # return the epoch (as list) return (datetime_to_list(epoch), 0.0) # split the date string into units and epoch units, epoch = split_date_string(date_string) if units not in _to_sec.keys(): raise ValueError(f"Invalid units: {units}") # return the epoch (as list) and the time unit conversion factors return (datetime_to_list(epoch), _to_sec[units])
# PURPOSE: split a date string into units and epoch
[docs] def split_date_string(date_string: str): """ Split a date string into units and epoch Parameters ---------- date_string: str time-units since yyyy-mm-dd hh:mm:ss """ try: units, _, epoch = date_string.split(None, 2) except ValueError: raise ValueError(f"Invalid format: {date_string}") else: return (units.lower(), parse(epoch))
# PURPOSE: convert a datetime object into a list
[docs] def datetime_to_list(date): """ Convert a ``datetime`` object into a list Parameters ---------- date: obj Input ``datetime`` object to convert Returns ------- date: list [year,month,day,hour,minute,second] """ return [ date.year, date.month, date.day, date.hour, date.minute, date.second, ]
# PURPOSE: create a range of dates
[docs] def date_range( start: str | np.datetime64 | datetime.datetime, end: str | np.datetime64 | datetime.datetime, step: int | float = 1, units: str = "D", endpoint: bool = True, ): """ Create a range of dates Parameters ---------- start: str, np.datetime64 or datetime.datetime start date end: str, np.datetime64 or datetime.datetime end date step: int or float, default 1 step size units: str, default 'D' datetime units - ``'Y'``: year - ``'M'``: month - ``'W'``: week - ``'D'``: day - ``'h'``: hour - ``'m'``: minute - ``'s'``: second - ``'ms'``: millisecond endpoint: bool, default True If True, include the end date in the output """ # convert start and end dates to datetime64 if isinstance(start, str): start = np.array(parse(start), dtype=f"datetime64[{units}]") if isinstance(end, str): end = np.array(parse(end), dtype=f"datetime64[{units}]") # create date range stop = end + step if endpoint else end return np.arange(start, stop, step)
# days per month in a leap and a standard year # only difference is February (29 vs. 28) _dpm_leap = [31, 29, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31] _dpm_stnd = [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31] def is_leap(year: int | float) -> bool: """ Determines if a year is a leap year Parameters ---------- year: int or float calendar year """ # Rules in the Gregorian calendar for a year to be a leap year: # divisible by 4, but not by 100 unless divisible by 400 # True length of the year is about 365.2422 days # Adding a leap day every four years ==> average 365.25 # Subtracting a leap year every 100 years ==> average 365.24 # Adding a leap year back every 400 years ==> average 365.2425 # Subtracting a leap year every 4000 years ==> average 365.24225 m4 = year % 4 m100 = year % 100 m400 = year % 400 m4000 = year % 4000 # determine if the year is a leap year using criteria return (m4 == 0) & (m100 != 0) | (m400 == 0) & (m4000 != 0) # PURPOSE: gets the number of days per month for a given year
[docs] def calendar_days(year: int | float) -> np.ndarray: """ Calculates the number of days per month for a given year Parameters ---------- year: int or float calendar year Returns ------- dpm: np.ndarray number of days for each month """ # determine if the year is a leap year # and return the number of days per month if is_leap(year): return np.array(_dpm_leap, dtype=np.float64) else: return np.array(_dpm_stnd, dtype=np.float64)
# PURPOSE: convert a numpy datetime array to delta times since an epoch
[docs] def convert_datetime( date: float | np.ndarray, epoch: str | tuple | list | np.datetime64 = _unix_epoch, ): """ Convert a ``numpy`` ``datetime`` array to seconds since ``epoch`` Parameters ---------- date: np.ndarray ``numpy`` ``datetime`` array epoch: str, tuple, list, np.ndarray, default (1970,1,1,0,0,0) epoch for output ``delta_time`` Returns ------- delta_time: float seconds since epoch """ # convert epoch to datetime variables if isinstance(epoch, (tuple, list)): epoch = np.datetime64(datetime.datetime(*epoch)) elif isinstance(epoch, str): epoch = np.datetime64(parse(epoch)) # convert to delta time return (date - epoch) / np.timedelta64(1, "s")
# PURPOSE: convert times from seconds since epoch1 to time since epoch2
[docs] def convert_delta_time( delta_time: np.ndarray, epoch1: str | tuple | list | np.datetime64 | None = None, epoch2: str | tuple | list | np.datetime64 | None = None, scale: float = 1.0, ): """ Convert delta time from seconds since ``epoch1`` to time since ``epoch2`` Parameters ---------- delta_time: np.ndarray seconds since epoch1 epoch1: str, tuple, list or NoneType, default None epoch for input ``delta_time`` epoch2: str, tuple, list or NoneType, default None epoch for output ``delta_time`` scale: float, default 1.0 scaling factor for converting time to output units """ # convert epochs to datetime variables if isinstance(epoch1, (tuple, list)): epoch1 = np.datetime64(datetime.datetime(*epoch1)) elif isinstance(epoch1, str): epoch1 = np.datetime64(parse(epoch1)) if isinstance(epoch2, (tuple, list)): epoch2 = np.datetime64(datetime.datetime(*epoch2)) elif isinstance(epoch2, str): epoch2 = np.datetime64(parse(epoch2)) # calculate the total difference in time in seconds delta_time_epochs = (epoch2 - epoch1) / np.timedelta64(1, "s") # subtract difference in time and rescale to output units return scale * (delta_time - delta_time_epochs)
# PURPOSE: calculate the delta time from calendar date # http://scienceworld.wolfram.com/astronomy/JulianDate.html
[docs] def convert_calendar_dates( year: np.ndarray, month: np.ndarray, day: np.ndarray, hour: np.ndarray | float = 0.0, minute: np.ndarray | float = 0.0, second: np.ndarray | float = 0.0, epoch: tuple | list | np.datetime64 = _tide_epoch, scale: float = 1.0, ) -> np.ndarray: """ Calculate the time in units since ``epoch`` from calendar dates Parameters ---------- year: np.ndarray calendar year month: np.ndarray month of the year day: np.ndarray day of the month hour: np.ndarray or float, default 0.0 hour of the day minute: np.ndarray or float, default 0.0 minute of the hour second: np.ndarray or float, default 0.0 second of the minute epoch: tuple or list, default timescale.time._tide_epoch epoch for output ``delta_time`` scale: float, default 1.0 scaling factor for converting time to output units Returns ------- delta_time: np.ndarray time since epoch """ # verify input data types year = np.array(year, dtype=np.float64) month = np.array(month, dtype=np.float64) day = np.array(day, dtype=np.float64) hour = np.array(hour, dtype=np.float64) minute = np.array(minute, dtype=np.float64) second = np.array(second, dtype=np.float64) # calculate date in Modified Julian Days (MJD) from calendar date # MJD: days since November 17, 1858 (1858-11-17T00:00:00) MJD = ( 367.0 * year - np.floor(7.0 * (year + np.floor((month + 9.0) / 12.0)) / 4.0) - np.floor( 3.0 * (np.floor((year + (month - 9.0) / 7.0) / 100.0) + 1.0) / 4.0 ) + np.floor(275.0 * month / 9.0) + day + hour / 24.0 + minute / 1440.0 + second / 86400.0 + 1721028.5 - _jd_mjd ) # convert epochs to datetime variables epoch1 = np.datetime64(datetime.datetime(*_mjd_epoch)) if isinstance(epoch, (tuple, list)): epoch = np.datetime64(datetime.datetime(*epoch)) elif isinstance(epoch, str): epoch = np.datetime64(parse(epoch)) # calculate the total difference in time in days delta_time_epochs = (epoch - epoch1) / np.timedelta64(1, "D") # return the date in units (default days) since epoch return scale * np.array(MJD - delta_time_epochs, dtype=np.float64)
# PURPOSE: Converts from calendar dates into decimal years
[docs] def convert_calendar_decimal( year: np.ndarray, month: np.ndarray, day: np.ndarray, hour: np.ndarray | float | None = None, minute: np.ndarray | float | None = None, second: np.ndarray | float | None = None, DofY: np.ndarray | float | None = None, ) -> np.ndarray: """ Converts from calendar date into decimal years taking into account leap years :cite:p:`Dershowitz:2007cc` Parameters ---------- year: np.ndarray calendar year month: np.ndarray calendar month day: np.ndarray, float or NoneType, default None day of the month hour: np.ndarray, float or NoneType, default None hour of the day minute: np.ndarray, float or NoneType, default None minute of the hour second: np.ndarray, float or NoneType, default None second of the minute DofY: np.ndarray, float or NoneType, default None day of the year Returns ------- t_date: np.ndarray date in decimal-year format """ # number of dates n_dates = len(np.atleast_1d(year)) # create arrays for calendar date variables cal_date = {} cal_date["year"] = np.zeros((n_dates)) cal_date["month"] = np.zeros((n_dates)) cal_date["day"] = np.zeros((n_dates)) cal_date["hour"] = np.zeros((n_dates)) cal_date["minute"] = np.zeros((n_dates)) cal_date["second"] = np.zeros((n_dates)) # day of the year cal_date["DofY"] = np.zeros((n_dates)) # remove singleton dimensions and use year and month cal_date["year"][:] = np.squeeze(year) cal_date["month"][:] = np.squeeze(month) # create output date variable t_date = np.zeros((n_dates)) # days per month in a leap and a standard year # only difference is February (29 vs. 28) dpm_leap = np.array(_dpm_leap, dtype=np.float64) dpm_stnd = np.array(_dpm_stnd, dtype=np.float64) # Rules in the Gregorian calendar for a year to be a leap year: # divisible by 4, but not by 100 unless divisible by 400 # True length of the year is about 365.2422 days # Adding a leap day every four years ==> average 365.25 # Subtracting a leap year every 100 years ==> average 365.24 # Adding a leap year back every 400 years ==> average 365.2425 # Subtracting a leap year every 4000 years ==> average 365.24225 m4 = cal_date["year"] % 4 m100 = cal_date["year"] % 100 m400 = cal_date["year"] % 400 m4000 = cal_date["year"] % 4000 # find indices for standard years and leap years using criteria (leap,) = np.nonzero((m4 == 0) & (m100 != 0) | (m400 == 0) & (m4000 != 0)) (stnd,) = np.nonzero((m4 != 0) | (m100 == 0) & (m400 != 0) | (m4000 == 0)) # calculate the day of the year if DofY is not None: # if entered directly as an input # remove 1 so day 1 (Jan 1st) = 0.0 in decimal format cal_date["DofY"][:] = np.squeeze(DofY) - 1 else: # use calendar month and day of the month to calculate day of the year # month minus 1: January = 0, February = 1, etc (indice of month) # in decimal form: January = 0.0 month_m1 = np.array(cal_date["month"], dtype=int) - 1 # day of month if day is not None: # remove 1 so 1st day of month = 0.0 in decimal format cal_date["day"][:] = np.squeeze(day) - 1.0 else: # if not entering days as an input # will use the mid-month value cal_date["day"][leap] = dpm_leap[month_m1[leap]] / 2.0 cal_date["day"][stnd] = dpm_stnd[month_m1[stnd]] / 2.0 # create matrix with the lower half = 1 # this matrix will be used in a matrix multiplication # to calculate the total number of days for prior months # the -1 will make the diagonal == 0 # i.e. first row == all zeros and the # last row == ones for all but the last element mon_mat = np.tri(12, 12, -1) # using a dot product to calculate total number of days # for the months before the input date # basically is sum(i*dpm) # where i is 1 for all months < the month of interest # and i is 0 for all months >= the month of interest # month of interest is zero as the exact days will be # used to calculate the date # calculate the day of the year for leap and standard # use total days of all months before date # and add number of days before date in month cal_date["DofY"][stnd] = cal_date["day"][stnd] + np.dot( mon_mat[month_m1[stnd], :], dpm_stnd ) cal_date["DofY"][leap] = cal_date["day"][leap] + np.dot( mon_mat[month_m1[leap], :], dpm_leap ) # hour of day (else is zero) if hour is not None: cal_date["hour"][:] = np.squeeze(hour) # minute of hour (else is zero) if minute is not None: cal_date["minute"][:] = np.squeeze(minute) # second in minute (else is zero) if second is not None: cal_date["second"][:] = np.squeeze(second) # calculate decimal date # convert hours, minutes and seconds into days # convert calculated fractional days into decimal fractions of the year # Leap years t_date[leap] = cal_date["year"][leap] + ( cal_date["DofY"][leap] + cal_date["hour"][leap] / 24.0 + cal_date["minute"][leap] / 1440.0 + cal_date["second"][leap] / 86400.0 ) / np.sum(dpm_leap) # Standard years t_date[stnd] = cal_date["year"][stnd] + ( cal_date["DofY"][stnd] + cal_date["hour"][stnd] / 24.0 + cal_date["minute"][stnd] / 1440.0 + cal_date["second"][stnd] / 86400.0 ) / np.sum(dpm_stnd) return t_date
# PURPOSE: Converts from Julian day to calendar date and time
[docs] def convert_julian(JD: np.ndarray, **kwargs): """ Converts from Julian day to calendar date and time :cite:p:`Press:1988we` :cite:p:`Hatcher:1984uo` Parameters ---------- JD: np.ndarray Julian Day (days since 01-01-4713 BCE at 12:00:00) astype: str or NoneType, default None convert output to variable type format: str, default 'dict' format of output variables - ``'dict'``: dictionary with variable keys - ``'tuple'``: tuple in most-to-least-significant order - ``'zip'``: aggregated variable sets Returns ------- year: np.ndarray calendar year month: np.ndarray calendar month day: np.ndarray day of the month hour: np.ndarray hour of the day minute: np.ndarray minute of the hour second: np.ndarray second of the minute """ # set default keyword arguments kwargs.setdefault("astype", None) kwargs.setdefault("format", "dict") # raise warnings for deprecated keyword arguments deprecated_keywords = dict(ASTYPE="astype", FORMAT="format") for old, new in deprecated_keywords.items(): if old in kwargs.keys(): warnings.warn( f"""Deprecated keyword argument {old}. Changed to '{new}'""", DeprecationWarning, ) # set renamed argument to not break workflows kwargs[new] = copy.copy(kwargs[old]) # convert to array if only a single value was imported if np.ndim(JD) == 0: JD = np.atleast_1d(JD) single_value = True else: single_value = False # verify Julian day JDO = np.floor(JD + 0.5) C = np.zeros_like(JD) # calculate C for dates before and after the switch to Gregorian IGREG = 2299161.0 (ind1,) = np.nonzero(JDO < IGREG) C[ind1] = JDO[ind1] + 1524.0 (ind2,) = np.nonzero(JDO >= IGREG) B = np.floor((JDO[ind2] - 1867216.25) / 36524.25) C[ind2] = JDO[ind2] + B - np.floor(B / 4.0) + 1525.0 # calculate coefficients for date conversion D = np.floor((C - 122.1) / 365.25) E = np.floor((365.0 * D) + np.floor(D / 4.0)) F = np.floor((C - E) / 30.6001) # calculate day, month, year and hour day = np.floor(C - E + 0.5) - np.floor(30.6001 * F) month = F - 1.0 - 12.0 * np.floor(F / 14.0) year = D - 4715.0 - np.floor((7.0 + month) / 10.0) hour = np.floor(24.0 * (JD + 0.5 - JDO)) # calculate minute and second G = (JD + 0.5 - JDO) - hour / 24.0 minute = np.floor(G * 1440.0) second = (G - minute / 1440.0) * 86400.0 # convert all variables to output type (from float) if kwargs["astype"] is not None: year = year.astype(kwargs["astype"]) month = month.astype(kwargs["astype"]) day = day.astype(kwargs["astype"]) hour = hour.astype(kwargs["astype"]) minute = minute.astype(kwargs["astype"]) second = second.astype(kwargs["astype"]) # if only a single value was imported initially: remove singleton dims if single_value: year = year.item(0) month = month.item(0) day = day.item(0) hour = hour.item(0) minute = minute.item(0) second = second.item(0) # return date variables in output format if kwargs["format"] == "dict": return dict( year=year, month=month, day=day, hour=hour, minute=minute, second=second, ) elif kwargs["format"] == "tuple": return (year, month, day, hour, minute, second) elif kwargs["format"] == "zip": return zip(year, month, day, hour, minute, second)
# delta time (TT - UT1) file _delta_file = timescale.utilities.get_data_path(["data", "merged_deltat.data"])
[docs] class Timescale: """ Class for converting between time scales Attributes ---------- leaps: np.ndarray Number of leap seconds MJD: np.ndarray Modified Julian Days """ # Julian century century = 36525.0 # seconds per day day = 86400.0 # 360 degrees turn = 1.0 turndeg = 360.0 tau = 2.0 * np.pi # degrees to radians deg2rad = np.pi / 180.0 # degrees to arcseconds deg2asec = 3600.0 def __init__(self, MJD=None, leaps=None): # leap seconds self.leaps = leaps # modified Julian Days self.MJD = MJD # iterator self.__index__ = 0
[docs] @classmethod def from_deltatime( cls, delta_time: np.ndarray, epoch: str | tuple | list | np.ndarray, standard: str = "UTC", ): """ Converts a delta time array and into a ``Timescale`` object Parameters ---------- delta_time: np.ndarray seconds since ``epoch`` epoch: str, uuple, list or np.ndarray epoch for input ``delta_time`` standard: str, default 'UTC' time standard for input ``delta_time`` """ # assert delta time is an array delta_time = np.atleast_1d(delta_time) # calculate leap seconds if specified if standard.upper() == "GPS": GPS_Epoch_Time = convert_delta_time( 0, epoch1=epoch, epoch2=_gps_epoch, scale=1.0 ) GPS_Time = convert_delta_time( delta_time, epoch1=epoch, epoch2=_gps_epoch, scale=1.0 ) # calculate difference in leap seconds from start of epoch leaps = count_leap_seconds(GPS_Time) - count_leap_seconds( np.atleast_1d(GPS_Epoch_Time) ) elif standard.upper() == "LORAN": # LORAN time is ahead of GPS time by 9 seconds GPS_Epoch_Time = convert_delta_time( -9.0, epoch1=epoch, epoch2=_gps_epoch, scale=1.0 ) GPS_Time = convert_delta_time( delta_time - 9.0, epoch1=epoch, epoch2=_gps_epoch, scale=1.0 ) # calculate difference in leap seconds from start of epoch leaps = count_leap_seconds(GPS_Time) - count_leap_seconds( np.atleast_1d(GPS_Epoch_Time) ) elif standard.upper() == "TAI": # TAI time is ahead of GPS time by 19 seconds GPS_Epoch_Time = convert_delta_time( -19.0, epoch1=epoch, epoch2=_gps_epoch, scale=1.0 ) GPS_Time = convert_delta_time( delta_time - 19.0, epoch1=epoch, epoch2=_gps_epoch, scale=1.0 ) # calculate difference in leap seconds from start of epoch leaps = count_leap_seconds(GPS_Time) - count_leap_seconds( np.atleast_1d(GPS_Epoch_Time) ) elif standard.upper() == "GLONASS": # GLONASS time is ahead of UTC time by 3 hours leaps = 3.0 * 3600.0 else: leaps = 0.0 # convert time to days relative to Modified Julian days in UTC MJD = convert_delta_time( delta_time - leaps, epoch1=epoch, epoch2=_mjd_epoch, scale=(1.0 / 86400.0), ) return cls(MJD=MJD, leaps=leaps)
[docs] @classmethod def from_calendar( cls, year: np.ndarray, month: np.ndarray | float = 1.0, day: np.ndarray | float = 1.0, hour: np.ndarray | float = 0.0, minute: np.ndarray | float = 0.0, second: np.ndarray | float = 0.0, ): """ Converts calendar date arrays into a ``Timescale`` object Parameters ---------- year: np.ndarray calendar year month: np.ndarray or float, default 1.0 month of the year day: np.ndarray or float, default 1.0 day of the month hour: np.ndarray or float, default 0.0 hour of the day minute: np.ndarray or float, default 0.0 minute of the hour second: np.ndarray or float, default 0.0 second of the minute """ # verify input data types year = np.array(year, dtype=np.float64) month = np.array(month, dtype=np.float64) day = np.array(day, dtype=np.float64) hour = np.array(hour, dtype=np.float64) minute = np.array(minute, dtype=np.float64) second = np.array(second, dtype=np.float64) # calculate date in Modified Julian Days (MJD) from calendar date # MJD: days since November 17, 1858 (1858-11-17T00:00:00) MJD = ( 367.0 * year - np.floor(1.75 * (year + np.floor((month + 9.0) / 12.0))) - np.floor( 0.75 * (np.floor((year + (month - 9.0) / 7.0) / 100.0) + 1.0) ) + np.floor(275.0 * month / 9.0) + day + hour / 24.0 + minute / 1440.0 + second / 86400.0 + 1721028.5 - _jd_mjd ) return cls(MJD=MJD)
[docs] @classmethod def from_datetime(cls, dtime: np.ndarray): """ Reads a ``datetime`` array and converts into a ``Timescale`` object Parameters ---------- dtime: np.ndarray ``numpy.datetime64`` array """ # convert delta time array from datetime object # to days relative to 1992-01-01T00:00:00 MJD = convert_datetime(dtime, epoch=_mjd_epoch) / 86400.0 return cls(MJD=MJD)
[docs] @classmethod def from_list(cls, temp): """ Reads a list of ``Timescale`` objects and converts into a single ``Timescale`` object Parameters ---------- temp: list list of ``Timescale`` objects """ # convert list of timescale objects to a single timescale object MJD = np.array([t.MJD for t in temp]) return cls(MJD=MJD)
[docs] def to_calendar(self): """ Convert a ``Timescale`` object to a ``Calendar`` object """ return Calendar(self.utc)
[docs] def to_deltatime( self, epoch: str | tuple | list | np.ndarray, scale: float = 1.0 ): """ Convert a ``Timescale`` object to a delta time array Parameters ---------- epoch: str, tuple, list, or np.ndarray epoch for output ``delta_time`` scale: float, default 1.0 scaling factor for converting time to output units Returns ------- delta_time: np.ndarray time since epoch """ # convert epochs to numpy datetime variables epoch1 = np.datetime64(datetime.datetime(*_mjd_epoch)) if isinstance(epoch, (tuple, list)): epoch = np.datetime64(datetime.datetime(*epoch)) elif isinstance(epoch, str): epoch = np.datetime64(parse(epoch)) # calculate the difference in epochs in days delta_time_epochs = (epoch - epoch1) / np.timedelta64(1, "D") # return the date in time (default days) since epoch return scale * np.array(self.MJD - delta_time_epochs, dtype=np.float64)
[docs] def to_datetime(self, unit="ns"): """ Convert a ``Timescale`` object to a ``datetime`` array Returns ------- dtime: np.ndarray ``numpy.datetime64`` array """ # verify that units are numpy datetime compatible assert unit in ["D", "h", "m", "s", "ms", "us", "ns"] # convert Modified Julian Day epoch to datetime variable epoch = np.datetime64(datetime.datetime(*_mjd_epoch)) # calculate the delta time in the specified units scale = self.day * _from_sec[unit] delta_time = np.atleast_1d(self.MJD * scale).astype(np.int64) # return the datetime array return np.array(epoch + delta_time.astype(f"timedelta64[{unit}]"))
[docs] def to_string(self, unit: str = "s", **kwargs): """ Convert a ``Timescale`` object to a formatted string array Parameters ---------- unit: str, default 's' datetime unit for output string array **kwargs: dict keyword arguments for datetime formatting """ return np.datetime_as_string( self.to_datetime(unit=unit), unit=unit, **kwargs )
# PURPOSE: calculate the sum of a polynomial function of time
[docs] def polynomial_sum(self, coefficients: list | np.ndarray, t: np.ndarray): """ Calculates the sum of a polynomial function of time Parameters ---------- coefficients: list or np.ndarray leading coefficient of polynomials of increasing order t: np.ndarray delta time in units for a given astronomical longitudes calculation """ # convert time to array if importing a single value t = np.atleast_1d(t) return np.sum([c * (t**i) for i, c in enumerate(coefficients)], axis=0)
@timescale.utilities.reify def era(self): """Earth Rotation Angle (ERA) in degrees""" # earth rotation angle using Universal Time _jd_j2000 = _jd_mjd + _mjd_j2000 # UT1 in days since J2000 J = self.ut1 - _jd_j2000 fraction = np.mod(J, self.turn) theta = np.mod(0.7790572732640 + 0.00273781191135448 * J, self.turn) return self.turndeg * np.mod(theta + fraction, self.turn) @timescale.utilities.reify def gha(self): """Greenwich Hour Angle (GHA) in degrees""" return np.mod( self.gmst * self.turndeg + self.turndeg * self.T * self.century + self.turndeg / 2.0, self.turndeg, ) @timescale.utilities.reify def gmst(self): """Greenwich Mean Sidereal Time (GMST) in fractions of day""" GMST = np.array([24110.54841, 8640184.812866, 9.3104e-2, -6.2e-6]) # UT1 as Julian centuries _jd_j2000 = _jd_mjd + _mjd_j2000 ut1 = (self.ut1 - _jd_j2000) / self.century # convert from seconds to fractions of day return np.mod(self.polynomial_sum(GMST, ut1) / self.day, self.turn) @timescale.utilities.reify def gps(self): """Seconds since 1980-01-06T00:00:00""" # return the GPS time return (self.utc - _jd_gps) * self.day + self.gps_utc @timescale.utilities.reify def gps_utc(self): """Leap seconds between GPS and UTC time""" # dynamic time is ahead of TAI by 32.184 seconds _tt_tai = 32.184 # TAI time is ahead of GPS by 19 seconds _tai_gps = 19.0 # convert from dynamic time to TAI TAI = np.atleast_1d(self.tt - _jd_gps) * self.day - _tt_tai # calculate the number of leap seconds return count_leap_seconds(TAI - _tai_gps) @timescale.utilities.reify def gps_week(self): """GPS week number since 1980-01-06T00:00:00""" return (self.gps / (self.day * 7)).astype(np.int64) @timescale.utilities.reify def J2000(self): """Seconds (Terrestrial Time) since 2000-01-01T12:00:00""" _jd_j2000 = _jd_mjd + _mjd_j2000 return (self.tt - _jd_j2000) * self.day @timescale.utilities.reify def st(self): """Greenwich Mean Sidereal Time (GMST) in fractions of a day from the Equinox Method """ # IAU 2000 model for GMST # sidereal time polynomial coefficients in arcseconds sidereal_time = np.array( [0.014506, 4612.156534, 1.3915817, -4.4e-7, -2.9956e-05, -3.68e-08] ) ST = self.polynomial_sum(sidereal_time, self.T) # get earth rotation angle and convert to arcseconds # convert from arcseconds to fractions of day return ( np.mod(ST + self.era * self.deg2asec, self.turnasec) / self.turnasec ) @timescale.utilities.reify def tdb(self): """Approximate Barycentric Dynamical Time (TDB) as Julian Days""" # calculate the approximate TDB time return self.tt + self.tdb_tt @timescale.utilities.reify def tdb_tt(self): """ Difference between Barycentric Dynamical Time (TDB) and terrestrial time (TT) :cite:p:`Fairhead:1990vz,Kaplan:2005kj` """ # truncated Fairhead and Bretagnon series FB = ( 0.001657 * np.sin(628.3076 * self.T + 6.2401) + 0.000022 * np.sin(575.3385 * self.T + 4.2970) + 0.000014 * np.sin(1256.6152 * self.T + 6.1969) + 0.000005 * np.sin(606.9777 * self.T + 4.0212) + 0.000005 * np.sin(52.9691 * self.T + 0.4444) + 0.000002 * np.sin(21.3299 * self.T + 5.5431) + 0.000010 * self.T * np.sin(628.3076 * self.T + 4.2490) ) # convert from seconds to days return FB / self.day @timescale.utilities.reify def tide(self): """Days since 1992-01-01T00:00:00""" return self.MJD - _mjd_tide @timescale.utilities.reify def tt(self): """Terrestrial Time (TT) as Julian Days""" return self.MJD + self.tt_ut1 + _jd_mjd @timescale.utilities.reify def tt_ut1(self): """ Difference between dynamical time (TT) and universal time (UT1) """ # return the delta time for the input date converted to days return interpolate_delta_time(_delta_file, self.tide) @timescale.utilities.reify def ut1_utc(self): """ Difference between universal time (UT1) and coordinated universal time (UTC) """ # dynamic time is ahead of TAI by 32.184 seconds _tt_tai = 32.184 # TAI time is ahead of GPS by 19 seconds _tai_gps = 19.0 # convert from delta times back to seconds _tt_ut1 = self.day * self.tt_ut1 # recalculate UT1-UTC (seconds) return _tt_tai + _tai_gps + self.gps_utc - _tt_ut1 @timescale.utilities.reify def T(self): """Centuries since 2000-01-01T12:00:00""" _jd_j2000 = _jd_mjd + _mjd_j2000 return (self.tt - _jd_j2000) / self.century @timescale.utilities.reify def B(self): """Time in Besselian years :cite:p:`Lieske:1979wv`""" return 1900.0 + (self.MJD - 15019.81352) / 365.242198781 @timescale.utilities.reify def ut1(self): """Universal Time (UT) as Julian Days""" # convert UT1-UTC to days return self.utc + self.ut1_utc / self.day @timescale.utilities.reify def ut2(self): """UT0 corrected for polar motion and seasonal variation""" theta = 2.0 * np.pi * self.B ut2_ut1 = ( 0.022 * np.sin(theta) - 0.012 * np.cos(theta) - 0.006 * np.sin(2.0 * theta) + 0.007 * np.cos(2.0 * theta) ) return self.ut1 + ut2_ut1 @timescale.utilities.reify def utc(self): """Coordinated Universal Time (UTC) as Julian Days""" return self.MJD + _jd_mjd @timescale.utilities.reify def year(self): """Universal Time (UT) as calendar year""" Y, M, D, h, m, s = convert_julian(self.utc, format="tuple") return convert_calendar_decimal(Y, M, D, hour=h, minute=m, second=s) @timescale.utilities.reify def nominal_year(self): """Universal Time (UT) as nominal years of 365.25 days""" return 1992.0 + self.tide / 365.25
[docs] def min(self): """Minimum time value as a ``Timescale`` object""" return Timescale(MJD=np.nanmin(self.MJD))
[docs] def max(self): """Maximum time value as a ``Timescale`` object""" return Timescale(MJD=np.nanmax(self.MJD))
[docs] def mean(self): """Mean time value as a ``Timescale`` object""" return Timescale(MJD=np.nanmean(self.MJD))
@property def turnasec(self): """Arcseconds in a full turn""" return self.turndeg * self.deg2asec @property def asec2rad(self): """Arcseconds to radians""" return self.deg2rad / self.deg2asec @property def masec2rad(self): """Microarcseconds to radians""" return self.asec2rad / 1.0e6 @property def dtype(self): """Main data type of ``Timescale`` object""" return self.MJD.dtype @property def shape(self): """Dimensions of ``Timescale`` object""" return np.shape(self.MJD) @property def ndim(self): """Number of dimensions in ``Timescale`` object""" return np.ndim(self.MJD) def __str__(self): """String representation of the ``Timescale`` object""" properties = ["timescale.time.Timescale"] return "\n".join(properties) def __len__(self): """Number of time values""" return len(np.atleast_1d(self.MJD)) def __getitem__(self, ind): """Subset ``Timescale`` object to indices""" temp = Timescale() temp.MJD = np.atleast_1d(self.MJD)[ind].copy() return temp def __iter__(self): """Iterate over time values""" self.__index__ = 0 return self def __next__(self): """Get the next time step""" temp = Timescale() try: temp.MJD = np.atleast_1d(self.MJD)[self.__index__].copy() except IndexError as exc: raise StopIteration from exc # add to index self.__index__ += 1 return temp
[docs] class Calendar: """ Class for converting from Julian dates to calendar dates """ def __init__(self, utc=None): # Julian Days self.utc = utc self.from_julian()
[docs] def from_julian(self): """ Converts from Julian dates to calendar dates """ # convert Julian date to calendar for key, val in convert_julian(self.utc).items(): setattr(self, key, val)
@property def dtype(self): """Main data type of ``Calendar`` object""" return self.utc.dtype @property def shape(self): """Dimensions of ``Calendar`` object""" return np.shape(self.utc) @property def ndim(self): """Number of dimensions in ``Calendar`` object""" return np.ndim(self.utc) def __str__(self): """String representation of the ``Calendar`` object""" properties = ["timescale.time.Calendar"] return "\n".join(properties) def __len__(self): """Number of time values""" return len(np.atleast_1d(self.utc)) def __getitem__(self, ind): """Subset ``Calendar`` object to indices""" utc = np.atleast_1d(self.utc)[ind].copy() return Calendar(utc=utc) def __iter__(self): """Iterate over time values""" self.__index__ = 0 return self def __next__(self): """Get the next time step""" try: utc = np.atleast_1d(self.utc)[self.__index__].copy() except IndexError as exc: raise StopIteration from exc # add to index self.__index__ += 1 return Calendar(utc=utc)
# PURPOSE: calculate the difference between dynamical time and universal time # by interpolating a delta time file to a given date
[docs] def interpolate_delta_time( delta_file: str | pathlib.Path | None, idays: np.ndarray, validate: bool = False, ): """ Calculates the difference between dynamical time (TT) and universal time (UT) :cite:p:`Meeus:1991vh` Parameters ---------- delta_file: str or Pathlib.Path file containing the delta times (TT-UT1) idays: np.ndarray input times to interpolate (days since 1992-01-01T00:00:00) validate: bool, default False check that delta time file is up to date Returns ------- deltat: np.ndarray estimated TT-UT1 values """ # verify delta time file path delta_file = pathlib.Path(delta_file).expanduser().absolute() # check if delta time file is up to date and exists if validate: validate_delta_time(delta_file) elif not delta_file.exists(): raise FileNotFoundError(delta_file) # names and formats of delta time file columns dtype = dict(names=("Y", "M", "D", "tt_ut1"), formats=("i", "i", "i", "f8")) # read delta time file dinput = np.loadtxt(delta_file, dtype=dtype) # calculate Julian days and then convert to days since 1992-01-01T00:00:00 days = convert_calendar_dates( dinput["Y"], dinput["M"], dinput["D"], epoch=_tide_epoch ) # use scipy interpolating splines to interpolate delta times spl = scipy.interpolate.UnivariateSpline( days, dinput["tt_ut1"], k=1, s=0, ext=0 ) # return the delta time for the input date converted to days deltat = spl(idays) / 86400.0 return deltat
# PURPOSE: Count number of leap seconds that have passed for each GPS time
[docs] def count_leap_seconds(GPS_Time: np.ndarray | float, truncate: bool = True): """ Counts the number of leap seconds between a given GPS time and UTC Parameters ---------- GPS_Time: np.ndarray or float seconds since January 6, 1980 at 00:00:00 truncate: bool, default True Reduce list of leap seconds to positive GPS times Returns ------- n_leaps: float number of elapsed leap seconds """ # get the valid leap seconds leaps = get_leap_seconds(truncate=truncate) # number of leap seconds prior to GPS_Time n_leaps = np.zeros_like(GPS_Time, dtype=np.float64) for i, leap in enumerate(leaps): count = np.count_nonzero(GPS_Time >= leap) if count > 0: indices = np.nonzero(GPS_Time >= leap) n_leaps[indices] += 1.0 # return the number of leap seconds for converting to UTC return n_leaps
# PURPOSE: Define GPS leap seconds
[docs] def get_leap_seconds(truncate: bool = True): """ Gets a list of GPS times for when leap seconds occurred Parameters ---------- truncate: bool, default True Reduce list of leap seconds to positive GPS times Returns ------- leap_GPS: float GPS seconds when leap seconds occurred """ leap_secs = timescale.utilities.get_data_path(["data", "leap-seconds.list"]) # find line with file expiration as delta time with leap_secs.open(mode="r", encoding="utf8") as fid: (secs,) = [ re.findall(r"\d+", i).pop() for i in fid.read().splitlines() if re.match(r"^(?=#@)", i) ] # check that leap seconds file is still valid network_time = datetime.datetime(*_ntp_epoch) expiry = network_time + datetime.timedelta(seconds=int(secs)) today = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None) update_leap_seconds() if (expiry < today) else None # get leap seconds leap_UTC, TAI_UTC = np.loadtxt(leap_secs).T # TAI time is ahead of GPS by 19 seconds TAI_GPS = 19.0 # convert leap second epochs from NTP to GPS # convert from time of 2nd leap second to time of 1st leap second leap_GPS = convert_delta_time( leap_UTC + TAI_UTC - TAI_GPS - 1, epoch1=_ntp_epoch, epoch2=_gps_epoch ) # return the GPS times of leap second occurrence if truncate: return leap_GPS[leap_GPS >= 0].astype(np.float64) else: return leap_GPS.astype(np.float64)
# PURPOSE: connects to servers and downloads leap second files
[docs] def update_leap_seconds( timeout: int | None = 20, verbose: bool = False, mode: oct = 0o775 ): """ Connects to servers to download leap-seconds.list files from NIST servers - https://www.nist.gov/pml/time-and-frequency-division/leap-seconds-faqs Servers and Mirrors - ftp://ftp.boulder.nist.gov/pub/time/leap-seconds.list - https://hpiers.obspm.fr/iers/bul/bulc/ntp/leap-seconds.list - https://data.iana.org/time-zones/data/leap-seconds.list Parameters ---------- timeout: int or None, default 20 timeout in seconds for blocking operations verbose: bool, default False print file information about output file mode: oct, default 0o775 permissions mode of output file """ # local version of file FILE = "leap-seconds.list" LOCAL = timescale.utilities.get_data_path(["data", FILE]) HASH = timescale.utilities.get_hash(LOCAL) # try downloading from NIST Boulder ftp servers HOST = ["ftp.boulder.nist.gov", "pub", "time", FILE] try: timescale.utilities.check_ftp_connection(HOST[0]) timescale.utilities.from_ftp( HOST, timeout=timeout, local=LOCAL, hash=HASH, verbose=verbose, mode=mode, ) except Exception as exc: logging.debug(traceback.format_exc()) pass else: return # try downloading from Paris Observatory IERS Centers REMOTE = ["https://hpiers.obspm.fr", "iers", "bul", "bulc", "ntp", FILE] try: timescale.utilities.from_http( REMOTE, timeout=timeout, local=LOCAL, hash=HASH, verbose=verbose, mode=mode, ) except Exception as exc: logging.debug(traceback.format_exc()) pass else: return # try downloading from Internet Assigned Numbers Authority (IANA) REMOTE = ["https://data.iana.org", "time-zones", "data", FILE] try: timescale.utilities.from_http( REMOTE, timeout=timeout, local=LOCAL, hash=HASH, verbose=verbose, mode=mode, ) except Exception as exc: logging.debug(traceback.format_exc()) pass else: return
# PURPOSE: check if the delta time file is still valid
[docs] def validate_delta_time(delta_file: str | pathlib.Path = _delta_file): """ Checks that the delta time file is still up to date Parameters ---------- delta_file: str or Pathlib.Path file containing the delta times (TT-UT1) """ # verify delta time file path delta_file = pathlib.Path(delta_file).expanduser().absolute() # check that delta time file is accessible: if not download if not delta_file.exists(): update_delta_time(delta_file) return # names and formats of delta time file columns dtype = dict(names=("Y", "M", "D", "tt_ut1"), formats=("i", "i", "i", "f8")) # read delta time file dinput = np.loadtxt(delta_file, dtype=dtype) # get the last date in the delta time file last = datetime.datetime(dinput["Y"][-1], dinput["M"][-1], dinput["D"][-1]) # check if delta time file is still valid (expires after 8 weeks) today = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None) expiry = last + datetime.timedelta(weeks=8) # if expired: get latest delta time file from repository update_delta_time(delta_file) if (expiry < today) else None
# PURPOSE: downloads a newer version of a delta time file
[docs] def update_delta_time( delta_file: str | pathlib.Path, branch: str = "main", timeout: int | None = 20, verbose: bool = False, mode: oct = 0o775, ): """ Downloads an updated delta time file from the project GitHub repository Parameters ---------- delta_file: str or Pathlib.Path file containing the delta times (TT-UT1) branch: str, default 'main' branch of the GitHub repository for downloading files timeout: int or None, default 20 timeout for the HTTP request in seconds verbose: bool, default False print file information about output file mode: oct, default 0o775 permissions mode of output file Notes ----- Delta times are the difference between dynamical time and universal time """ # verify delta time file path delta_file = pathlib.Path(delta_file).expanduser().absolute() # MD5 hash for comparing with remote HASH = timescale.utilities.get_hash(delta_file) # try downloading from GitHub repository HOST = timescale.utilities.get_github_url( ["timescale", "data", delta_file.name], branch=branch ) # log message for failed download msg = f"Unable to download {delta_file.name} from timescale repository" try: timescale.utilities.from_http( HOST, local=delta_file, hash=HASH, timeout=timeout, verbose=verbose, mode=mode, ) except timescale.utilities.urllib2.HTTPError as exc: logging.info(msg) logging.debug(exc.code) except timescale.utilities.urllib2.URLError as exc: logging.info(msg) logging.debug(exc.reason)
# PURPOSE: Download delta time files and merge into a single
[docs] def merge_delta_time( username: str | None = None, password: str | None = None, verbose: bool = False, mode: oct = 0o775, ): """ Connects to servers to download historic_deltat.data and deltat.data files Reads IERS Bulletin-A produced iers_deltat.data files Creates a merged file combining the historic, monthly and daily files Long-term Delta T - https://www.usno.navy.mil/USNO/earth-orientation/eo-products/long-term Parameters ---------- username: str or NoneType, default None NASA Earthdata username password: str or NoneType, default None NASA Earthdata password verbose: bool, default False print file information about output file mode: oct, default 0o775 permissions mode of output file Notes ----- Delta times are the difference between dynamical time and universal time """ # retrieve history delta time files pull_deltat_file( "historic_deltat.data", username=username, password=password, verbose=verbose, mode=mode, ) # read historic delta time file historic_file = timescale.utilities.get_data_path( ["data", "historic_deltat.data"] ) historic = np.loadtxt(historic_file, skiprows=2) HY = np.floor(historic[:, 0]) HM = 12.0 * np.mod(historic[:, 0], 1.0) + 1.0 HD = np.ones_like(historic[:, 0]) # retrieve monthly delta time files pull_deltat_file( "deltat.data", username=username, password=password, verbose=verbose, mode=mode, ) # read modern monthly delta time file monthly_file = timescale.utilities.get_data_path(["data", "deltat.data"]) monthly = np.loadtxt(monthly_file) monthly_time = convert_calendar_decimal( monthly[:, 0], monthly[:, 1], day=monthly[:, 2] ) # retrieve daily delta time files merge_bulletin_a_files( username=username, password=password, verbose=verbose, mode=mode ) # read modern daily delta time file from IERS Bulletin A files daily_file = timescale.utilities.get_data_path(["data", "iers_deltat.data"]) daily = np.loadtxt(daily_file) daily_time = convert_calendar_decimal( daily[:, 0], daily[:, 1], day=daily[:, 2] ) # write to new merged file merged_file = timescale.utilities.get_data_path( ["data", "merged_deltat.data"] ) fid = merged_file.open(mode="w", encoding="utf8") logging.info(str(merged_file)) file_format = " {0:4.0f} {1:2.0f} {2:2.0f} {3:7.4f}" # use historical values for times prior to monthly (ind1,) = np.nonzero(historic[:, 0] < monthly_time[0]) for i in ind1: args = (HY[i], HM[i], HD[i], historic[i, 1]) print(file_format.format(*args), file=fid) # use monthly values for times prior to daily (ind2,) = np.nonzero(monthly_time < np.min(daily_time)) for i in ind2: args = (monthly[i, 0], monthly[i, 1], monthly[i, 2], monthly[i, 3]) print(file_format.format(*args), file=fid) # use daily values for all times available for i in np.argsort(daily_time): args = (daily[i, 0], daily[i, 1], daily[i, 2], daily[i, 3]) print(file_format.format(*args), file=fid) # close the merged file and change the permissions mode fid.close() merged_file.chmod(mode)
# PURPOSE: Append Bulletin-A file to merged delta time file
[docs] def append_delta_time(verbose: bool = False, mode: oct = 0o775): """ Appends merged delta time file with values from latest Bulletin-A file Parameters ---------- verbose: bool, default False print file information about output file mode: oct, default 0o775 permissions mode of output file Notes ----- Delta times are the difference between dynamical time and universal time """ # merged delta time file merged_file = timescale.utilities.get_data_path( ["data", "merged_deltat.data"] ) # read merged delta time file dinput = np.loadtxt(merged_file) merged_time = convert_calendar_decimal( dinput[:, 0], dinput[:, 1], day=dinput[:, 2] ) # append to merged file fid = merged_file.open(mode="a", encoding="utf8") logging.info(str(merged_file)) # read latest Bulletin-A file from IERS bulletin_file = timescale.utilities.get_data_path(["data", "ser7.dat"]) logging.info(str(bulletin_file)) with bulletin_file.open(mode="rb") as fileID: YY, MM, DD, DELTAT = read_iers_bulletin_a(fileID) # append latest delta time values to merged file for Y, M, D, T in zip(YY, MM, DD, DELTAT): daily_time = convert_calendar_decimal(float(Y), float(M), day=float(D)) # check if date is already in merged file if daily_time in merged_time: logging.info(f"{Y:4.0f}-{M:02.0f}-{D:02.0f} exists in merged file") continue # write to merged file print(f" {Y:4.0f} {M:2.0f} {D:2.0f} {T:7.4f}", file=fid) # close the merged file and change the permissions mode fid.close() merged_file.chmod(mode)
# PURPOSE: connect to IERS or CDDIS server and merge Bulletin-A files
[docs] def merge_bulletin_a_files( username: str | None = None, password: str | None = None, verbose: bool = False, mode: oct = 0o775, ): """ Attempt to connects to the IERS server and the CDDIS Earthdata server to download and merge Bulletin-A files Reads the IERS Bulletin-A files and calculates the daily delta times Servers and Mirrors - https://datacenter.iers.org/availableVersions.php?id=6 - ftp://ftp.iers.org/products/eop/rapid/bulletina/ - https://cddis.nasa.gov/archive/products/iers/iers_bulletins/bulletin_a/ Parameters ---------- username: str or NoneType, default None NASA Earthdata username password: str or NoneType, default None NASA Earthdata password verbose: bool, default False print file information about output file mode: oct, default 0o775 permissions mode of output file Notes ----- Delta times are the difference between dynamical time and universal time """ # if complete: replace previous version of file LOCAL = timescale.utilities.get_data_path(["data", "iers_deltat.data"]) COPY = timescale.utilities.get_data_path(["data", "iers_deltat.temp"]) # try connecting to IERS http servers and merge Bulletin-A files try: iers_delta_time(COPY, verbose=verbose, mode=mode) except Exception as exc: logging.debug(traceback.format_exc()) COPY.unlink() if COPY.exists() else None pass else: timescale.utilities.copy(COPY, LOCAL, move=True) return # try connecting to IERS ftp servers and merge Bulletin-A files try: iers_ftp_delta_time(COPY, verbose=verbose, mode=mode) except Exception as exc: logging.debug(traceback.format_exc()) COPY.unlink() if COPY.exists() else None pass else: timescale.utilities.copy(COPY, LOCAL, move=True) return # try connecting to CDDIS https servers and merge Bulletin-A files try: cddis_delta_time( COPY, username=username, password=password, verbose=verbose, mode=mode, ) except Exception as exc: logging.debug(traceback.format_exc()) COPY.unlink() if COPY.exists() else None pass else: timescale.utilities.copy(COPY, LOCAL, move=True) return
# PURPOSE: connects to IERS ftp servers and finds Bulletin-A files
[docs] def iers_ftp_delta_time( daily_file: str | pathlib.Path, timeout: int | None = 120, verbose: bool = False, mode: oct = 0o775, ): """ Connects to the IERS ftp server to download Bulletin-A files - https://datacenter.iers.org/productMetadata.php?id=6 Reads the IERS Bulletin-A files and calculates the daily delta times Servers and Mirrors - ftp://ftp.iers.org/products/eop/rapid/bulletina/ Parameters ---------- daily_file: str or pathlib.Path output daily delta time file from merged Bulletin-A files timeout: int, default 120 timeout in seconds for blocking operations verbose: bool, default False print file information about output file mode: oct, default 0o775 permissions mode of output file Notes ----- Delta times are the difference between dynamical time and universal time """ # connect to ftp host for IERS bulletins HOST = ["ftp.iers.org", "products", "eop", "rapid", "bulletina"] timescale.utilities.check_ftp_connection(HOST[0]) # regular expression pattern for finding files rx = re.compile(r"bulletina-(.*?)-(\d+).txt$", re.VERBOSE) # open output daily delta time file daily_file = pathlib.Path(daily_file).expanduser().absolute() fid = daily_file.open(mode="w", encoding="utf8") # find subdirectories subdirectory, _ = timescale.utilities.ftp_list( HOST, timeout=timeout, basename=True, sort=True ) # for each subdirectory for SUB in subdirectory: # find Bulletin-A files in ftp subdirectory HOST.append(SUB) logging.info(SUB) bulletin_files, _ = timescale.utilities.ftp_list( HOST, timeout=timeout, basename=True, sort=True, pattern=rx ) # for each Bulletin-A file for f in sorted(bulletin_files): logging.info(f) # copy remote file contents to BytesIO object HOST.append(f) remote_buffer = timescale.utilities.from_ftp( HOST, timeout=timeout, verbose=verbose ) # read Bulletin-A file from BytesIO object YY, MM, DD, DELTAT = read_iers_bulletin_a(remote_buffer) # print delta time for week to output file for Y, M, D, T in zip(YY, MM, DD, DELTAT): print(f" {Y:4.0f} {M:2.0f} {D:2.0f} {T:7.4f}", file=fid) # close the bytesIO object remote_buffer.close() # remove the file from the list HOST.remove(f) # remove the subdirectory from the list HOST.remove(SUB) # close the output file fid.close() # change the permissions mode daily_file.chmod(mode)
# PURPOSE: connects to IERS http servers and finds Bulletin-A files
[docs] def iers_delta_time( daily_file: str | pathlib.Path, timeout: int | None = 120, verbose: bool = False, mode: oct = 0o775, ): """ Connects to the IERS server to download Bulletin-A files - https://datacenter.iers.org/productMetadata.php?id=6 Reads the IERS Bulletin-A files and calculates the daily delta times Servers and Mirrors - https://datacenter.iers.org/availableVersions.php?id=6 Parameters ---------- daily_file: str or pathlib.Path output daily delta time file from merged Bulletin-A files timeout: int, default 120 timeout in seconds for blocking operations verbose: bool, default False print file information about output file mode: oct, default 0o775 permissions mode of output file Notes ----- Delta times are the difference between dynamical time and universal time """ # open output daily delta time file daily_file = pathlib.Path(daily_file).expanduser().absolute() fid = daily_file.open(mode="w", encoding="utf8") file_format = " {0:4.0f} {1:2.0f} {2:2.0f} {3:7.4f}" # connect to http host for IERS Bulletin-A files HOST = "https://datacenter.iers.org/availableVersions.php?id=6" bulletin_files, _ = timescale.utilities.iers_list(HOST, timeout=timeout) # for each Bulletin-A file for f in bulletin_files: logging.info(f) remote_buffer = timescale.utilities.from_http(f, timeout=timeout) # read Bulletin-A file from BytesIO object YY, MM, DD, DELTAT = read_iers_bulletin_a(remote_buffer) # print delta time for week to output file for Y, M, D, T in zip(YY, MM, DD, DELTAT): print(file_format.format(Y, M, D, T), file=fid) # close the bytesIO object remote_buffer.close() # close the output file fid.close() # change the permissions mode daily_file.chmod(mode)
# PURPOSE: connects to CDDIS Earthdata https server and finds Bulletin-A files
[docs] def cddis_delta_time( daily_file: str | pathlib.Path, username: str | None = None, password: str | None = None, verbose: bool = False, mode: oct = 0o775, ): """ Connects to the CDDIS Earthdata server to download Bulletin-A files Reads the IERS Bulletin-A files and calculates the daily delta times Servers and Mirrors - https://cddis.nasa.gov/archive/products/iers/iers_bulletins/bulletin_a/ Parameters ---------- daily_file: str output daily delta time file from merged Bulletin-A files username: str or NoneType, default None NASA Earthdata username password: str or NoneType, default None NASA Earthdata password verbose: bool, default False print file information about output file mode: oct, default 0o775 permissions mode of output file Notes ----- Delta times are the difference between dynamical time and universal time """ # connect to CDDIS Earthdata host for IERS bulletins HOST = [ "https://cddis.nasa.gov", "archive", "products", "iers", "iers_bulletins", "bulletin_a", ] # build NASA Earthdata opener for CDDIS and check credentials timescale.utilities.build_opener(username, password) timescale.utilities.check_credentials() # regular expression pattern for finding directories R1 = re.compile(r"volume_(.*?)$", re.VERBOSE) # regular expression pattern for finding files R2 = re.compile(r"iers_bulletina\.(.*?)_(\d+)$", re.VERBOSE) # open output daily delta time file daily_file = pathlib.Path(daily_file).expanduser().absolute() fid = daily_file.open(mode="w", encoding="utf8") file_format = " {0:4.0f} {1:2.0f} {2:2.0f} {3:7.4f}" # for each subdirectory subdirectory, mtimes = timescale.utilities.cddis_list( HOST, build=False, pattern=R1 ) # extract roman numerals from subdirectories roman = [R1.findall(s).pop() for s in subdirectory] # sort the list of Roman numerals subdirectory = [ subdirectory[i] for i, j in sorted( enumerate(roman), key=lambda i: timescale.utilities.roman_to_int(i[1]), ) ] # output file format for SUB in subdirectory: # find Bulletin-A files in https subdirectory HOST.append(SUB) bulletin_files, mtimes = timescale.utilities.cddis_list( HOST, build=False, sort=True, pattern=R2 ) # for each Bulletin-A file for f in sorted(bulletin_files): logging.info(f) # copy remote file contents to BytesIO object HOST.append(f) remote_buffer = timescale.utilities.from_cddis( HOST, build=False, timeout=20 ) # read Bulletin-A file from BytesIO object YY, MM, DD, DELTAT = read_iers_bulletin_a(remote_buffer) # print delta time for week to output file for Y, M, D, T in zip(YY, MM, DD, DELTAT): print(file_format.format(Y, M, D, T), file=fid) # close the bytesIO object remote_buffer.close() # remove the file from the list HOST.remove(f) # remove the subdirectory from the list HOST.remove(SUB) # close the output file fid.close() # change the permissions mode daily_file.chmod(mode)
# PURPOSE: reads IERS Bulletin-A and calculates the delta times
[docs] def read_iers_bulletin_a(fileID): """ Read a weekly IERS Bulletin-A file and calculate the delta times (TT - UT1) Parameters ---------- fileID: obj open file object for Bulletin-A file Returns ------- Y: float, calendar year M: float calendar month D: float day of the month DELTAT: float difference between universal time and dynamical time Notes ----- Delta times are the difference between dynamical time and universal time """ # read contents from input file object file_contents = fileID.read().decode("utf8").splitlines() # parse header text to find time offsets # TT-TAI TT_TAI = 0 # TAI-UTC TAI_UTC = 0 # counts the number of lines in the header count = 0 HEADER = False # Reading over header text while not HEADER: # file line at count l = file_contents[count] # check if line contains time offsets if re.search(r"TT\s\=\sTAI", l): TT_TAI = np.float64(re.findall(r"(\d+\.\d+)", l).pop()) if re.search(r"TAI-UTC", l): TAI_UTC = np.float64(re.findall(r"=\s(\d+\.\d+)", l).pop()) # find line to set HEADER flag to True HEADER = bool( re.search(r"COMBINED\sEARTH\sORIENTATION\sPARAMETERS:", l) ) # add 1 to counter count += 1 # convert variables to numpy arrays MJD = np.zeros((7)) UT1_UTC = np.zeros((7)) valid = 0 # for each day in the week for i in range(7): try: # split numerical instances from data line line_contents = file_contents[count + i + 4].split() # years are not always complete in the bulletin file # Modified Julian Day (days since 1858-11-17T00:00:00) MJD[i] = np.float64(line_contents[3]) # difference between UT1 and UTC times UT1_UTC[i] = np.float64(line_contents[8]) except (IndexError, ValueError): pass else: valid += 1 # calculate components for delta time # TAI time is ahead of GPS by 19 seconds TAI_GPS = 19.0 # calculate calendar dates from Modified Julian days Y, M, D, h, m, s = convert_julian(MJD[:valid] + _jd_mjd, format="tuple") # calculate GPS Time (seconds since 1980-01-06T00:00:00) # by converting the Modified Julian days (days since 1858-11-17T00:00:00) GPS_Time = ( convert_delta_time( MJD[:valid] * 8.64e4, epoch1=_mjd_epoch, epoch2=_gps_epoch, scale=1.0, ) + TAI_UTC - TAI_GPS ) # number of leap seconds between GPS and UTC # this finds the daily correction for weeks with leap seconds GPS_UTC = count_leap_seconds(GPS_Time) # calculate delta time (TT - UT1) --> # (TT-TAI) + (TAI-GPS) + (GPS-UTC) - (UT1-UTC) DELTAT = TT_TAI + TAI_GPS + GPS_UTC - UT1_UTC[:valid] # return dates and delta times return (Y, M, D, DELTAT)
# PURPOSE: connects to servers and downloads latest Bulletin-A file
[docs] def update_bulletin_a( timeout: int | None = 20, verbose: bool = False, mode: oct = 0o775 ): """ Connects to IERS Rapid Service/Prediction Center (RS/PC) and downloads latest Bulletin-A file - https://maia.usno.navy.mil/ser7/readme.bulla Servers and Mirrors - https://maia.usno.navy.mil/ser7/ser7.dat Parameters ---------- timeout: int or NoneType, default 20 timeout in seconds for blocking operations verbose: bool, default False print file information about output file mode: oct, default 0o775 permissions mode of output file """ # local version of file LOCAL = timescale.utilities.get_data_path(["data", "ser7.dat"]) HASH = timescale.utilities.get_hash(LOCAL) # try downloading from IERS Rapid Service/Prediction Center (RS/PC) REMOTE = ["https://maia.usno.navy.mil", "ser7", "ser7.dat"] try: timescale.utilities.from_http( REMOTE, timeout=timeout, local=LOCAL, hash=HASH, verbose=verbose, mode=mode, ) except Exception as exc: logging.debug(traceback.format_exc()) pass else: return
# PURPOSE: connects to servers and downloads delta time files
[docs] def pull_deltat_file( FILE: str, username: str | None = None, password: str | None = None, timeout: int | None = 20, verbose: bool = False, mode: oct = 0o775, ): """ Connects to servers and downloads delta time files Servers and Mirrors - http://maia.usno.navy.mil/ser7/ - https://cddis.nasa.gov/archive/products/iers/ - ftp://cddis.nasa.gov/products/iers/ - ftp://cddis.gsfc.nasa.gov/pub/products/iers/ Parameters ---------- FILE: str delta time file to download from remote servers - deltat.data: monthly deltat file - historic_deltat.data: historic deltat file username: str or NoneType, default None NASA Earthdata username password: str or NoneType, default None NASA Earthdata password timeout: int or NoneType, default 20 timeout in seconds for blocking operations verbose: bool, default False print file information about output file mode: oct, default 0o775 permissions mode of output file Notes ----- Delta times are the difference between dynamical time and universal time """ # local version of file LOCAL = timescale.utilities.get_data_path(["data", FILE]) HASH = timescale.utilities.get_hash(LOCAL) # try downloading from US Naval Oceanography Portal HOST = ["http://maia.usno.navy.mil", "ser7", FILE] try: timescale.utilities.from_http( HOST, timeout=timeout, local=LOCAL, hash=HASH, verbose=verbose, mode=mode, ) except Exception as exc: logging.debug(traceback.format_exc()) pass else: return # try downloading from NASA Crustal Dynamics Data Information System # NOTE: anonymous ftp access was discontinued on 2020-10-31 # need to use encrypted connection with email as password # https://www.earthdata.nasa.gov/centers/cddis-daac/archive-access HOST = ["gdc.cddis.eosdis.nasa.gov", "products", "iers", FILE] try: timescale.utilities.from_ftp( HOST, username=username, password=password, encrypted=True, timeout=timeout, local=LOCAL, hash=HASH, verbose=verbose, mode=mode, ) except Exception as exc: logging.debug(traceback.format_exc()) pass else: return # try downloading from NASA Crustal Dynamics Data Information System # using NASA Earthdata credentials stored in netrc file HOST = ["https://cddis.nasa.gov", "archive", "products", "iers", FILE] try: timescale.utilities.from_cddis( HOST, username=username, password=password, timeout=timeout, local=LOCAL, hash=HASH, verbose=verbose, mode=mode, ) except Exception as exc: logging.debug(traceback.format_exc()) pass else: return