# Source code for stingray.powerspectrum

```
import copy
import warnings
from collections.abc import Generator, Iterable
import numpy as np
import scipy
import scipy.optimize
import scipy.stats
import stingray.utils as utils
from stingray.crossspectrum import AveragedCrossspectrum, Crossspectrum
from stingray.gti import bin_intervals_from_gtis, check_gtis
from stingray.largememory import createChunkedSpectra, saveData, HAS_ZARR
from stingray.stats import pds_probability, amplitude_upper_limit
from stingray.utils import genDataPath
from .events import EventList
from .gti import cross_two_gtis
from .lightcurve import Lightcurve
from .fourier import avg_pds_from_iterable, unnormalize_periodograms
from .fourier import avg_pds_from_events
from .fourier import fftfreq, fft
from .fourier import get_flux_iterable_from_segments
from .fourier import rms_calculation, poisson_level
try:
from tqdm import tqdm as show_progress
except ImportError:
def show_progress(a, **kwargs):
return a
__all__ = ["Powerspectrum", "AveragedPowerspectrum", "DynamicalPowerspectrum"]
[docs]
class Powerspectrum(Crossspectrum):
type = "powerspectrum"
"""
Make a :class:`Powerspectrum` (also called periodogram) from a (binned)
light curve. Periodograms can be normalized by either Leahy normalization,
fractional rms normalization, absolute rms normalization, or not at all.
You can also make an empty :class:`Powerspectrum` object to populate with
your own fourier-transformed data (this can sometimes be useful when making
binned power spectra).
Parameters
----------
data: :class:`stingray.Lightcurve` object, optional, default ``None``
The light curve data to be Fourier-transformed.
norm: {"leahy" | "frac" | "abs" | "none" }, optional, default "frac"
The normaliation of the power spectrum to be used. Options are
"leahy", "frac", "abs" and "none", default is "frac".
Other Parameters
----------------
gti: 2-d float array
``[[gti0_0, gti0_1], [gti1_0, gti1_1], ...]`` -- Good Time intervals.
This choice overrides the GTIs in the single light curves. Use with
care, especially if these GTIs have overlaps with the input
object GTIs! If you're getting errors regarding your GTIs, don't
use this and only give GTIs to the input object before making
the power spectrum.
skip_checks: bool
Skip initial checks, for speed or other reasons (you need to trust your
inputs!).
Attributes
----------
norm: {"leahy" | "frac" | "abs" | "none" }
The normalization of the power spectrum.
freq: numpy.ndarray
The array of mid-bin frequencies that the Fourier transform samples.
power: numpy.ndarray
The array of normalized squared absolute values of Fourier
amplitudes.
power_err: numpy.ndarray
The uncertainties of ``power``.
An approximation for each bin given by ``power_err= power/sqrt(m)``.
Where ``m`` is the number of power averaged in each bin (by frequency
binning, or averaging power spectra of segments of a light curve).
Note that for a single realization (``m=1``) the error is equal to the
power.
df: float
The frequency resolution.
m: int
The number of averaged powers in each bin.
n: int
The number of data points in the light curve.
nphots: float
The total number of photons in the light curve.
legacy: bool
Use the legacy machinery of ``AveragedPowerspectrum``. This might be
useful to compare with old results, and is also needed to use light
curve lists as an input, to conserve the spectra of each segment, or
to use the large_data option.
"""
def __init__(
self, data=None, norm="frac", gti=None, dt=None, lc=None, skip_checks=False, legacy=False
):
self._type = None
if lc is not None:
warnings.warn(
"The lc keyword is now deprecated. Use data " "instead", DeprecationWarning
)
if data is None:
data = lc
good_input = True
if not skip_checks:
good_input = self.initial_checks(
data1=data, data2=data, norm=norm, gti=gti, lc1=lc, lc2=lc, dt=dt
)
norm = norm.lower()
self.norm = norm
self.dt = dt
if not good_input:
return self._initialize_empty()
if not legacy and data is not None:
return self._initialize_from_any_input(data, dt=dt, norm=norm)
Crossspectrum.__init__(
self, data1=data, data2=data, norm=norm, gti=gti, dt=dt, skip_checks=True, legacy=legacy
)
self.nphots = self.nphots1
self.dt = dt
[docs]
def rebin(self, df=None, f=None, method="mean"):
"""
Rebin the power spectrum.
Parameters
----------
df: float
The new frequency resolution.
Other Parameters
----------------
f: float
The rebin factor. If specified, it substitutes ``df`` with
``f*self.df``, so ``f>1`` is recommended.
Returns
-------
bin_cs = :class:`Powerspectrum` object
The newly binned power spectrum.
"""
bin_ps = Crossspectrum.rebin(self, df=df, f=f, method=method)
bin_ps.nphots = bin_ps.nphots1
return bin_ps
[docs]
def compute_rms(
self, min_freq, max_freq, poisson_noise_level=None, white_noise_offset=None, deadtime=0.0
):
"""
Compute the fractional rms amplitude in the power spectrum
between two frequencies.
Parameters
----------
min_freq: float
The lower frequency bound for the calculation.
max_freq: float
The upper frequency bound for the calculation.
Other parameters
----------------
poisson_noise_level : float, default is None
This is the Poisson noise level of the PDS with same
normalization as the PDS. If poissoin_noise_level is None,
the Poisson noise is calculated in the idealcase
e.g. 2./<countrate> for fractional rms normalisation
Dead time and other instrumental effects can alter it.
The user can fit the Poisson noise level outside
this function using the same normalisation of the PDS
and it will get subtracted from powers here.
white_noise_offset : float, default None
This is the white noise level, in Leahy normalization. In the ideal
case, this is 2. Dead time and other instrumental effects can alter
it. The user can fit the white noise level outside this function
and it will get subtracted from powers here.
Returns
-------
rms: float
The fractional rms amplitude contained between ``min_freq`` and
``max_freq``.
rms_err: float
The error on the fractional rms amplitude.
"""
minind = self.freq.searchsorted(min_freq)
maxind = self.freq.searchsorted(max_freq)
nphots = self.nphots
# distinguish the rebinned and non-rebinned case
if isinstance(self.m, Iterable):
M_freq = self.m[minind:maxind]
K_freq = self.k[minind:maxind]
freq_bins = 1
else:
M_freq = self.m
K_freq = self.k
freq_bins = maxind - minind
T = self.dt * self.n
if white_noise_offset is not None:
powers = self.power[minind:maxind]
warnings.warn(
"the option white_noise_offset now deprecated and will be "
"removed in the next major release. The routine"
"is correct only with non-rebinned power-spectra.",
DeprecationWarning,
)
if self.norm.lower() == "leahy":
powers_leahy = powers.copy()
else:
powers_leahy = self.unnorm_power[minind:maxind].real * 2 / nphots
rms = np.sqrt(np.sum(powers_leahy - white_noise_offset) / nphots)
rms_err = self._rms_error(powers_leahy)
return rms, rms_err
else:
if poisson_noise_level is None:
poisson_noise_unnorm = poisson_level("none", n_ph=self.nphots)
else:
poisson_noise_unnorm = unnormalize_periodograms(
poisson_noise_level, self.dt, self.n, self.nphots, norm=self.norm
)
return rms_calculation(
self.unnorm_power[minind:maxind],
self.freq[minind],
self.freq[maxind],
self.nphots,
T,
M_freq,
K_freq,
freq_bins,
poisson_noise_unnorm,
deadtime,
)
[docs]
def _rms_error(self, powers):
"""
Compute the error on the fractional rms amplitude using error
propagation.
Note: this uses the actual measured powers, which is not
strictly correct. We should be using the underlying power spectrum,
but in the absence of an estimate of that, this will have to do.
.. math::
r = \sqrt{P}
.. math::
\delta r = \\frac{1}{2 * \sqrt{P}} \delta P
Parameters
----------
powers: iterable
The list of powers used to compute the fractional rms amplitude.
Returns
-------
delta_rms: float
The error on the fractional rms amplitude.
"""
nphots = self.nphots
p_err = scipy.stats.chi2(2.0 * self.m).var() * powers / self.m / nphots
rms = np.sum(powers) / nphots
pow = np.sqrt(rms)
drms_dp = 1 / (2 * pow)
sq_sum_err = np.sqrt(np.sum(p_err**2))
delta_rms = sq_sum_err * drms_dp
return delta_rms
[docs]
def classical_significances(self, threshold=1, trial_correction=False):
"""
Compute the classical significances for the powers in the power
spectrum, assuming an underlying noise distribution that follows a
chi-square distributions with 2M degrees of freedom, where M is the
number of powers averaged in each bin.
Note that this function will *only* produce correct results when the
following underlying assumptions are fulfilled:
1. The power spectrum is Leahy-normalized
2. There is no source of variability in the data other than the
periodic signal to be determined with this method. This is
important! If there are other sources of (aperiodic) variability in
the data, this method will *not* produce correct results, but
instead produce a large number of spurious false positive
detections!
3. There are no significant instrumental effects changing the
statistical distribution of the powers (e.g. pile-up or dead time)
By default, the method produces ``(index,p-values)`` for all powers in
the power spectrum, where index is the numerical index of the power in
question. If a ``threshold`` is set, then only powers with p-values
*below* that threshold with their respective indices. If
``trial_correction`` is set to ``True``, then the threshold will be
corrected for the number of trials (frequencies) in the power spectrum
before being used.
Parameters
----------
threshold : float, optional, default ``1``
The threshold to be used when reporting p-values of potentially
significant powers. Must be between 0 and 1.
Default is ``1`` (all p-values will be reported).
trial_correction : bool, optional, default ``False``
A Boolean flag that sets whether the ``threshold`` will be
corrected by the number of frequencies before being applied. This
decreases the ``threshold`` (p-values need to be lower to count as
significant). Default is ``False`` (report all powers) though for
any application where `threshold`` is set to something meaningful,
this should also be applied!
Returns
-------
pvals : iterable
A list of ``(p-value, index)`` tuples for all powers that have
p-values lower than the threshold specified in ``threshold``.
"""
if not self.norm == "leahy":
raise ValueError("This method only works on " "Leahy-normalized power spectra!")
if trial_correction:
ntrial = self.power.shape[0]
else:
ntrial = 1
if np.size(self.m) == 1:
# calculate p-values for all powers
# leave out zeroth power since it just encodes the number of
# photons!
pv = pds_probability(self.power, n_summed_spectra=self.m, ntrial=ntrial)
else:
pv = np.array(
[
pds_probability(power, n_summed_spectra=m, ntrial=ntrial)
for power, m in zip(self.power, self.m)
]
)
# need to add 1 to the indices to make up for the fact that
# we left out the first power above!
indices = np.where(pv < threshold)[0]
pvals = np.vstack([pv[indices], indices])
return pvals
[docs]
def modulation_upper_limit(self, fmin=None, fmax=None, c=0.95):
"""
Upper limit on a sinusoidal modulation.
To understand the meaning of this amplitude: if the modulation is
described by:
..math:: p = \overline{p} (1 + a * \sin(x))
this function returns a.
If it is a sum of sinusoidal harmonics instead
..math:: p = \overline{p} (1 + \sum_l a_l * \sin(lx))
a is equivalent to :math:`\sqrt(\sum_l a_l^2)`.
See `stingray.stats.power_upper_limit`,
`stingray.stats.amplitude_upper_limit`
for more information.
The formula used to calculate the upper limit assumes the Leahy
normalization.
If the periodogram is in another normalization, we will internally
convert it to Leahy before calculating the upper limit.
Parameters
----------
fmin: float
The minimum frequency to search (defaults to the first nonzero bin)
fmax: float
The maximum frequency to search (defaults to the Nyquist frequency)
Other Parameters
----------------
c: float
The confidence value for the upper limit (e.g. 0.95 = 95%)
Returns
-------
a: float
The modulation amplitude that could produce P>pmeas with 1 - c
probability.
Examples
--------
>>> pds = Powerspectrum()
>>> pds.norm = "leahy"
>>> pds.freq = np.arange(0., 5.)
>>> # Note: this pds has 40 as maximum value between 2 and 5 Hz
>>> pds.power = np.array([100000, 1, 1, 40, 1])
>>> pds.m = 1
>>> pds.nphots = 30000
>>> pds.modulation_upper_limit(fmin=2, fmax=5, c=0.99)
0.1016...
"""
pds = self
if self.norm != "leahy":
pds = self.to_norm("leahy")
freq = pds.freq
fnyq = np.max(freq)
power = pds.power
freq_mask = freq > 0
if fmin is not None or fmax is not None:
if fmin is not None:
freq_mask[freq < fmin] = 0
if fmax is not None:
freq_mask[freq > fmax] = 0
freq = freq[freq_mask]
power = power[freq_mask]
maximum_val = np.argmax(power)
nyq_ratio = freq[maximum_val] / fnyq
# I multiply by M because the formulas from Vaughan+94 treat summed
# powers, while here we have averaged powers.
return amplitude_upper_limit(
power[maximum_val] * pds.m, pds.nphots, n=pds.m, c=c, nyq_ratio=nyq_ratio, fft_corr=True
)
[docs]
@staticmethod
def from_time_array(
times, dt, segment_size=None, gti=None, norm="frac", silent=False, use_common_mean=True
):
"""
Calculate an average power spectrum from an array of event times.
Parameters
----------
times : `np.array`
Event arrival times.
dt : float
The time resolution of the intermediate light curves
(sets the Nyquist frequency).
Other parameters
----------------
segment_size : float
The length, in seconds, of the light curve segments that will be
averaged. Only relevant (and required) for
``AveragedPowerspectrum``.
gti: ``[[gti0_0, gti0_1], [gti1_0, gti1_1], ...]``
Additional, optional Good Time intervals that get intersected with
the GTIs of the input object. Can cause errors if there are
overlaps between these GTIs and the input object GTIs. If that
happens, assign the desired GTIs to the input object.
norm : str, default "frac"
The normalization of the periodogram. `abs` is absolute rms, `frac`
is fractional rms, `leahy` is Leahy+83 normalization, and `none` is
the unnormalized periodogram.
use_common_mean : bool, default True
The mean of the light curve can be estimated in each interval, or
on the full light curve. This gives different results
(Alston+2013). By default, we assume the mean is calculated on the
full light curve, but the user can set ``use_common_mean`` to False
to calculate it on a per-segment basis.
silent : bool, default False
Silence the progress bars.
"""
return powerspectrum_from_time_array(
times,
dt,
segment_size=segment_size,
gti=gti,
norm=norm,
silent=silent,
use_common_mean=use_common_mean,
)
[docs]
@staticmethod
def from_events(
events, dt, segment_size=None, gti=None, norm="frac", silent=False, use_common_mean=True
):
"""
Calculate an average power spectrum from an event list.
Parameters
----------
events : `stingray.EventList`
Event list to be analyzed.
dt : float
The time resolution of the intermediate light curves
(sets the Nyquist frequency).
Other parameters
----------------
segment_size : float
The length, in seconds, of the light curve segments that will be
averaged. Only relevant (and required) for
``AveragedPowerspectrum``.
gti: ``[[gti0_0, gti0_1], [gti1_0, gti1_1], ...]``
Additional, optional Good Time intervals that get intersected with
the GTIs of the input object. Can cause errors if there are
overlaps between these GTIs and the input object GTIs. If that
happens, assign the desired GTIs to the input object.
norm : str, default "frac"
The normalization of the periodogram. `abs` is absolute rms, `frac`
is fractional rms, `leahy` is Leahy+83 normalization, and `none` is
the unnormalized periodogram.
use_common_mean : bool, default True
The mean of the light curve can be estimated in each interval, or
on the full light curve. This gives different results
(Alston+2013). By default, we assume the mean is calculated on the
full light curve, but the user can set ``use_common_mean`` to False
to calculate it on a per-segment basis.
silent : bool, default False
Silence the progress bars.
"""
if gti is None:
gti = events.gti
return powerspectrum_from_events(
events,
dt,
segment_size=segment_size,
gti=gti,
norm=norm,
silent=silent,
use_common_mean=use_common_mean,
)
[docs]
@staticmethod
def from_lightcurve(
lc, segment_size=None, gti=None, norm="frac", silent=False, use_common_mean=True
):
"""
Calculate a power spectrum from a light curve.
Parameters
----------
events : `stingray.Lightcurve`
Light curve to be analyzed.
dt : float
The time resolution of the intermediate light curves
(sets the Nyquist frequency).
Other parameters
----------------
segment_size : float
The length, in seconds, of the light curve segments that will be
averaged. Only relevant (and required) for
``AveragedPowerspectrum``.
gti: ``[[gti0_0, gti0_1], [gti1_0, gti1_1], ...]``
Additional, optional Good Time intervals that get intersected with
the GTIs of the input object. Can cause errors if there are
overlaps between these GTIs and the input object GTIs. If that
happens, assign the desired GTIs to the input object.
norm : str, default "frac"
The normalization of the periodogram. `abs` is absolute rms, `frac`
is fractional rms, `leahy` is Leahy+83 normalization, and `none` is
the unnormalized periodogram.
use_common_mean : bool, default True
The mean of the light curve can be estimated in each interval, or
on the full light curve. This gives different results
(Alston+2013). By default, we assume the mean is calculated on the
full light curve, but the user can set ``use_common_mean`` to False
to calculate it on a per-segment basis.
silent : bool, default False
Silence the progress bars.
"""
if gti is None:
gti = lc.gti
return powerspectrum_from_lightcurve(
lc,
segment_size=segment_size,
gti=gti,
norm=norm,
silent=silent,
use_common_mean=use_common_mean,
)
[docs]
@staticmethod
def from_lc_iterable(
iter_lc, dt, segment_size=None, gti=None, norm="frac", silent=False, use_common_mean=True
):
"""
Calculate the average power spectrum of an iterable collection of
light curves.
Parameters
----------
iter_lc : iterable of `stingray.Lightcurve` objects or `np.array`
Light curves. If arrays, use them as counts.
dt : float
The time resolution of the light curves
(sets the Nyquist frequency)
Other parameters
----------------
segment_size : float
The length, in seconds, of the light curve segments that will be
averaged. Only relevant (and required) for
``AveragedPowerspectrum``.
gti: ``[[gti0_0, gti0_1], [gti1_0, gti1_1], ...]``
Additional, optional Good Time intervals that get intersected with
the GTIs of the input object. Can cause errors if there are
overlaps between these GTIs and the input object GTIs. If that
happens, assign the desired GTIs to the input object.
norm : str, default "frac"
The normalization of the periodogram. `abs` is absolute rms, `frac`
is fractional rms, `leahy` is Leahy+83 normalization, and `none` is
the unnormalized periodogram.
use_common_mean : bool, default True
The mean of the light curve can be estimated in each interval, or
on the full light curve. This gives different results
(Alston+2013). By default, we assume the mean is calculated on the
full light curve, but the user can set ``use_common_mean`` to False
to calculate it on a per-segment basis.
silent : bool, default False
Silence the progress bars.
"""
return powerspectrum_from_lc_iterable(
iter_lc,
dt,
segment_size=segment_size,
gti=gti,
norm=norm,
silent=silent,
use_common_mean=use_common_mean,
)
[docs]
def _initialize_from_any_input(
self,
data,
dt=None,
segment_size=None,
gti=None,
norm="frac",
silent=False,
use_common_mean=True,
):
"""
Initialize the class, trying to understand the input types.
The input arguments are the same as ``__init__()``. Based on the type
of ``data``, this method will call the appropriate
``powerspectrum_from_XXXX`` function, and initialize ``self`` with
the correct attributes.
"""
if isinstance(data, EventList):
spec = powerspectrum_from_events(
data,
dt,
segment_size,
norm=norm.lower(),
silent=silent,
use_common_mean=use_common_mean,
gti=gti,
)
elif isinstance(data, Lightcurve):
spec = powerspectrum_from_lightcurve(
data,
segment_size,
norm=norm,
silent=silent,
use_common_mean=use_common_mean,
gti=gti,
)
spec.lc1 = data
elif isinstance(data, (tuple, list)):
if not isinstance(data[0], Lightcurve): # pragma: no cover
raise TypeError(f"Bad inputs to Powerspectrum: {type(data[0])}")
dt = data[0].dt
# This is a list of light curves.
spec = powerspectrum_from_lc_iterable(
data,
dt,
segment_size,
norm=norm,
silent=silent,
use_common_mean=use_common_mean,
gti=gti,
)
else: # pragma: no cover
raise TypeError(f"Bad inputs to Powerspectrum: {type(data)}")
for key, val in spec.__dict__.items():
setattr(self, key, val)
return
[docs]
def _initialize_empty(self):
"""Set all attributes to None."""
self.freq = None
self.power = None
self.power_err = None
self.unnorm_power = None
self.unnorm_power_err = None
self.df = None
self.dt = None
self.nphots1 = None
self.m = 1
self.n = None
self.k = 1
return
[docs]
class AveragedPowerspectrum(AveragedCrossspectrum, Powerspectrum):
type = "powerspectrum"
"""
Make an averaged periodogram from a light curve by segmenting the light
curve, Fourier-transforming each segment and then averaging the
resulting periodograms.
Parameters
----------
data: :class:`stingray.Lightcurve`object OR iterable of :class:`stingray.Lightcurve` objects OR :class:`stingray.EventList` object
The light curve data to be Fourier-transformed.
segment_size: float
The size of each segment to average. Note that if the total
duration of each :class:`Lightcurve` object in lc is not an integer
multiple of the ``segment_size``, then any fraction left-over at the
end of the time series will be lost.
norm: {"leahy" | "frac" | "abs" | "none" }, optional, default "frac"
The normalization of the periodogram to be used.
Other Parameters
----------------
gti: 2-d float array
``[[gti0_0, gti0_1], [gti1_0, gti1_1], ...]`` -- Good Time intervals.
This choice overrides the GTIs in the single light curves. Use with
care, especially if these GTIs have overlaps with the input
object GTIs! If you're getting errors regarding your GTIs, don't
use this and only give GTIs to the input object before making
the power spectrum.
silent : bool, default False
Do not show a progress bar when generating an averaged cross spectrum.
Useful for the batch execution of many spectra.
dt: float
The time resolution of the light curve. Only needed when constructing
light curves in the case where data is of :class:EventList.
large_data : bool, default False
Use only for data larger than 10**7 data points!! Uses zarr and dask
for computation.
save_all : bool, default False
Save all intermediate PDSs used for the final average. Use with care.
This is likely to fill up your RAM on medium-sized datasets, and to
slow down the computation when rebinning.
skip_checks: bool
Skip initial checks, for speed or other reasons (you need to trust your
inputs!).
Attributes
----------
norm: {``leahy`` | ``frac`` | ``abs`` | ``none`` }
The normalization of the periodogram.
freq: numpy.ndarray
The array of mid-bin frequencies that the Fourier transform samples.
power: numpy.ndarray
The array of normalized squared absolute values of Fourier
amplitudes.
power_err: numpy.ndarray
The uncertainties of ``power``.
An approximation for each bin given by ``power_err= power/sqrt(m)``.
Where ``m`` is the number of power averaged in each bin (by frequency
binning, or averaging power spectra of segments of a light curve).
Note that for a single realization (``m=1``) the error is equal to the
power.
df: float
The frequency resolution.
m: int
The number of averaged periodograms.
n: int
The number of data points in the light curve.
nphots: float
The total number of photons in the light curve.
legacy: bool
Use the legacy machinery of ``AveragedPowerspectrum``. This might be
useful to compare with old results, and is also needed to use light
curve lists as an input, to conserve the spectra of each segment, or to
use the large_data option.
"""
def __init__(
self,
data=None,
segment_size=None,
norm="frac",
gti=None,
silent=False,
dt=None,
lc=None,
large_data=False,
save_all=False,
skip_checks=False,
use_common_mean=True,
legacy=False,
):
self._type = None
if lc is not None:
warnings.warn(
"The lc keyword is now deprecated. Use data " "instead", DeprecationWarning
)
# Backwards compatibility: user might have supplied lc instead
if data is None:
data = lc
good_input = True
if not skip_checks:
good_input = self.initial_checks(
data1=data,
data2=data,
norm=norm,
gti=gti,
lc1=lc,
lc2=lc,
dt=dt,
segment_size=segment_size,
)
norm = norm.lower()
self.norm = norm
self.dt = dt
self.save_all = save_all
self.segment_size = segment_size
self.show_progress = not silent
self.k = 1
if not good_input:
return self._initialize_empty()
if isinstance(data, Generator):
warnings.warn(
"The averaged power spectrum from a generator of "
"light curves pre-allocates the full list of light "
"curves, losing all advantage of lazy loading. If it "
"is important for you, use the "
"AveragedPowerspectrum.from_lc_iterable static "
"method, specifying the sampling time `dt`."
)
data = list(data)
# The large_data option requires the legacy interface.
if (large_data or save_all) and not legacy:
warnings.warn(
"The large_data option and the save_all options are"
" only available with the legacy interface"
" (legacy=True)."
)
legacy = True
if not legacy and data is not None:
return self._initialize_from_any_input(
data,
dt=dt,
segment_size=segment_size,
norm=norm,
silent=silent,
use_common_mean=use_common_mean,
)
if large_data and data is not None:
if not HAS_ZARR:
raise ImportError("The large_data option requires zarr.")
chunks = None
if isinstance(data, EventList):
input_data = "EventList"
elif isinstance(data, Lightcurve):
input_data = "Lightcurve"
chunks = int(np.rint(segment_size // data.dt))
segment_size = chunks * data.dt
else:
raise ValueError(f"Invalid input data type: {type(data).__name__}")
dir_path = saveData(data, persist=False, chunks=chunks)
data_path = genDataPath(dir_path)
spec = createChunkedSpectra(
input_data,
"AveragedPowerspectrum",
data_path=data_path,
segment_size=segment_size,
norm=norm,
gti=gti,
power_type=None,
silent=silent,
dt=dt,
)
for key, val in spec.__dict__.items():
setattr(self, key, val)
return
if isinstance(data, EventList):
lengths = data.gti[:, 1] - data.gti[:, 0]
good = lengths >= segment_size
data.gti = data.gti[good]
Powerspectrum.__init__(self, data, norm, gti=gti, dt=dt, skip_checks=True, legacy=legacy)
return
[docs]
def initial_checks(self, *args, **kwargs):
return AveragedCrossspectrum.initial_checks(self, *args, **kwargs)
def _make_segment_spectrum(self, lc, segment_size, silent=False):
"""
Split the light curves into segments of size ``segment_size``, and
calculate a power spectrum for each.
Parameters
----------
lc : :class:`stingray.Lightcurve` objects
The input light curve.
segment_size : ``numpy.float``
Size of each light curve segment to use for averaging.
Other parameters
----------------
silent : bool, default False
Suppress progress bars.
Returns
-------
power_all : list of :class:`Powerspectrum` objects
A list of power spectra calculated independently from each light
curve segment.
nphots_all : ``numpy.ndarray``
List containing the number of photons for all segments calculated
from ``lc``.
"""
if not isinstance(lc, Lightcurve):
raise TypeError("lc must be a Lightcurve object")
current_gtis = lc.gti
if self.gti is None:
self.gti = lc.gti
else:
if not np.allclose(lc.gti, self.gti):
self.gti = np.vstack([self.gti, lc.gti])
check_gtis(self.gti)
start_inds, end_inds = bin_intervals_from_gtis(
current_gtis, segment_size, lc.time, dt=lc.dt
)
power_all = []
nphots_all = []
local_show_progress = show_progress
if not self.show_progress or silent:
def local_show_progress(a):
return a
for start_ind, end_ind in local_show_progress(zip(start_inds, end_inds)):
time = lc.time[start_ind:end_ind]
counts = lc.counts[start_ind:end_ind]
counts_err = lc.counts_err[start_ind:end_ind]
if np.sum(counts) == 0:
warnings.warn("No counts in interval {}--{}s".format(time[0], time[-1]))
continue
lc_seg = Lightcurve(
time,
counts,
err=counts_err,
err_dist=lc.err_dist.lower(),
skip_checks=True,
dt=lc.dt,
)
power_seg = Powerspectrum(lc_seg, norm=self.norm)
power_all.append(power_seg)
nphots_all.append(np.sum(lc_seg.counts))
return power_all, nphots_all
[docs]
class DynamicalPowerspectrum(AveragedPowerspectrum):
type = "powerspectrum"
"""
Create a dynamical power spectrum, also often called a *spectrogram*.
This class will divide a :class:`Lightcurve` object into segments of
length ``segment_size``, create a power spectrum for each segment and store
all powers in a matrix as a function of both time (using the mid-point of
each segment) and frequency.
This is often used to trace changes in period of a (quasi-)periodic signal
over time.
Parameters
----------
lc : :class:`stingray.Lightcurve` or :class:`stingray.EventList` object
The time series or event list of which the dynamical power spectrum is
to be calculated.
segment_size : float, default 1
Length of the segment of light curve, default value is 1 (in whatever
units the ``time`` array in the :class:`Lightcurve`` object uses).
norm: {"leahy" | "frac" | "abs" | "none" }, optional, default "frac"
The normaliation of the periodogram to be used.
Other Parameters
----------------
gti: 2-d float array
``[[gti0_0, gti0_1], [gti1_0, gti1_1], ...]`` -- Good Time intervals.
This choice overrides the GTIs in the single light curves. Use with
care, especially if these GTIs have overlaps with the input
object GTIs! If you're getting errors regarding your GTIs, don't
use this and only give GTIs to the input object before making
the power spectrum.
Attributes
----------
segment_size: float
The size of each segment to average. Note that if the total
duration of each input object in lc is not an integer multiple
of the ``segment_size``, then any fraction left-over at the end of the
time series will be lost.
dyn_ps : np.ndarray
The matrix of normalized squared absolute values of Fourier
amplitudes. The axis are given by the ``freq``
and ``time`` attributes.
norm: {``leahy`` | ``frac`` | ``abs`` | ``none``}
The normalization of the periodogram.
freq: numpy.ndarray
The array of mid-bin frequencies that the Fourier transform samples.
df: float
The frequency resolution.
dt: float
The time resolution.
"""
def __init__(self, lc, segment_size, norm="frac", gti=None, dt=None):
if isinstance(lc, EventList) and dt is None:
raise ValueError("To pass an input event lists, please specify dt")
if isinstance(lc, EventList):
lc = lc.to_lc(dt)
if segment_size < 2 * lc.dt:
raise ValueError("Length of the segment is too short to form a " "light curve!")
elif segment_size > lc.tseg:
raise ValueError(
"Length of the segment is too long to create " "any segments of the light curve!"
)
AveragedPowerspectrum.__init__(
self, data=lc, segment_size=segment_size, norm=norm, gti=gti, dt=dt
)
self._make_matrix(lc)
def _make_matrix(self, lc):
"""
Create a matrix of powers for each time step and each frequency step.
Time increases with row index, frequency with column index.
Parameters
----------
lc : :class:`Lightcurve` object
The :class:`Lightcurve` object from which to generate the dynamical
power spectrum.
"""
ps_all, _ = AveragedPowerspectrum._make_segment_spectrum(self, lc, self.segment_size)
self.dyn_ps = np.array([ps.power for ps in ps_all]).T
self.freq = ps_all[0].freq
current_gti = lc.gti
if self.gti is not None:
current_gti = cross_two_gtis(self.gti, current_gti)
start_inds, end_inds = bin_intervals_from_gtis(
current_gti, self.segment_size, lc.time, dt=lc.dt
)
tstart = lc.time[start_inds]
tend = lc.time[end_inds]
self.time = tstart + 0.5 * (tend - tstart)
# Assign length of lightcurve as time resolution if only one value
if len(self.time) > 1:
self.dt = self.time[1] - self.time[0]
else:
self.dt = lc.n
# Assign biggest freq. resolution if only one value
if len(self.freq) > 1:
self.df = self.freq[1] - self.freq[0]
else:
self.df = 1 / lc.n
[docs]
def rebin_frequency(self, df_new, method="sum"):
"""
Rebin the Dynamic Power Spectrum to a new frequency resolution.
Rebinning is an in-place operation, i.e. will replace the existing
``dyn_ps`` attribute.
While the new resolution need not be an integer multiple of the
previous frequency resolution, be aware that if it is not, the last
bin will be cut off by the fraction left over by the integer division.
Parameters
----------
df_new: float
The new frequency resolution of the dynamical power spectrum.
Must be larger than the frequency resolution of the old dynamical
power spectrum!
method: {"sum" | "mean" | "average"}, optional, default "sum"
This keyword argument sets whether the counts in the new bins
should be summed or averaged.
"""
new_dynspec_object = copy.deepcopy(self)
dynspec_new = []
for data in self.dyn_ps.T:
freq_new, bin_counts, bin_err, _ = utils.rebin_data(
self.freq, data, dx_new=df_new, method=method
)
dynspec_new.append(bin_counts)
new_dynspec_object.freq = freq_new
new_dynspec_object.dyn_ps = np.array(dynspec_new).T
new_dynspec_object.df = df_new
return new_dynspec_object
[docs]
def trace_maximum(self, min_freq=None, max_freq=None):
"""
Return the indices of the maximum powers in each segment
:class:`Powerspectrum` between specified frequencies.
Parameters
----------
min_freq: float, default ``None``
The lower frequency bound.
max_freq: float, default ``None``
The upper frequency bound.
Returns
-------
max_positions : np.array
The array of indices of the maximum power in each segment having
frequency between ``min_freq`` and ``max_freq``.
"""
if min_freq is None:
min_freq = np.min(self.freq)
if max_freq is None:
max_freq = np.max(self.freq)
max_positions = []
for ps in self.dyn_ps.T:
indices = np.logical_and(self.freq <= max_freq, min_freq <= self.freq)
max_power = np.max(ps[indices])
max_positions.append(np.where(ps == max_power)[0][0])
return np.array(max_positions)
[docs]
def rebin_time(self, dt_new, method="sum"):
"""
Rebin the Dynamic Power Spectrum to a new time resolution.
While the new resolution need not be an integer multiple of the
previous time resolution, be aware that if it is not, the last bin
will be cut off by the fraction left over by the integer division.
Parameters
----------
dt_new: float
The new time resolution of the dynamical power spectrum.
Must be larger than the time resolution of the old dynamical power
spectrum!
method: {"sum" | "mean" | "average"}, optional, default "sum"
This keyword argument sets whether the counts in the new bins
should be summed or averaged.
Returns
-------
time_new: numpy.ndarray
Time axis with new rebinned time resolution.
dynspec_new: numpy.ndarray
New rebinned Dynamical Power Spectrum.
"""
if dt_new < self.dt:
raise ValueError("New time resolution must be larger than " "old time resolution!")
new_dynspec_object = copy.deepcopy(self)
dynspec_new = []
for data in self.dyn_ps:
time_new, bin_counts, bin_err, _ = utils.rebin_data(
self.time, data, dt_new, method=method
)
dynspec_new.append(bin_counts)
new_dynspec_object.time = time_new
new_dynspec_object.dyn_ps = np.array(dynspec_new)
new_dynspec_object.dt = dt_new
return new_dynspec_object
def powerspectrum_from_time_array(
times, dt, segment_size=None, gti=None, norm="frac", silent=False, use_common_mean=True
):
"""
Calculate a power spectrum from an array of event times.
Parameters
----------
times : `np.array`
Event arrival times.
dt : float
The time resolution of the intermediate light curves
(sets the Nyquist frequency).
Other parameters
----------------
segment_size : float
The length, in seconds, of the light curve segments that will be
averaged. Only required (and used) for ``AveragedPowerspectrum``.
gti : ``[[gti0_0, gti0_1], [gti1_0, gti1_1], ...]``
Additional, optional Good Time intervals that get intersected with
the GTIs of the input object. Can cause errors if there are
overlaps between these GTIs and the input object GTIs. If that
happens, assign the desired GTIs to the input object.
norm : str, default "frac"
The normalization of the periodogram. `abs` is absolute rms, `frac`
is fractional rms, `leahy` is Leahy+83 normalization, and `none` is
the unnormalized periodogram.
use_common_mean : bool, default True
The mean of the light curve can be estimated in each interval, or
on the full light curve. This gives different results
(Alston+2013). By default, we assume the mean is calculated on the
full light curve, but the user can set ``use_common_mean`` to False
to calculate it on a per-segment basis.
silent : bool, default False
Silence the progress bars.
Returns
-------
spec : `AveragedPowerspectrum` or `Powerspectrum`
The output periodogram.
"""
force_averaged = segment_size is not None
# Suppress progress bar for single periodogram
silent = silent or (segment_size is None)
table = avg_pds_from_events(
times, gti, segment_size, dt, norm=norm, use_common_mean=use_common_mean, silent=silent
)
return _create_powerspectrum_from_result_table(table, force_averaged=force_averaged)
def powerspectrum_from_events(
events, dt, segment_size=None, gti=None, norm="frac", silent=False, use_common_mean=True
):
"""
Calculate a power spectrum from an event list.
Parameters
----------
events : `stingray.EventList`
Event list to be analyzed.
dt : float
The time resolution of the intermediate light curves
(sets the Nyquist frequency)
Other parameters
----------------
segment_size : float
The length, in seconds, of the light curve segments that will be
averaged. Only required (and used) for ``AveragedPowerspectrum``.
gti : ``[[gti0_0, gti0_1], [gti1_0, gti1_1], ...]``
Additional, optional Good Time intervals that get intersected with
the GTIs of the input object. Can cause errors if there are
overlaps between these GTIs and the input object GTIs. If that
happens, assign the desired GTIs to the input object.
norm : str, default "frac"
The normalization of the periodogram. `abs` is absolute rms, `frac`
is fractional rms, `leahy` is Leahy+83 normalization, and `none` is
the unnormalized periodogram.
use_common_mean : bool, default True
The mean of the light curve can be estimated in each interval, or
on the full light curve. This gives different results
(Alston+2013). By default, we assume the mean is calculated on the
full light curve, but the user can set ``use_common_mean`` to False
to calculate it on a per-segment basis.
silent : bool, default False
Silence the progress bars.
Returns
-------
spec : `AveragedPowerspectrum` or `Powerspectrum`
The output periodogram.
"""
if gti is None:
gti = events.gti
return powerspectrum_from_time_array(
events.time,
dt,
segment_size,
gti,
norm=norm,
silent=silent,
use_common_mean=use_common_mean,
)
def powerspectrum_from_lightcurve(
lc, segment_size=None, gti=None, norm="frac", silent=False, use_common_mean=True
):
"""
Calculate a power spectrum from a light curve
Parameters
----------
events : `stingray.Lightcurve`
Light curve to be analyzed.
dt : float
The time resolution of the intermediate light curves
(sets the Nyquist frequency)
Other parameters
----------------
segment_size : float
The length, in seconds, of the light curve segments that will be
averaged. Only required (and used) for ``AveragedPowerspectrum``.
gti : ``[[gti0_0, gti0_1], [gti1_0, gti1_1], ...]``
Additional, optional Good Time intervals that get intersected with
the GTIs of the input object. Can cause errors if there are
overlaps between these GTIs and the input object GTIs. If that
happens, assign the desired GTIs to the input object.
norm : str, default "frac"
The normalization of the periodogram. `abs` is absolute rms, `frac`
is fractional rms, `leahy` is Leahy+83 normalization, and `none` is
the unnormalized periodogram.
use_common_mean : bool, default True
The mean of the light curve can be estimated in each interval, or
on the full light curve. This gives different results
(Alston+2013). By default, we assume the mean is calculated on the
full light curve, but the user can set ``use_common_mean`` to False
to calculate it on a per-segment basis.
silent : bool, default False
Silence the progress bars.
Returns
-------
spec : `AveragedPowerspectrum` or `Powerspectrum`
The output periodogram.
"""
force_averaged = segment_size is not None
# Suppress progress bar for single periodogram
silent = silent or (segment_size is None)
err = None
if lc.err_dist == "gauss":
err = lc.counts_err
if gti is None:
gti = lc.gti
table = avg_pds_from_events(
lc.time,
gti,
segment_size,
lc.dt,
norm=norm,
use_common_mean=use_common_mean,
silent=silent,
fluxes=lc.counts,
errors=err,
)
return _create_powerspectrum_from_result_table(table, force_averaged=force_averaged)
def powerspectrum_from_lc_iterable(
iter_lc, dt, segment_size=None, gti=None, norm="frac", silent=False, use_common_mean=True
):
"""
Calculate an average power spectrum from an iterable collection of light
curves.
Parameters
----------
iter_lc : iterable of `stingray.Lightcurve` objects or `np.array`
Light curves. If arrays, use them as counts.
dt : float
The time resolution of the light curves
(sets the Nyquist frequency).
Other parameters
----------------
segment_size : float, default None
The length, in seconds, of the light curve segments that will be
averaged. If not ``None``, it will be used to check the segment size of
the output.
gti : ``[[gti0_0, gti0_1], [gti1_0, gti1_1], ...]``
Additional, optional Good Time intervals that get intersected with
the GTIs of the input object. Can cause errors if there are
overlaps between these GTIs and the input object GTIs. If that
happens, assign the desired GTIs to the input object.
norm : str, default "frac"
The normalization of the periodogram. `abs` is absolute rms, `frac`
is fractional rms, `leahy` is Leahy+83 normalization, and `none` is
the unnormalized periodogram.
use_common_mean : bool, default True
The mean of the light curve can be estimated in each interval, or
on the full light curve. This gives different results
(Alston+2013). By default, we assume the mean is calculated on the
full light curve, but the user can set ``use_common_mean`` to False
to calculate it on a per-segment basis.
silent : bool, default False
Silence the progress bars.
Returns
-------
spec : `AveragedPowerspectrum` or `Powerspectrum`
The output periodogram.
"""
force_averaged = segment_size is not None
# Suppress progress bar for single periodogram
silent = silent or (segment_size is None)
common_gti = gti
def iterate_lc_counts(iter_lc):
for lc in iter_lc:
if hasattr(lc, "counts"):
n_bin = np.rint(segment_size / lc.dt).astype(int)
gti = lc.gti
if common_gti is not None:
gti = cross_two_gtis(common_gti, lc.gti)
err = None
if lc.err_dist == "gauss":
err = lc.counts_err
flux_iterable = get_flux_iterable_from_segments(
lc.time, gti, segment_size, n_bin, fluxes=lc.counts, errors=err
)
for out in flux_iterable:
yield out
elif isinstance(lc, Iterable):
yield lc
else:
raise TypeError(
"The inputs to `powerspectrum_from_lc_iterable`"
" must be Lightcurve objects or arrays"
)
table = avg_pds_from_iterable(
iterate_lc_counts(iter_lc), dt, norm=norm, use_common_mean=use_common_mean, silent=silent
)
return _create_powerspectrum_from_result_table(table, force_averaged=force_averaged)
def _create_powerspectrum_from_result_table(table, force_averaged=False):
"""
Copy the columns and metadata from the results of
``stingray.fourier.avg_pds_from_XX`` functions into
`AveragedPowerspectrum` or `Powerspectrum` objects.
By default, allocates a Powerspectrum object if the number of
averaged spectra is 1, otherwise an AveragedPowerspectrum.
If the user specifies ``force_averaged=True``, it always allocates
an AveragedPowerspectrum.
Parameters
----------
table : `astropy.table.Table`
results of `avg_cs_from_iterables` or `avg_cs_from_iterables_quick`
Other parameters
----------------
force_averaged : bool, default False
Returns
-------
spec : `AveragedPowerspectrum` or `Powerspectrum`
The output periodogram.
"""
if table.meta["m"] > 1 or force_averaged:
cs = AveragedPowerspectrum()
else:
cs = Powerspectrum()
cs.freq = np.array(table["freq"])
cs.power = np.array(table["power"])
cs.unnorm_power = np.array(table["unnorm_power"])
for attr, val in table.meta.items():
setattr(cs, attr, val)
cs.err_dist = "poisson"
if hasattr(cs, "variance") and cs.variance is not None:
cs.err_dist = "gauss"
cs.power_err = cs.power / np.sqrt(cs.m)
cs.unnorm_power_err = cs.unnorm_power / np.sqrt(cs.m)
cs.nphots1 = cs.nphots
return cs
```