# Source code for stingray.pulse.pulsar

```"""
Basic pulsar-related functions and statistics.
"""

import functools
from collections.abc import Iterable
import warnings
from scipy.optimize import minimize, basinhopping
import numpy as np

from .fftfit import fftfit as taylor_fftfit
from ..utils import simon, jit
from . import HAS_PINT, get_model, toa

__all__ = ['pulse_phase', 'phase_exposure', 'fold_events', 'profile_stat',
'z_n', 'fftfit', 'get_TOA', 'z_n_binned_events', 'z_n_gauss',
'z_n_events', 'htest', 'z_n_binned_events_all', 'z_n_gauss_all',
'z_n_events_all', 'get_orbital_correction_from_ephemeris_file']

def _default_value_if_no_key(dictionary, key, default):
try:
return dictionary[key]
except:
return default

def p_to_f(*period_derivatives):
"""Convert periods into frequencies, and vice versa.

For now, limited to third derivative. Raises when a
fourth derivative is passed.

Parameters
----------
p, pdot, pddot, ... : floats
period derivatives, starting from zeroth and in
increasing order

Examples
--------
>>> p_to_f() == []
True
>>> np.allclose(p_to_f(1), [1])
True
>>> np.allclose(p_to_f(1, 2), [1, -2])
True
>>> np.allclose(p_to_f(1, 2, 3), [1, -2, 5])
True
>>> np.allclose(p_to_f(1, 2, 3, 4), [1, -2, 5, -16])
True
>>> np.allclose(p_to_f(1, 2, 3, 4, 32, 22), [1, -2, 5, -16, 0, 0])
True
"""
nder = len(period_derivatives)
if nder == 0:
return []
fder = np.zeros_like(period_derivatives)
p = period_derivatives[0]
fder[0] = 1 / p

if nder > 1:
pd = period_derivatives[1]
fder[1] = -1 / p**2 * pd

if nder > 2:
pdd = period_derivatives[2]
fder[2] = 2 / p**3 * pd**2 - 1 / p**2 * pdd

if nder > 3:
pddd = period_derivatives[3]
fder[3] = - 6 / p**4 * pd ** 3 + 6 / p**3 * pd * pdd - \
1 / p**2 * pddd
if nder > 4:
warnings.warn("Derivatives above third are not supported")

return fder

[docs]def pulse_phase(times, *frequency_derivatives, **opts):
"""
Calculate pulse phase from the frequency and its derivatives.

Parameters
----------
times : array of floats
The times at which the phase is calculated
*frequency_derivatives: floats
List of derivatives in increasing order, starting from zero.

Other Parameters
----------------
ph0 : float
The starting phase
to_1 : bool, default True
Only return the fractional part of the phase, normalized from 0 to 1

Returns
-------
phases : array of floats
The absolute pulse phase

"""

ph0 = _default_value_if_no_key(opts, "ph0", 0)
to_1 = _default_value_if_no_key(opts, "to_1", True)
ph = ph0

for i_f, f in enumerate(frequency_derivatives):
ph += 1 / np.math.factorial(i_f + 1) * times**(i_f + 1) * f

if to_1:
ph -= np.floor(ph)
return ph

[docs]def phase_exposure(start_time, stop_time, period, nbin=16, gtis=None):
"""Calculate the exposure on each phase of a pulse profile.

Parameters
----------
start_time, stop_time : float
Starting and stopping time (or phase if ``period==1``)
period : float
The pulse period (if 1, equivalent to phases)

Other parameters
----------------
nbin : int, optional, default 16
The number of bins in the profile
gtis : [[gti00, gti01], [gti10, gti11], ...], optional, default None
Good Time Intervals

Returns
-------
expo : array of floats
The normalized exposure of each bin in the pulse profile (1 is the
highest exposure, 0 the lowest)
"""
if gtis is None:
gtis = np.array([[start_time, stop_time]])

# Use precise floating points -------------
start_time = np.longdouble(start_time)
stop_time = np.longdouble(stop_time)
period = np.longdouble(period)
gtis = np.array(gtis, dtype=np.longdouble)
# -----------------------------------------

expo = np.zeros(nbin)
phs = np.linspace(0, 1, nbin + 1)
phs = np.array(list(zip(phs[0:-1], phs[1:])))

# Discard gtis outside [start, stop]
good = np.logical_and(gtis[:, 0] < stop_time, gtis[:, 1] > start_time)
gtis = gtis[good]

for g in gtis:
g0 = g[0]
g1 = g[1]
if g0 < start_time:
# If the start of the fold is inside a gti, start from there
g0 = start_time
if g1 > stop_time:
# If the end of the fold is inside a gti, end there
g1 = stop_time
length = g1 - g0
# How many periods inside this length?
nraw = length / period
# How many integer periods?
nper = nraw.astype(int)

# First raw exposure: the number of periods
expo += nper / nbin

# FRACTIONAL PART =================
# What remains is additional exposure for part of the profile.
start_phase = np.fmod(g0 / period, 1)
end_phase = nraw - nper + start_phase

limits = [[start_phase, end_phase]]
# start_phase is always < 1. end_phase not always. In this case...
if end_phase > 1:
limits = [[0, end_phase - 1], [start_phase, 1]]

for l in limits:
l0 = l[0]
l1 = l[1]
# Discards bins untouched by these limits
goodbins = np.logical_and(phs[:, 0] <= l1, phs[:, 1] >= l0)
idxs = np.arange(len(phs), dtype=int)[goodbins]
for i in idxs:
start = np.max([phs[i, 0], l0])
stop = np.min([phs[i, 1], l1])
w = stop - start
expo[i] += w

return expo / np.max(expo)

[docs]def fold_events(times, *frequency_derivatives, **opts):
'''Epoch folding with exposure correction.

Parameters
----------
times : array of floats
f, fdot, fddot... : float
The frequency and any number of derivatives.

Other Parameters
----------------
nbin : int, optional, default 16
The number of bins in the pulse profile
weights : float or array of floats, optional
The weights of the data. It can either be specified as a single value
for all points, or an array with the same length as ``time``
gtis : [[gti0_0, gti0_1], [gti1_0, gti1_1], ...], optional
Good time intervals
ref_time : float, optional, default 0
Reference time for the timing solution
expocorr : bool, default False
Correct each bin for exposure (use when the period of the pulsar is
comparable to that of GTIs)

Returns
-------
phase_bins : array of floats
The phases corresponding to the pulse profile
profile : array of floats
The pulse profile
profile_err : array of floats
The uncertainties on the pulse profile
'''
nbin = _default_value_if_no_key(opts, "nbin", 16)
weights = _default_value_if_no_key(opts, "weights", 1)
gtis = _default_value_if_no_key(opts, "gtis",
np.array([[times[0], times[-1]]]))
ref_time = _default_value_if_no_key(opts, "ref_time", 0)
expocorr = _default_value_if_no_key(opts, "expocorr", False)

if not isinstance(weights, Iterable):
weights *= np.ones(len(times))

gtis = gtis - ref_time
times = times - ref_time
# This dt has not the same meaning as in the Lightcurve case.
# it's just to define stop_time as a meaningful value after
# the last event.
dt = np.abs(times[1] - times[0])
start_time = times[0]
stop_time = times[-1] + dt

phases = pulse_phase(times, *frequency_derivatives, to_1=True)
gti_phases = pulse_phase(gtis, *frequency_derivatives, to_1=False)
start_phase, stop_phase = pulse_phase(np.array([start_time, stop_time]),
*frequency_derivatives,
to_1=False)
raw_profile, bins = np.histogram(phases,
bins=np.linspace(0, 1, nbin + 1),
weights=weights)

if expocorr:
expo_norm = phase_exposure(start_phase, stop_phase, 1, nbin,
gtis=gti_phases)
simon("For exposure != 1, the uncertainty might be incorrect")
else:
expo_norm = 1

# TODO: this is wrong. Need to extend this to non-1 weights

raw_profile_err = np.sqrt(raw_profile)

return bins[:-1] + np.diff(bins) / 2, raw_profile / expo_norm, \
raw_profile_err / expo_norm

[docs]def profile_stat(profile, err=None):
"""Calculate the epoch folding statistics \'a la Leahy et al. (1983).

Parameters
----------
profile : array
The pulse profile

Other Parameters
----------------
err : float or array
The uncertainties on the pulse profile

Returns
-------
stat : float
The epoch folding statistics
"""
mean = np.mean(profile)
if err is None:
err = np.sqrt(mean)
return np.sum((profile - mean) ** 2 / err ** 2)

@functools.lru_cache(maxsize=128)
def _cached_sin_harmonics(nbin, z_n_n):
"""Cached sine values corresponding to each of the nbin bins.

Parameters
----------
nbin : int
Number of bins
z_n_n : int
The number of harmonics (n) in the Z^2_n search
"""
dph = 1.0 / nbin
twopiphases = np.pi * 2 * np.arange(dph / 2, 1, dph)
cached_sin = np.zeros(z_n_n * nbin)
for i in range(z_n_n):
cached_sin[i * nbin: (i + 1) * nbin] = np.sin(twopiphases)
return cached_sin

@functools.lru_cache(maxsize=128)
def _cached_cos_harmonics(nbin, z_n_n):
"""Cached cosine values corresponding to each of the nbin bins.

Parameters
----------
nbin : int
Number of bins
z_n_n : int
The number of harmonics (n) in the Z^2_n search
"""
dph = 1.0 / nbin
twopiphases = np.pi * 2 * np.arange(dph / 2, 1, dph)
cached_cos = np.zeros(z_n_n * nbin)
for i in range(z_n_n):
cached_cos[i * nbin: (i + 1) * nbin] = np.cos(twopiphases)
return cached_cos

@jit(nopython=True)
def _z_n_fast_cached_sums_unnorm(prof, ks, cached_sin, cached_cos):
'''Calculate the unnormalized Z^2_k, for (k=1,.. n), of a pulsed profile.

Parameters
----------
prof : :class:`numpy.array`
The pulsed profile
ks : :class:`numpy.array` of int
The harmonic numbers, from 1 to n
cached_sin : :class:`numpy.array`
Cached sine values for each phase bin in the profile
cached_cos : :class:`numpy.array`
Cached cosine values for each phase bin in the profile
'''

all_zs = np.zeros(ks.size)
N = prof.size

total_sum = 0
for k in ks:
local_z = (
np.sum(cached_cos[: N * k: k] * prof) ** 2
+ np.sum(cached_sin[: N * k: k] * prof) ** 2
)
total_sum += local_z
all_zs[k - 1] = total_sum

return all_zs

[docs]def z_n_binned_events_all(profile, nmax=20):
'''Z^2_n statistic for multiple harmonics and binned events

See Bachetti+2021, arXiv:2012.11397

Parameters
----------
profile : array of floats
The folded pulse profile (containing the number of
photons falling in each pulse bin)
n : int
Number of harmonics, including the fundamental

Returns
-------
ks : list of ints
Harmonic numbers, from 1 to nmax (included)
z2_n : float
The value of the statistic for all ks
'''
cached_sin = _cached_sin_harmonics(profile.size, nmax)
cached_cos = _cached_cos_harmonics(profile.size, nmax)
ks = np.arange(1, nmax + 1, dtype=int)

total = np.sum(profile)
if total == 0:
return ks, np.zeros(nmax)
all_zs = _z_n_fast_cached_sums_unnorm(profile, ks, cached_sin, cached_cos)

return ks, all_zs * 2 / total

[docs]def z_n_gauss_all(profile, err, nmax=20):
'''Z^2_n statistic for n harmonics and normally-distributed profiles

See Bachetti+2021, arXiv:2012.11397

Parameters
----------
profile : array of floats
The folded pulse profile
err : float
The (assumed constant) uncertainty on the flux in each bin.
nmax : int
Maximum number of harmonics, including the fundamental

Returns
-------
ks : list of ints
Harmonic numbers, from 1 to nmax (included)
z2_n : list of floats
The value of the statistic for all ks
'''
cached_sin = _cached_sin_harmonics(profile.size, nmax)
cached_cos = _cached_cos_harmonics(profile.size, nmax)
ks = np.arange(1, nmax + 1, dtype=int)

all_zs = _z_n_fast_cached_sums_unnorm(profile, ks, cached_sin, cached_cos)

return ks, all_zs * (2 / profile.size / err**2)

@jit(nopython=True)
def z_n_events_all(phase, nmax=20):
'''Z^2_n statistics, a` la Buccheri+83, A&A, 128, 245, eq. 2.

Parameters
----------
phase : array of floats
The phases of the events
n : int, default 2
Number of harmonics, including the fundamental

Returns
-------
ks : list of ints
Harmonic numbers, from 1 to nmax (included)
z2_n : float
The Z^2_n statistic for all ks
'''
all_zs = np.zeros(nmax)
ks = np.arange(1, nmax + 1)
nphot = phase.size

total_sum = 0
phase = phase * 2 * np.pi

for k in ks:
local_z = (
np.sum(np.cos(k * phase)) ** 2
+ np.sum(np.sin(k * phase)) ** 2
)
total_sum += local_z
all_zs[k - 1] = total_sum

return ks, 2 / nphot * all_zs

[docs]def z_n_binned_events(profile, n):
'''Z^2_n statistic for pulse profiles from binned events

See Bachetti+2021, arXiv:2012.11397

Parameters
----------
profile : array of floats
The folded pulse profile (containing the number of
photons falling in each pulse bin)
n : int
Number of harmonics, including the fundamental

Returns
-------
z2_n : float
The value of the statistic
'''
_, all_zs = z_n_binned_events_all(profile, nmax=n)
return all_zs[-1]

[docs]def z_n_gauss(profile, err, n):
'''Z^2_n statistic for normally-distributed profiles

See Bachetti+2021, arXiv:2012.11397

Parameters
----------
profile : array of floats
The folded pulse profile
err : float
The (assumed constant) uncertainty on the flux in each bin.
n : int
Number of harmonics, including the fundamental

Returns
-------
z2_n : float
The value of the statistic
'''
_, all_zs = z_n_gauss_all(profile, err, nmax=n)
return all_zs[-1]

[docs]def z_n_events(phase, n):
'''Z^2_n statistics, a` la Buccheri+83, A&A, 128, 245, eq. 2.

Parameters
----------
phase : array of floats
The phases of the events
n : int, default 2
Number of harmonics, including the fundamental

Returns
-------
z2_n : float
The Z^2_n statistic
'''
ks, all_zs = z_n_events_all(phase, nmax=n)
return all_zs[-1]

[docs]def z_n(data, n, datatype="events", err=None, norm=None):
'''Z^2_n statistics, a` la Buccheri+83, A&A, 128, 245, eq. 2.

If datatype is "binned" or "gauss", uses the formulation from
Bachetti+2021, ApJ, arxiv:2012.11397

Parameters
----------
data : array of floats
Phase values or binned flux values
n : int, default 2
Number of harmonics, including the fundamental

Other Parameters
----------------
datatype : str
The data type: "events" if phase values between 0 and 1,
"binned" if folded pulse profile from photons, "gauss" if
folded pulse profile with normally-distributed fluxes
err : float
The uncertainty on the pulse profile fluxes (required for
datatype="gauss", ignored otherwise)
norm : float
For backwards compatibility; if norm is not None, it is
substituted to ``data``, and data is ignored. This raises
a DeprecationWarning

Returns
-------
z2_n : float
The Z^2_n statistics of the events.
'''
data = np.asarray(data)

if norm is not None:
warnings.warn("The use of ``z_n(phase, norm=profile)`` is deprecated. Use "
DeprecationWarning)
if isinstance(norm, Iterable):
data = norm
datatype = "binned"
else:
datatype = "events"

if data.size == 0:
return 0

if datatype == "binned":
return z_n_binned_events(data, n)
elif datatype == "events":
return z_n_events(data, n)
elif datatype == "gauss":
if err is None:
raise ValueError(
"If datatype='gauss', you need to specify an uncertainty (err)")
return z_n_gauss(data, n=n, err=err)

raise ValueError(f"Unknown datatype requested for Z_n ({datatype})")

[docs]def htest(data, nmax=20, datatype="binned", err=None):
'''htest-test statistic, a` la De Jager+89, A&A, 221, 180D, eq. 2.

If datatype is "binned" or "gauss", uses the formulation from
Bachetti+2021, ApJ, arxiv:2012.11397

Parameters
----------
data : array of floats
Phase values or binned flux values
nmax : int, default 20
Maximum of harmonics for Z^2_n

Other Parameters
----------------
datatype : str
The datatype of data: "events" if phase values between 0 and 1,
"binned" if folded pulse profile from photons, "gauss" if
folded pulse profile with normally-distributed fluxes
err : float
The uncertainty on the pulse profile fluxes (required for
datatype="gauss", ignored otherwise)

Returns
-------
M : int
The best number of harmonics that describe the signal.
htest : float
The htest statistics of the events.
'''
if datatype == "binned":
ks, zs = z_n_binned_events_all(data, nmax)
elif datatype == "events":
ks, zs = z_n_events_all(data, nmax)
elif datatype == "gauss":
if err is None:
raise ValueError(
"If datatype='gauss', you need to specify an uncertainty (err)")
ks, zs = z_n_gauss_all(data, nmax=nmax, err=err)
else:
raise ValueError(f"Unknown datatype requested for htest ({datatype})")

Hs = zs - 4 * ks + 4
bestidx = np.argmax(Hs)

return ks[bestidx], Hs[bestidx]

def fftfit_fun(profile, template, amplitude, phase):
'''Function to be minimized for the FFTFIT method.'''

pass

[docs]def fftfit(prof, template=None, quick=False, sigma=None, use_bootstrap=False,
**fftfit_kwargs):
"""Align a template to a pulse profile.

Parameters
----------
prof : array
The pulse profile
template : array, default None
The template of the pulse used to perform the TOA calculation. If None,
a simple sinusoid is used

Other parameters
----------------
sigma : array
error on profile bins (currently has no effect)
use_bootstrap : bool
Calculate errors using a bootstrap method, with `fftfit_error`
**fftfit_kwargs : additional arguments for `fftfit_error`

Returns
-------
mean_amp, std_amp : floats
Mean and standard deviation of the amplitude
mean_phase, std_phase : floats
Mean and standard deviation of the phase
"""
prof = prof - np.mean(prof)

template = template - np.mean(template)

return taylor_fftfit(prof, template)

def _plot_TOA_fit(profile, template, toa, mod=None, toaerr=None,
"""Plot diagnostic information on the TOA."""
import matplotlib.pyplot as plt
from scipy.interpolate import interp1d
import time
phases = np.arange(0, 2, 1 / len(profile))
profile = np.concatenate((profile, profile))
template = np.concatenate((template, template))
if mod is None:
mod = interp1d(phases, template, fill_value='extrapolate')

fig = plt.figure()
plt.plot(phases, profile, drawstyle='steps-mid')
fine_phases = np.linspace(0, 1, 1000)
fine_phases_shifted = fine_phases - toa / period + additional_phase
model = mod(fine_phases_shifted - np.floor(fine_phases_shifted))
model = np.concatenate((model, model))
plt.plot(np.linspace(0, 2, 2000), model)
if toaerr is not None:
plt.axvline((toa - toaerr) / period)
plt.axvline((toa + toaerr) / period)
plt.axvline(toa / period - 0.5 / len(profile), ls='--')
plt.axvline(toa / period + 0.5 / len(profile), ls='--')
timestamp = int(time.time())
plt.savefig('{}.png'.format(timestamp))
if not show:
plt.close(fig)

[docs]def get_TOA(prof, period, tstart, template=None, additional_phase=0,
quick=False, debug=False, use_bootstrap=False,
**fftfit_kwargs):
"""Calculate the Time-Of-Arrival of a pulse.

Parameters
----------
prof : array
The pulse profile
template : array, default None
The template of the pulse used to perform the TOA calculation, if any.
Otherwise use the default of fftfit
tstart : float
The time at the start of the pulse profile

Other parameters
----------------
nstep : int, optional, default 100
Number of steps for the bootstrap method

Returns
-------
toa, toastd : floats
Mean and standard deviation of the TOA
"""
nbin = len(prof)

ph = np.arange(0, 1, 1 / nbin)
if template is None:
template = np.cos(2 * np.pi * ph)

mean_amp, std_amp, phase_res, phase_res_err = \
fftfit(prof, template=template, quick=quick,
use_bootstrap=use_bootstrap, **fftfit_kwargs)
phase_res = phase_res - np.floor(phase_res)

toa = tstart + phase_res * period
toaerr = phase_res_err * period

if debug:
_plot_TOA_fit(prof, template, toa - tstart, toaerr=toaerr,
period=period)

toalist = [None] * len(mjds)
for i, m in enumerate(mjds):
toalist[i] = toa.TOA(m, obs='Barycenter', scale='tdb')

toalist = toa.TOAs(toalist=toalist)
if 'tdb' not in toalist.table.colnames:
toalist.compute_TDBs(ephem=ephem)
if 'ssb_obs_pos' not in toalist.table.colnames:
toalist.compute_posvels(ephem, False)

[docs]def get_orbital_correction_from_ephemeris_file(mjdstart, mjdstop, parfile,
ntimes=1000, ephem="DE405",
return_pint_model=False):
"""Get a correction for orbital motion from pulsar parameter file.

Parameters
----------
mjdstart, mjdstop : float
Start and end of the time interval where we want the orbital solution
parfile : str
Any parameter file understood by PINT (Tempo or Tempo2 format)

Other parameters
----------------
ntimes : int
Number of time intervals to use for interpolation. Default 1000

Returns
-------
correction_sec : function
Function that accepts in input an array of times in seconds and a
floating-point MJDref value, and returns the deorbited times
correction_mjd : function
Function that accepts times in MJDs and returns the deorbited times.
"""
from scipy.interpolate import interp1d
from astropy import units
simon("Assuming events are already referred to the solar system "
"barycenter (timescale is TDB)")
if not HAS_PINT:
raise ImportError("You need the optional dependency PINT to use this "
"functionality: github.com/nanograv/pint")

mjds = np.linspace(mjdstart, mjdstop, ntimes)
m = get_model(parfile)
delays = m.delay(toalist)

correction_mjd_rough = \
interp1d(mjds,
(toalist.table['tdbld'] * units.d - delays).to(units.d).value,
fill_value="extrapolate")

def correction_mjd(mjds):
"""Get the orbital correction.

Parameters
----------
mjds : array-like
The input times in MJD

Returns
-------
mjds: Corrected times in MJD
"""
xvals = correction_mjd_rough.x
# Maybe this will be fixed if scipy/scipy#9602 is accepted
bad = (mjds < xvals[0]) | (np.any(mjds > xvals[-1]))
warnings.warn("Some points are outside the interpolation range:"
return correction_mjd_rough(mjds)

def correction_sec(times, mjdref):
"""Get the orbital correction.

Parameters
----------
times : array-like
The input times in seconds of Mission Elapsed Time (MET)
mjdref : float
MJDREF, reference MJD for the mission

Returns
-------
mets: array-like
Corrected times in MET seconds
"""
deorb_mjds = correction_mjd(times / 86400 + mjdref)
return np.array((deorb_mjds - mjdref) * 86400)

retvals = [correction_sec, correction_mjd]
if return_pint_model:
retvals.append(m)
return retvals
```