Source code for stingray.fourier

import copy
import warnings
from collections.abc import Iterable
from typing import Optional

import numpy as np
import numpy.typing as npt
from astropy.table import Table
from astropy.timeseries.periodograms.lombscargle.implementations.utils import trig_sum

from .gti import (
    generate_indices_of_segment_boundaries_binned,
    generate_indices_of_segment_boundaries_unbinned,
)

from .utils import (
    fft,
    fftfreq,
    histogram,
    show_progress,
    sum_if_not_none_or_initialize,
    fix_segment_size_to_integer_samples,
    rebin_data,
    njit,
    vectorize,
)


__all__ = [
    "integrate_power_in_frequency_range",
    "positive_fft_bins",
    "poisson_level",
    "normalize_frac",
    "normalize_abs",
    "normalize_leahy_from_variance",
    "normalize_leahy_poisson",
    "normalize_periodograms",
    "unnormalize_periodograms",
    "bias_term",
    "raw_coherence",
    "intrinsic_coherence",
]


[docs] def integrate_power_in_frequency_range( frequency, power, frequency_range, power_err=None, df=None, m=1, poisson_power=0, ): """ Integrate the power in a given frequency range. Parameters ---------- frequency : iterable The frequencies of the power spectrum power : iterable The power at each frequency frequency_range : iterable of length 2 The frequency range to integrate power_err : iterable, optional, default None The power error bar at each frequency df : float or float iterable, optional, default None The frequency resolution of the input data. If None, it is calculated from the median difference of input frequencies. m : int, optional, default 1 The number of segments and/or contiguous frequency bins averaged to obtain power. Only needed if ``power_err`` is None poisson_power : float, optional, default 0 The Poisson noise level of the power spectrum. Returns ------- power_integrated : float The integrated power power_integrated_err : float The error on the integrated power """ frequency = np.array(frequency, dtype=float) power = np.array(power) if not np.iscomplexobj(power): power = power.astype(float) if not isinstance(poisson_power, Iterable): poisson_power = np.ones_like(frequency) * poisson_power if df is None: df = np.median(np.diff(frequency)) if not isinstance(df, Iterable): df = np.ones_like(frequency) * df # The frequency_range is in the middle or at the edge of each bin. When only a part of the # bin is included, we need to add the fraction of the bin that is included. frequency_mask = (frequency + df / 2 > frequency_range[0]) & ( frequency - df / 2 < frequency_range[1] ) freqs_to_integrate = frequency[frequency_mask] poisson_power = poisson_power[frequency_mask] correction_ratios = np.ones_like(freqs_to_integrate) dfs_to_integrate = df[frequency_mask] # The first and last bins are only partially included. We need to correct for the # fraction of the bin actually included. correction_ratios[0] = ( freqs_to_integrate[0] + 0.5 * dfs_to_integrate[0] - frequency_range[0] ) / dfs_to_integrate[0] correction_ratios[-1] = ( frequency_range[-1] - freqs_to_integrate[-1] + 0.5 * dfs_to_integrate[-1] ) / dfs_to_integrate[-1] dfs_to_integrate = dfs_to_integrate * correction_ratios powers_to_integrate = power[frequency_mask] if power_err is None: power_err_to_integrate = powers_to_integrate / np.sqrt(m) else: power_err_to_integrate = np.asanyarray(power_err)[frequency_mask] power_integrated = np.sum((powers_to_integrate - poisson_power) * dfs_to_integrate) power_err_integrated = np.sqrt(np.sum((power_err_to_integrate * dfs_to_integrate) ** 2)) return power_integrated, power_err_integrated
[docs] def positive_fft_bins(n_bin, include_zero=False): """ Give the range of positive frequencies of a complex FFT. This assumes we are using Numpy's FFT, or something compatible with it, like ``pyfftw.interfaces.numpy_fft``, where the positive frequencies come before the negative ones, the Nyquist frequency is included in the negative frequencies but only in even number of bins, and so on. This is mostly to avoid using the ``freq > 0`` mask, which is memory-hungry and inefficient with large arrays. We use instead a slice object, giving the range of bins of the positive frequencies. See https://numpy.org/doc/stable/reference/routines.fft.html#implementation-details Parameters ---------- n_bin : int The number of bins in the FFT, including all frequencies Other Parameters ---------------- include_zero : bool, default False Include the zero frequency in the output slice Returns ------- positive_bins : `slice` Slice object encoding the positive frequency bins. See examples. Examples -------- Let us calculate the positive frequencies using the usual mask >>> freq = np.fft.fftfreq(10) >>> good = freq > 0 This works well, but it is highly inefficient in large arrays. This function will instead return a `slice object`, which will work as an equivalent mask for the positive bins. Below, a few tests that this works as expected. >>> goodbins = positive_fft_bins(10) >>> assert np.allclose(freq[good], freq[goodbins]) >>> freq = np.fft.fftfreq(11) >>> good = freq > 0 >>> goodbins = positive_fft_bins(11) >>> assert np.allclose(freq[good], freq[goodbins]) >>> freq = np.fft.fftfreq(10) >>> good = freq >= 0 >>> goodbins = positive_fft_bins(10, include_zero=True) >>> assert np.allclose(freq[good], freq[goodbins]) >>> freq = np.fft.fftfreq(11) >>> good = freq >= 0 >>> goodbins = positive_fft_bins(11, include_zero=True) >>> assert np.allclose(freq[good], freq[goodbins]) """ # The zeroth bin is 0 Hz. We usually don't include it, but # if the user wants it, we do. minbin = 1 if include_zero: minbin = 0 if n_bin % 2 == 0: return slice(minbin, n_bin // 2) return slice(minbin, (n_bin + 1) // 2)
[docs] def poisson_level(norm="frac", meanrate=None, n_ph=None, backrate=0): r""" Poisson (white)-noise level in a periodogram of pure counting noise. For Leahy normalization, this is: .. math:: P = 2 For the fractional r.m.s. normalization, this is .. math:: P = \frac{2}{\mu} where :math:`\mu` is the average count rate For the absolute r.m.s. normalization, this is .. math:: P = 2 \mu Finally, for the unnormalized periodogram, this is .. math:: P = N_{\rm ph} Parameters ---------- norm : str, default "frac" Normalization of the periodogram. One of ["abs", "frac", "leahy", "none"]. Other Parameters ---------------- meanrate : float, default None Mean count rate in counts/s. Needed for r.m.s. norms ("abs" and "frac"). n_ph : float, default None Total number of counts in the light curve. Needed if ``norm=="none"``. backrate : float, default 0 Background count rate in counts/s. Optional for fractional r.m.s. norm. Raises ------ ValueError If the inputs are incompatible with the required normalization. Returns ------- power_noise : float The Poisson noise level in the wanted normalization. Examples -------- >>> poisson_level(norm="leahy") 2.0 >>> poisson_level(norm="abs", meanrate=10.) 20.0 >>> poisson_level(norm="frac", meanrate=10.) 0.2 >>> poisson_level(norm="none", n_ph=10) 10.0 >>> poisson_level(norm="asdfwrqfasdh3r", meanrate=10.) Traceback (most recent call last): ... ValueError: Unknown value for norm: asdfwrqfasdh3r... >>> poisson_level(norm="none", meanrate=10) Traceback (most recent call last): ... ValueError: Bad input parameters for norm none... >>> poisson_level(norm="abs", n_ph=10) Traceback (most recent call last): ... ValueError: Bad input parameters for norm abs... """ # Various ways the parameters are wrong. # We want the noise in rms norm, but don't specify the mean rate. bad_input = norm.lower() in ["abs", "frac"] and meanrate is None # We want the noise in unnormalized powers, without giving n_ph. bad_input = bad_input or (norm.lower() == "none" and n_ph is None) if bad_input: raise ValueError( f"Bad input parameters for norm {norm}: n_ph={n_ph}, " f"meanrate={meanrate}" ) if norm == "abs": return 2.0 * meanrate if norm == "frac": return 2.0 / (meanrate - backrate) ** 2 * meanrate if norm == "leahy": return 2.0 if norm == "none": return float(n_ph) raise ValueError(f"Unknown value for norm: {norm}")
[docs] def normalize_frac(unnorm_power, dt, n_bin, mean_flux, background_flux=0): r""" Fractional rms normalization. .. math:: P = \frac{P_{\rm Leahy}}{\mu} = \frac{2T}{N_{\rm ph}^2}P_{\rm unnorm} where :math:`\mu` is the mean count rate, :math:`T` is the length of the observation, and :math:`N_{\rm ph}` the number of photons. Alternative formulas found in the literature substitute :math:`T=N\,dt`, :math:`\mu=N_{\rm ph}/T`, which give equivalent results. If the background can be estimated, one can calculate the source rms normalized periodogram as .. math:: P = P_{\rm Leahy} \frac{\mu}{(\mu - \beta)^2} or .. math:: P = \frac{2T}{(N_{\rm ph} - \beta T)^2}P_{\rm unnorm} where :math:`\beta` is the background count rate. This is also called the Belloni or Miyamoto normalization. In this normalization, the periodogram is in units of :math:`(rms/mean)^2 Hz^{-1}`, and the squared root of the integrated periodogram will give the fractional rms in the required frequency range. Belloni & Hasinger (1990) A&A 230, 103 Miyamoto et al. (1991), ApJ 383, 784 Parameters ---------- unnorm_power : `np.array` of `float` or `complex` The unnormalized (cross-)spectral powers dt : float The sampling time n_bin : int The number of bins in the light curve mean_flux : float The mean of the light curve used to calculate the periodogram. If the light curve is in counts, it gives the mean counts per bin. Other parameters ---------------- background_flux : float, default 0 The background flux, in the same units as `mean_flux`. Returns ------- power : `np.array` of the same kind and shape as `unnorm_power` The normalized powers. Examples -------- >>> mean = var = 1000000 >>> back = 100000 >>> n_bin = 1000000 >>> dt = 0.2 >>> meanrate = mean / dt >>> backrate = back / dt >>> lc = np.random.poisson(mean, n_bin) >>> pds = np.abs(fft(lc))**2 >>> pdsnorm = normalize_frac(pds, dt, lc.size, mean) >>> assert np.isclose( ... pdsnorm[1:n_bin//2].mean(), poisson_level(meanrate=meanrate,norm="frac"), rtol=0.01) >>> pdsnorm = normalize_frac(pds, dt, lc.size, mean, background_flux=back) >>> assert np.isclose(pdsnorm[1:n_bin//2].mean(), ... poisson_level(meanrate=meanrate,norm="frac",backrate=backrate), ... rtol=0.01) """ # (mean * n_bin) / (mean /dt) = n_bin * dt # It's Leahy / meanrate; # n_ph = mean * n_bin # meanrate = mean / dt # norm = 2 / (n_ph * meanrate) = 2 * dt / (mean**2 * n_bin) if background_flux > 0: power = unnorm_power * 2.0 * dt / ((mean_flux - background_flux) ** 2 * n_bin) else: # Note: this corresponds to eq. 3 in Uttley+14 power = unnorm_power * 2.0 * dt / (mean_flux**2 * n_bin) return power
[docs] def normalize_abs(unnorm_power, dt, n_bin): r""" Absolute rms normalization. .. math:: P = P_{\rm frac} * \mu^2 where :math:`\mu` is the mean count rate, or equivalently .. math:: P = \frac{2}{T}P_{\rm unnorm} In this normalization, the periodogram is in units of :math:`rms^2 Hz^{-1}`, and the squared root of the integrated periodogram will give the absolute rms in the required frequency range. e.g. Uttley & McHardy, MNRAS 323, L26 Parameters ---------- unnorm_power : `np.array` of `float` or `complex` The unnormalized (cross-)spectral powers dt : float The sampling time n_bin : int The number of bins in the light curve Returns ------- power : `np.array` of the same kind and shape as `unnorm_power` The normalized powers. Examples -------- >>> mean = var = 100000 >>> n_bin = 1000000 >>> dt = 0.2 >>> meanrate = mean / dt >>> lc = np.random.poisson(mean, n_bin) >>> pds = np.abs(fft(lc))**2 >>> pdsnorm = normalize_abs(pds, dt, lc.size) >>> assert np.isclose( ... pdsnorm[1:n_bin//2].mean(), poisson_level(norm="abs", meanrate=meanrate), rtol=0.01) """ # It's frac * meanrate**2; Leahy / meanrate * meanrate**2 # n_ph = mean * n_bin # meanrate = mean / dt # norm = 2 / (n_ph * meanrate) * meanrate**2 = 2 * dt / (mean**2 * n_bin) * mean**2 / dt**2 return unnorm_power * 2.0 / n_bin / dt
[docs] def normalize_leahy_from_variance(unnorm_power, variance, n_bin): r""" Leahy+83 normalization, from the variance of the lc. .. math:: P = \frac{P_{\rm unnorm}}{N <\delta{x}^2>} In this normalization, the periodogram of a single light curve is distributed according to a chi squared distribution with two degrees of freedom. In this version, the normalization is obtained by the variance of the light curve bins, instead of the more usual version with the number of photons. This allows to obtain this normalization also in the case of non-Poisson distributed data. Parameters ---------- unnorm_power : `np.array` of `float` or `complex` The unnormalized (cross-)spectral powers variance : float The mean variance of the light curve bins n_bin : int The number of bins in the light curve Returns ------- power : `np.array` of the same kind and shape as `unnorm_power` The normalized powers. Examples -------- >>> mean = var = 100000. >>> n_bin = 1000000 >>> lc = np.random.poisson(mean, n_bin).astype(float) >>> pds = np.abs(fft(lc))**2 >>> pdsnorm = normalize_leahy_from_variance(pds, var, lc.size) >>> assert np.isclose(pdsnorm[0], 2 * np.sum(lc), rtol=0.01) >>> assert np.isclose(pdsnorm[1:n_bin//2].mean(), poisson_level(norm="leahy"), rtol=0.01) If the variance is zero, it will fail: >>> pdsnorm = normalize_leahy_from_variance(pds, 0., lc.size) Traceback (most recent call last): ... ValueError: The variance used to normalize the ... """ if variance == 0.0: raise ValueError("The variance used to normalize the periodogram is 0.") return unnorm_power * 2.0 / (variance * n_bin)
[docs] def normalize_leahy_poisson(unnorm_power, n_ph): r""" Leahy+83 normalization. .. math:: P = \frac{2}{N_{\rm ph}} P_{\rm unnorm} In this normalization, the periodogram of a single light curve is distributed according to a chi squared distribution with two degrees of freedom. Leahy et al. 1983, ApJ 266, 160 Parameters ---------- unnorm_power : `np.array` of `float` or `complex` The unnormalized (cross-)spectral powers variance : float The mean variance of the light curve bins n_bin : int The number of bins in the light curve Returns ------- power : `np.array` of the same kind and shape as `unnorm_power` The normalized powers. Examples -------- >>> mean = var = 100000. >>> n_bin = 1000000 >>> lc = np.random.poisson(mean, n_bin).astype(float) >>> pds = np.abs(fft(lc))**2 >>> pdsnorm = normalize_leahy_poisson(pds, np.sum(lc)) >>> assert np.isclose(pdsnorm[0], 2 * np.sum(lc), rtol=0.01) >>> assert np.isclose(pdsnorm[1:n_bin//2].mean(), poisson_level(norm="leahy"), rtol=0.01) """ return unnorm_power * 2.0 / n_ph
[docs] def normalize_periodograms( unnorm_power, dt, n_bin, mean_flux=None, n_ph=None, variance=None, background_flux=0.0, norm="frac", power_type="all", ): """ Wrapper around all the normalize_NORM methods. Normalize the cross-spectrum or the power-spectrum to Leahy, absolute rms^2, fractional rms^2 normalization, or not at all. Parameters ---------- unnorm_power : numpy.ndarray The unnormalized cross spectrum. dt : float The sampling time of the light curve n_bin : int The number of bins in the light curve Other parameters ---------------- mean_flux : float The mean of the light curve used to calculate the powers (If a cross spectrum, the geometrical mean of the light curves in the two channels). Only relevant for "frac" normalization n_ph : int or float The number of counts in the light curve used to calculate the unnormalized periodogram. Only relevant for Leahy normalization. variance : float The average variance of the measurements in light curve (if a cross spectrum, the geometrical mean of the variances in the two channels). **NOT** the variance of the light curve, but of each flux measurement (square of light curve error bar)! Only relevant for the Leahy normalization of non-Poissonian data. norm : str One of ``leahy`` (Leahy+83), ``frac`` (fractional rms), ``abs`` (absolute rms), power_type : str One of ``real`` (real part), ``all`` (all complex powers), ``abs`` (absolute value) background_flux : float, default 0 The background flux, in the same units as `mean_flux`. Returns ------- power : numpy.nd.array The normalized co-spectrum (real part of the cross spectrum). For 'none' normalization, imaginary part is returned as well. """ if norm == "leahy" and variance is not None: pds = normalize_leahy_from_variance(unnorm_power, variance, n_bin) elif norm == "leahy": pds = normalize_leahy_poisson(unnorm_power, n_ph) elif norm == "frac": pds = normalize_frac(unnorm_power, dt, n_bin, mean_flux, background_flux=background_flux) elif norm == "abs": pds = normalize_abs(unnorm_power, dt, n_bin) elif norm == "none": pds = unnorm_power else: raise ValueError("Unknown value for the norm") if power_type == "all": return pds if power_type == "real": return pds.real if power_type in ["abs", "absolute"]: return np.abs(pds) raise ValueError("Unrecognized power type")
[docs] def unnormalize_periodograms( norm_power, dt, n_bin, n_ph, variance=None, background_flux=0.0, norm=None, power_type="all" ): """ Wrapper around all the normalize_NORM methods. Unnormalize the power of the cross-spectrum to Leahy, absolute rms^2, fractional rms^2 normalization, or not at all. Parameters ---------- norm_power : numpy.ndarray The normalized cross-spectrum or poisson noise dt : float The sampling time of the light curve n_bin : int The number of bins in the light curve Other parameters ---------------- mean_flux : float The mean of the light curve used to calculate the powers (If a cross spectrum, the geometrical mean of the light curves in the two channels). Only relevant for "frac" normalization n_ph : int or float The number of counts in the light curve used to calculate the unnormalized periodogram. Only relevant for Leahy normalization. variance : float The average variance of the measurements in light curve (if a cross spectrum, the geometrical mean of the variances in the two channels). **NOT** the variance of the light curve, but of each flux measurement (square of light curve error bar)! Only relevant for the Leahy normalization of non-Poissonian data. norm : str One of ``leahy`` (Leahy+83), ``frac`` (fractional rms), ``abs`` (absolute rms), power_type : str One of ``real`` (real part), ``all`` (all complex powers), ``abs`` (absolute value) background_flux : float, default 0 The background flux, in the same units as `mean_flux`. Returns ------- power : numpy.nd.array The normalized co-spectrum (real part of the cross spectrum). For 'none' normalization, imaginary part is returned as well. """ if norm == "leahy" and variance is not None: unnorm_power = norm_power * (variance * n_ph) / 2.0 elif norm == "leahy": unnorm_power = norm_power * n_ph / 2.0 elif norm == "frac": if background_flux > 0: unnorm_power = norm_power * ((n_ph / n_bin - background_flux) ** 2 * n_bin) / (2.0 * dt) else: unnorm_power = norm_power * (n_ph**2 / n_bin) / (2.0 * dt) elif norm == "abs": unnorm_power = norm_power * dt * n_bin / 2.0 elif norm == "none": unnorm_power = norm_power else: raise ValueError("Unknown value for the norm") if power_type == "all": return unnorm_power if power_type == "real": return unnorm_power.real if power_type in ["abs", "absolute"]: return np.abs(unnorm_power) raise ValueError("Unrecognized power type")
@vectorize( [ "float64(float64, float64, float64, float64, int64, float64)", "float64(float64, float64, float64, float64, float64, float64)", ], nopython=True, ) def _bias_term(power1, power2, power1_noise, power2_noise, n_ave, input_intrinsic_coherence): if n_ave > 500: return 0.0 bsq = power1 * power2 - input_intrinsic_coherence * (power1 - power1_noise) * ( power2 - power2_noise ) return bsq / n_ave
[docs] def bias_term(power1, power2, power1_noise, power2_noise, n_ave, intrinsic_coherence=1.0): """ Bias term needed to calculate the coherence. Introduced by Vaughan & Nowak 1997, ApJ 474, L43 but implemented here according to the formulation in Ingram 2019, MNRAS 489, 392 As recommended in the latter paper, returns 0 if n_ave > 500 Parameters ---------- power1 : float `np.array` sub-band periodogram power2 : float `np.array` reference-band periodogram power1_noise : float Poisson noise level of the sub-band periodogram power2_noise : float Poisson noise level of the reference-band periodogram n_ave : int or float number of intervals that have been averaged to obtain the input spectra Other Parameters ---------------- intrinsic_coherence : float, default 1 If known, the intrinsic coherence. Returns ------- bias : float `np.array`, same shape as ``power1`` and ``power2`` The bias term """ return _bias_term(power1, power2, power1_noise, power2_noise, n_ave, intrinsic_coherence)
@vectorize(["float64(float64, float64, float64)"], nopython=True) def _apply_low_lim_to_coherence_uncertainty(coherence, uncertainty, min_uncertainty): """ Apply a low limit to the uncertainty on the coherence, to avoid zero or negative uncertainties. Parameters ---------- coherence : float or float `np.array` The coherence values at all frequencies. uncertainty : float or float `np.array` The uncertainty on the coherence at all frequencies. Must have the same shape as `coherence`. min_uncertainty : float or float `np.array` The minimum uncertainty to apply. Can be a single value or an array of the same shape as `coherence` and `uncertainty`. Returns ------- uncertainty : float or float `np.array` The uncertainty on the coherence, with the low limit applied. Examples -------- >>> coherence = np.array([0.5, 0.0, 0.5]) >>> uncertainty = np.array([0.1, 0.0, 0.05]) >>> min_uncertainty = 0.01 >>> expected = np.array([0.1, 0.01, 0.05]) >>> res = _apply_low_lim_to_coherence_uncertainty(coherence, uncertainty, min_uncertainty) >>> assert np.allclose(res, expected) # The same, but with a different minimum uncertainty for each frequency. >>> min_uncertainty = np.array([0.01, 0.1, 0.1]) >>> res = _apply_low_lim_to_coherence_uncertainty(coherence, uncertainty, min_uncertainty) >>> expected = np.array([0.1, 0.1, 0.1]) >>> assert np.allclose(res, expected) # All the same, but with scalar inputs. >>> coherence = 0.5 >>> uncertainty = 0.0 >>> min_uncertainty = 0.01 >>> expected = 0.01 >>> res = _apply_low_lim_to_coherence_uncertainty(coherence, uncertainty, min_uncertainty) >>> assert np.isclose(res, expected) """ bad = (coherence == 0) | (uncertainty < min_uncertainty) if bad: uncertainty = min_uncertainty return uncertainty @vectorize( [ "float64(complex128, float64, float64, float64, float64, int64, float64)", "float64(complex128, float64, float64, float64, float64, float64, float64)", ], nopython=True, ) def _raw_coherence( cross_power, power1, power2, power1_noise, power2_noise, n_ave, intrinsic_coherence, ): """ Raw coherence estimations from cross and power spectra. Vaughan & Nowak 1997, ApJ 474, L43 Parameters ---------- cross_power : complex `np.array` cross spectrum power1 : float `np.array` sub-band periodogram power2 : float `np.array` reference-band periodogram power1_noise : float Poisson noise level of the sub-band periodogram power2_noise : float Poisson noise level of the reference-band periodogram n_ave : int or float number of intervals that have been averaged to obtain the input spectra Other Parameters ---------------- intrinsic_coherence : float, default 1 If known, the intrinsic coherence. return_uncertainty : bool, default False Whether to return the uncertainty on the coherence, calculated according to VN97 Returns ------- coherence : float `np.array` The raw coherence values at all frequencies. """ bsq = _bias_term(power1, power2, power1_noise, power2_noise, n_ave, intrinsic_coherence) num = (cross_power * np.conj(cross_power)).real - bsq if num < 0: num = 0 den = power1 * power2 coherence = num / den return coherence
[docs] def raw_coherence( cross_power, power1, power2, power1_noise, power2_noise, n_ave, intrinsic_coherence=1.0, return_uncertainty=False, ): r""" Raw coherence estimations from cross and power spectra. The function is defined as (see Ingram 2019 [#]_ : .. math:: \tilde{g}^2(f) = \frac{|\langle \tilde{C}(f) \rangle|^2 - \tilde{b}^2} {\langle \tilde{P}_1(f) \rangle \langle \tilde{P}_2(f) \rangle} where :math:`\tilde{b}^2` is the bias term that accounts for the contribution of Poisson noise to the cross spectrum (see :func:`stingray.fourier.bias_term`), and tilde generally indicates noisy measurements of the cross spectrum :math:`C` and the power spectra :math:`P_n`. Parameters ---------- cross_power : complex `np.array` cross spectrum power1 : float `np.array` sub-band periodogram power2 : float `np.array` reference-band periodogram power1_noise : float Poisson noise level of the sub-band periodogram power2_noise : float Poisson noise level of the reference-band periodogram n_ave : int or float number of intervals that have been averaged to obtain the input spectra Other Parameters ---------------- intrinsic_coherence : float, default 1 If known, the intrinsic coherence. return_uncertainty : bool, default False Whether to return the uncertainty on the coherence, calculated according to VN97 Returns ------- coherence : float `np.array` The raw coherence values at all frequencies. References ---------- .. [#] https://doi.org/10.1093/mnras/stz2409 """ coherence = _raw_coherence( cross_power, power1, power2, power1_noise, power2_noise, n_ave, intrinsic_coherence, ) if return_uncertainty: min_uncertainty = 1 / n_ave uncertainty = (2**0.5 * coherence * (1 - coherence)) / (np.sqrt(coherence) * n_ave**0.5) uncertainty = _apply_low_lim_to_coherence_uncertainty( coherence, uncertainty, min_uncertainty ) return coherence, uncertainty return coherence
@njit def _intrinsic_coherence_uncertainties( bsq, num, power1_sub, power2_sub, power1_noise, power2_noise, coherence, n_ave ): """Calculate the uncertainty on the intrinsic coherence, according to VN97, eq. 8. Parameters ---------- bsq : float The bias term squared, calculated according to the bias_term function. num : float The numerator of the coherence calculation, i.e. (cross_power * np.conj(cross_power)).real - bsq. power1_sub : float The subtracted power of the first band, i.e. power1 - power1_noise power2_sub : float The subtracted power of the second band, i.e. power2 - power2_noise power1_noise : float The Poisson noise level of the first band periodogram power2_noise : float The Poisson noise level of the second band periodogram coherence : float The intrinsic coherence calculated according to the _intrinsic_coherence function. n_ave : int or float The number of intervals that have been averaged to obtain the input spectra """ # Terms from VN97, eq. 8, for the uncertainty on the coherence. err1 = 2.0 * (bsq**2) * n_ave / (num - bsq) ** 2 err2 = (power1_noise / power1_sub) ** 2 + (power2_noise / power2_sub) ** 2 err3 = 2.0 * ((1 - coherence) / (coherence**1.5)) ** 2 uncertainty = coherence / np.sqrt(n_ave) * np.sqrt(err1 + err2 + err3) return uncertainty @vectorize( [ "bool(float64, float64, float64, float64, int64, float64)", "bool(float64, float64, float64, float64, float64, float64)", ], nopython=True, ) def check_powers_for_intrinsic_coherence( power1, power2, power1_noise, power2_noise, n_ave, threshold ): """Check if the powers are above the threshold for the intrinsic coherence to be well defined. Parameters ---------- power1 : float `np.array` sub-band periodogram power2 : float `np.array` reference-band periodogram power1_noise : float Poisson noise level of the sub-band periodogram power2_noise : float Poisson noise level of the reference-band periodogram n_ave : int or float number of intervals that have been averaged to obtain the input spectra threshold : float The threshold in sigma above the noise level for the powers to be considered "high". Returns ------- low_power1 : bool `np.array` Whether the powers of the first band are below the threshold. low_power2 : bool `np.array` Whether the powers of the second band are below the threshold. """ # Consider low power when the power is less than 3 sigma above the noise level. # This is somewhat arbitrary, better criteria suggestions are welcome. low_power1 = (power1 - power1_noise) <= threshold * power1_noise / np.sqrt(n_ave) low_power2 = (power2 - power2_noise) <= threshold * power2_noise / np.sqrt(n_ave) return low_power1 or low_power2 @njit def _intrinsic_coherence( cross_power, power1, power2, power1_noise, power2_noise, n_ave, ): """ Intrinsic coherence estimations from cross and power spectra. Vaughan & Nowak 1997, ApJ 474, L43 For errors, assumes **high powers, high coherence**. See eq. 8 of the paper. Powers below 3 sigma above the noise level (P_noise / sqrt(MW)) are considered too low, and the coherence will be set to NaN. TODO: implement a more general treatment of the uncertainty. Parameters ---------- cross_power : complex `np.array` cross spectrum power1 : float `np.array` sub-band periodogram power2 : float `np.array` reference-band periodogram power1_noise : float Poisson noise level of the sub-band periodogram power2_noise : float Poisson noise level of the reference-band periodogram n_ave : int or float number of intervals that have been averaged to obtain the input spectra Other Parameters ---------------- return_uncertainty : bool, default False Whether to return the uncertainty on the coherence, calculated according to Vaughan & Nowak 1997, ApJ 474, L43, eq. 8. Returns ------- coherence : float `np.array` or tuple of two float `np.array` The intrinsic coherence values at all frequencies. It is 0 if the numerator of the coherence calculation is negative, and NaN if the powers are too low compared to the noise level. uncertainty : float `np.array`, optional The uncertainty on the intrinsic coherence, calculated according to Vaughan & Nowak 1997, ApJ 474, L43, eq. 8. """ if check_powers_for_intrinsic_coherence(power1, power2, power1_noise, power2_noise, n_ave, 3): # If the powers are too close to the noise level, the coherence is not well defined. return np.nan, np.nan bsq = _bias_term(power1, power2, power1_noise, power2_noise, n_ave, 1) num = (cross_power * np.conj(cross_power)).real - bsq if num < 0: return 0.0, np.nan power1_sub = power1 - power1_noise power2_sub = power2 - power2_noise den = power1_sub * power2_sub coherence = num / den uncertainty = _intrinsic_coherence_uncertainties( bsq, num, power1_sub, power2_sub, power1_noise, power2_noise, coherence, n_ave ) return coherence, uncertainty @njit def _intrinsic_coherence_with_adjusted_bias( cross_power, power1, power2, power1_noise, power2_noise, n_ave, atol=0.01, ): """ Intrinsic coherence estimations from cross and power spectra, with adjusted bias term. Follows the procedure from sec. 5 of Ingram 2019, MNRAS 489, 3927, which iteratively adjusts the bias term For errors, assumes **high powers, high coherence**. See eq. 8 of Vaughan & Nowak 1997. Powers below 3 sigma above the noise level (P_noise / sqrt(MW)) are considered too low, and the coherence will be set to NaN. Parameters ---------- cross_power : complex `np.array` cross spectrum power1 : float `np.array` sub-band periodogram power2 : float `np.array` reference-band periodogram power1_noise : float Poisson noise level of the sub-band periodogram power2_noise : float Poisson noise level of the reference-band periodogram n_ave : int or float number of intervals that have been averaged to obtain the input spectra Other Parameters ---------------- atol : float, default 0.01 The absolute tolerance for the convergence of the iterative procedure to adjust the bias term. Returns ------- coherence : float `np.array` or tuple of two float `np.array` The intrinsic coherence values at all frequencies. It is 0 if the numerator of the coherence calculation is negative, and NaN if the powers are too low compared to the noise level. uncertainty : float `np.array`, optional The uncertainty on the intrinsic coherence, calculated according to Vaughan & Nowak 1997, ApJ 474, L43, eq. 8. not_converged_flag : bool `np.array` Whether the bias term adjustment procedure converged (False) or not (True). """ if check_powers_for_intrinsic_coherence(power1, power2, power1_noise, power2_noise, n_ave, 3): # If the powers are too close to the noise level, the coherence is not well defined. return np.nan, np.nan, False # Consider low power when the power is less than 3 sigma above the noise level. # This is somewhat arbitrary, better criteria suggestions are welcome. power1_sub = power1 - power1_noise power2_sub = power2 - power2_noise current_coherence = 1.0 converged = False for _ in range(40): bsq = _bias_term(power1, power2, power1_noise, power2_noise, n_ave, current_coherence) num = (cross_power * np.conj(cross_power)).real - bsq if num <= 0: # if the current coherence still drops, the bias term increases, # so at this point it is useless to keep iterating. return 0.0, np.nan, False den = power1_sub * power2_sub coherence = num / den if np.abs(current_coherence - coherence) < atol: converged = True break current_coherence = coherence uncertainty = _intrinsic_coherence_uncertainties( bsq, num, power1_sub, power2_sub, power1_noise, power2_noise, coherence, n_ave ) return coherence, uncertainty, not converged @njit def _intrinsic_coherence_array( cross_power, power1, power2, power1_noise, power2_noise, n_ave, adjust_bias=False, atol=0.01, ): r""" Intrinsic coherence estimations from cross and power spectra. Vaughan & Nowak 1997, ApJ 474, L43 .. math:: \tilde{\gamma}^2(f) = \frac{|\langle \tilde{C}(f) \rangle|^2 - \tilde{b}^2} {(\langle \tilde{P}_1(f) \rangle - \tilde{P}_{1, \rm noise}) (\langle \tilde{P}_2(f) \rangle - \tilde{P}_{2, \rm noise})} where :math:`\tilde{b}^2` is the bias term that accounts for the contribution of Poisson noise to the cross spectrum (see :func:`stingray.fourier.bias_term`), and tilde generally indicates noisy measurements of the cross spectrum :math:`C` and the power spectra :math:`P_n`. The terms :math:`\tilde{P}_{\rm n, \rm noise}` are the estimates of the contribution of Poisson noise to the power spectra. The bias term depends on the intrinsic coherence itself, so it can be calculated iteratively following the procedure from sec. 5 of Ingram 2019, MNRAS 489, 3927, which adjusts the bias term until convergence is reached. This is done if ``adjust_bias`` is set to True. It is typically a very small correction. For errors, assumes **high powers, high coherence**. See eq. 8 of the paper. Powers below 3 sigma above the noise level (P_noise / sqrt(MW)) are considered too low, and the coherence will be set to NaN. TODO: implement a more general treatment of the uncertainty. Parameters ---------- cross_power : complex `np.array` Cross spectrum power1 : float `np.array` Sub-band periodogram. Must have the same shape as `cross_power`. power2 : float `np.array` Reference-band periodogram. Must have the same shape as `cross_power`. power1_noise : float `np.array` Poisson noise level of the sub-band periodogram. Can have a single value or the same shape as `cross_power`. power2_noise : float `np.array` Poisson noise level of the reference-band periodogram. Can have a single value or the same shape as `cross_power`. n_ave : int or float `np.array` number of intervals that have been averaged to obtain the input spectra. Can have a single value or the same shape as `cross_power`. Other Parameters ---------------- adjust_bias : bool, default False Whether to adjust the bias term iteratively following the procedure from sec. 5 of Ingram 2019, MNRAS 489, 3927. It is typically a very small correction, but it can be relevant for low coherence values and/or low number of averaged intervals atol : float, default 0.01 The absolute tolerance for the convergence of the iterative procedure to adjust the bias term. Only relevant if ``adjust_bias`` is True. Returns ------- coherence : float `np.array` The intrinsic coherence values at all frequencies. It is 0 if the numerator of the coherence calculation is negative, and NaN if the powers are too low compared to the noise level. uncertainty : float `np.array` The uncertainty on the intrinsic coherence, calculated according to Vaughan & Nowak 1997, ApJ 474, L43, eq. 8. no_conversion_flags : `list` of int List of indices of ``coherence`` corresponding to frequencies where the bias term adjustment procedure did not converge. Only relevant if ``adjust_bias`` is True. """ coherence = np.empty(cross_power.shape, dtype=np.float64) uncertainty = np.empty(cross_power.shape, dtype=np.float64) no_conversion_flags = [] for i in range(cross_power.size): if np.size(power1_noise) == np.size(cross_power): p1_n = float(power1_noise[i]) else: p1_n = float(power1_noise[0]) if np.size(power2_noise) == np.size(cross_power): p2_n = float(power2_noise[i]) else: p2_n = float(power2_noise[0]) if np.size(n_ave) == np.size(cross_power): n_a = int(n_ave[i]) else: n_a = int(n_ave[0]) if adjust_bias: coherence[i], uncertainty[i], flag = _intrinsic_coherence_with_adjusted_bias( cross_power[i], power1[i], power2[i], p1_n, p2_n, n_a, atol=atol ) if flag: no_conversion_flags.append(i) else: coherence[i], uncertainty[i] = _intrinsic_coherence( cross_power[i], power1[i], power2[i], p1_n, p2_n, n_a ) return coherence, uncertainty, no_conversion_flags
[docs] def intrinsic_coherence( cross_power, power1, power2, power1_noise, power2_noise, n_ave, return_uncertainty=False, adjust_bias=False, atol=0.01, ): r""" Intrinsic coherence estimations from cross and power spectra. Vaughan & Nowak 1997, ApJ 474, L43 .. math:: \tilde{\gamma}^2(f) = \frac{|\langle \tilde{C}(f) \rangle|^2 - \tilde{b}^2} {(\langle \tilde{P}_1(f) \rangle - \tilde{P}_{1, \rm noise}) (\langle \tilde{P}_2(f) \rangle - \tilde{P}_{2, \rm noise})} where :math:`\tilde{b}^2` is the bias term that accounts for the contribution of Poisson noise to the cross spectrum (see :func:`stingray.fourier.bias_term`), and tilde generally indicates noisy measurements of the cross spectrum :math:`C` and the power spectra :math:`P_n`. The terms :math:`\tilde{P}_{\rm n, \rm noise}` are the estimates of the contribution of Poisson noise to the power spectra. The bias term depends on the intrinsic coherence itself, so it can be calculated iteratively following the procedure from sec. 5 of Ingram 2019, MNRAS 489, 3927, which adjusts the bias term until convergence is reached. This is done if ``adjust_bias`` is set to True. It is typically a very small correction. For errors, assumes **high powers, high coherence**. See eq. 8 of the paper. Powers below 3 sigma above the noise level (P_noise / sqrt(MW)) are considered too low, and the coherence will be set to NaN. TODO: implement a more general treatment of the uncertainty. Parameters ---------- cross_power : complex `np.array` Cross spectrum power1 : float `np.array` Sub-band periodogram. Must have the same shape as `cross_power`. power2 : float `np.array` Reference-band periodogram. Must have the same shape as `cross_power`. power1_noise : float or float `np.array` of the same shape as `cross_power` Poisson noise level of the sub-band periodogram power2_noise : float or float `np.array` of the same shape as `cross_power` Poisson noise level of the reference-band periodogram n_ave : int or float or int `np.array` of the same shape as `cross_power` number of intervals that have been averaged to obtain the input spectra Other Parameters ---------------- return_uncertainty : bool, default False Whether to return the uncertainty on the coherence, calculated according to Vaughan & Nowak 1997, ApJ 474, L43, eq. 8. atol : float, default 0.01 The absolute tolerance for the convergence of the iterative procedure to adjust the bias term.Only relevant if ``adjust_bias`` is True. Returns ------- coherence : float `np.array` or tuple of two float `np.array` The intrinsic coherence values at all frequencies. It is 0 if the numerator of the coherence calculation is negative, and NaN if the powers are too low compared to the noise level. uncertainty : float `np.array`, optional The uncertainty on the intrinsic coherence, calculated according to Vaughan & Nowak 1997, ApJ 474, L43, eq. 8. """ single_power = False if not isinstance(cross_power, np.ndarray): cross_power = np.asanyarray([cross_power]) power1 = np.asanyarray([power1]) power2 = np.asanyarray([power2]) single_power = True if not isinstance(power1_noise, Iterable): power1_noise = np.asanyarray([power1_noise]) if not isinstance(power2_noise, Iterable): power2_noise = np.asanyarray([power2_noise]) if not isinstance(n_ave, Iterable): n_ave = np.asanyarray([n_ave]) coherence, uncertainty, flagged = _intrinsic_coherence_array( cross_power, power1, power2, power1_noise, power2_noise, n_ave, adjust_bias=adjust_bias, atol=atol, ) if len(flagged) > 0: warnings.warn( "The iterative procedure to adjust the bias term did not converge after 40 " "iterations. Consider rebinning the spectra to increase the signal-to-noise " "ratio, or to use a more permissive tolerance (``atol`` parameter)." ) if np.any(np.isnan(coherence)): warnings.warn( "NaN values detected in intrinsic_coherence calculation. This happens when the powers " "are too close to the noise level, and the coherence is not well defined. Consider " "rebinning the spectra to increase the signal-to-noise ratio." ) if np.any(np.isclose(coherence, 0)): warnings.warn( "Zero values detected in intrinsic_coherence calculation. This usually happens" " when the bias term is larger than the cross spectrum, and the coherence is not" " well defined. " ) if single_power: coherence = coherence[0] uncertainty = uncertainty[0] if return_uncertainty: return coherence, uncertainty return coherence
def estimate_intrinsic_coherence(cross_power, power1, power2, power1_noise, power2_noise, n_ave): """ Estimate intrinsic coherence Use the iterative procedure from sec. 5 of Ingram 2019, MNRAS 489, 392 Parameters ---------- cross_power : complex `np.array` cross spectrum power1 : float `np.array` sub-band periodogram power2 : float `np.array` reference-band periodogram power1_noise : float Poisson noise level of the sub-band periodogram power2_noise : float Poisson noise level of the reference-band periodogram n_ave : int or float number of intervals that have been averaged to obtain the input spectra Returns ------- coherence : float `np.array` The estimated intrinsic coherence, at all frequencies. """ warnings.warn( "estimate_intrinsic_coherence is deprecated. Use intrinsic_coherence with adjust_bias=True instead.", DeprecationWarning, ) new_coherence = intrinsic_coherence( cross_power, power1, power2, power1_noise, power2_noise, n_ave, return_uncertainty=False, adjust_bias=True, ) return new_coherence def get_rms_from_rms_norm_periodogram(power_sqrms, poisson_noise_sqrms, df, M, low_M_buffer_size=4): r"""Calculate integrated rms spectrum (frac or abs). If M=1, it starts by rebinning the powers slightly in order to get a slightly better approximation for the error bars. Parameters ---------- power_sqrms : array-like Powers, in units of fractional rms ($(rms/mean)^2 Hz{-1}$) poisson_noise_sqrms : float Poisson noise level, in units of fractional rms ($(rms/mean)^2 Hz{-1}$ df : float or ``np.array``, same dimension of ``power_sqrms`` The frequency resolution of each power M : int or ``np.array``, same dimension of ``power_sqrms`` The number of powers averaged to obtain each value of power. Other Parameters ---------------- low_M_buffer_size : int, default 4 If M=1, the powers are rebinned to have a minimum of ``low_M_buffer_size`` powers in each bin. This is done to get a better estimate of the error bars. """ from stingray.utils import rebin_data m_is_iterable = isinstance(M, Iterable) df_is_iterable = isinstance(df, Iterable) # if M is an iterable but all values are the same, let's simplify if m_is_iterable and len(list(set(M))) == 1: M = M[0] m_is_iterable = False # the same with df if df_is_iterable and len(list(set(df))) == 1: df = df[0] df_is_iterable = False low_M_values = M < 30 if np.any(M < 30): quantity = "Some" if not m_is_iterable or np.count_nonzero(low_M_values) == M.size: quantity = "All" warnings.warn( f"{quantity} power spectral bins have M<30. The error bars on the rms might be wrong. " "In some cases one might try to increase the number of segments, for example by " "reducing the segment size, in order to obtain at least 30 segments." ) # But they cannot be of different kind. There would be something wrong with the data if m_is_iterable != df_is_iterable: raise ValueError("M and df must be either both constant, or none of them.") # If powers are not rebinned and M=1, we rebin them slightly. The error on the power is tricky, # because powers follow a non-central chi squared distribution. If we combine powers with # very different underlying signal level, the error bars will be completely wrong. But # nearby powers have a higher chance of having similar values, and so, by combining them, # we have a higher chance of obtaining sensible quasi-Gaussian error bars, easier to # propagate through standard quadrature summation if not m_is_iterable and M == 1 and power_sqrms.size > low_M_buffer_size: _, local_power_sqrms, _, local_M = rebin_data( np.arange(power_sqrms.size) * df, power_sqrms, df * low_M_buffer_size, yerr=None, method="average", dx=df, ) local_df = df * local_M total_local_powers = local_M * local_power_sqrms.size total_power_err = np.sqrt(np.sum(local_power_sqrms**2 * local_df**2 / local_M)) total_power_err *= power_sqrms.size / total_local_powers else: total_power_err = np.sqrt(np.sum(power_sqrms**2 * df**2 / M)) powers_sub = power_sqrms - poisson_noise_sqrms total_power_sub = np.sum(powers_sub * df) if total_power_sub < 0: # By the definition, it makes no sense to define an error bar here. warnings.warn("Poisson-subtracted power is below 0") return 0.0, 0.0 rms = np.sqrt(total_power_sub) high_snr_err = 0.5 / rms * total_power_err return rms, high_snr_err def get_rms_from_unnorm_periodogram( unnorm_powers, nphots_per_segment, df, M=1, poisson_noise_unnorm=None, segment_size=None, kind="frac", ): """Calculate the fractional rms amplitude from unnormalized powers. We assume the powers come from an unnormalized Bartlett periodogram. If so, the Poisson noise level is ``nphots_per_segment``, but the user can specify otherwise (e.g. if the Poisson noise level is altered by dead time). The ``segment_size`` and ``nphots_per_segment`` parameters refer to the length and averaged counts of each segment of data used for the Bartlett periodogram. Parameters ---------- unnorm_powers : np.ndarray The unnormalized power spectrum nphots_per_segment : float The averaged number of photons per segment of the data used for the Bartlett periodogram df : float The frequency resolution of the periodogram Other parameters ---------------- poisson_noise_unnorm : float The unnormalized Poisson noise level segment_size : float The size of the segment, in seconds M : int The number of segments averaged to obtain the periodogram kind : str One of "frac" or "abs" """ if segment_size is None: segment_size = 1 / np.min(df) if poisson_noise_unnorm is None: poisson_noise_unnorm = nphots_per_segment meanrate = nphots_per_segment / segment_size def to_leahy(powers): return powers * 2.0 / nphots_per_segment def to_frac(powers): return to_leahy(powers) / meanrate def to_abs(powers): return to_leahy(powers) * meanrate if kind.startswith("frac"): to_norm = to_frac elif kind.startswith("abs"): to_norm = to_abs else: raise ValueError("Only 'frac' or 'abs' rms are supported.") poisson = to_norm(poisson_noise_unnorm) powers = to_norm(unnorm_powers) return get_rms_from_rms_norm_periodogram(powers, poisson, df, M) def rms_calculation( unnorm_powers, min_freq, max_freq, nphots, T, M_freqs, K_freqs, freq_bins, poisson_noise_unnorm, deadtime=0.0, ): """ Compute the fractional rms amplitude in the given power or cross spectrum NOTE: all array quantities are already in the correct energy range Parameters ---------- unnorm_powers : array of float unnormalised power or cross spectrum, the array has already been filtered for the given frequency range min_freq : float The lower frequency bound for the calculation (from the freq grid). max_freq : float The upper frequency bound for the calculation (from the freq grid). nphots : float Number of photons for the full power or cross spectrum T : float Time length of the light curve M_freq : scalar or array of float If scalar, it is the number of segments in the AveragedCrossspectrum If array, it is the number of segments times the rebinning sample in the given frequency range. K_freq : scalar or array of float If scalar, the power or cross spectrum is not rebinned (K_freq = 1) If array, the power or cross spectrum is rebinned and it is the rebinned sample in the given frequency range. freq_bins : integer if the cross or power spectrum is rebinned freq_bins = 1, if it NOT rebinned freq_bins is the number of frequency bins in the given frequency range. poisson_noise_unnorm : float This is the Poisson noise level unnormalised. Other parameters ---------------- deadtime : float Deadtime of the instrument Returns ------- rms : float The fractional rms amplitude contained between ``min_freq`` and ``max_freq``. rms_err : float The error on the fractional rms amplitude. """ warnings.warn( "The rms_calculation function is deprecated. Use get_rms_from_unnorm_periodogram instead.", DeprecationWarning, ) rms_norm_powers = unnorm_powers * 2 * T / nphots**2 rms_poisson_noise = poisson_noise_unnorm * 2 * T / nphots**2 df = 1 / T * K_freqs rms, rms_err = get_rms_from_rms_norm_periodogram( rms_norm_powers, rms_poisson_noise, df, M_freqs ) return rms, rms_err def error_on_averaged_cross_spectrum( cross_power, seg_power, ref_power, n_ave, seg_power_noise, ref_power_noise, common_ref=False ): """ Error on cross spectral quantities. `Ingram 2019<https://ui.adsabs.harvard.edu/abs/2019MNRAS.489.3927I/abstract>`__ details the derivation of the corrected formulas when the subject band is included in the reference band (and photons are presents in both bands, so this does _not_ involve overlapping bands from different detectors). The keyword argument ``common_ref`` indicates if the reference band also contains the photons of the subject band. Note: this is only valid for a very large number of averaged powers. Beware if n_ave < 50 or so. Parameters ---------- cross_power : complex `np.array` cross spectrum seg_power : float `np.array` sub-band periodogram ref_power : float `np.array` reference-band periodogram seg_power_noise : float Poisson noise level of the sub-band periodogram ref_power_noise : float Poisson noise level of the reference-band periodogram n_ave : int or float number of intervals that have been averaged to obtain the input spectra Other Parameters ---------------- common_ref : bool, default False Are data in the sub-band also included in the reference band? Returns ------- dRe : float `np.array` Error on the real part of the cross spectrum dIm : float `np.array` Error on the imaginary part of the cross spectrum dphi : float `np.array` Error on the angle (or phase lag) dG : float `np.array` Error on the modulus of the cross spectrum """ if (not isinstance(n_ave, Iterable) and n_ave < 30) or ( isinstance(n_ave, Iterable) and np.any(n_ave) < 30 ): warnings.warn( "n_ave is below 30. Please note that the error bars " "on the quantities derived from the cross spectrum " "are only reliable for a large number of averaged " "powers." ) two_n_ave = 2 * n_ave if common_ref: Gsq = (cross_power * np.conj(cross_power)).real bsq = bias_term(seg_power, ref_power, seg_power_noise, ref_power_noise, n_ave) frac = (Gsq - bsq) / (ref_power - ref_power_noise) power_over_2n = ref_power / two_n_ave # Eq. 18 dRe = dIm = dG = np.sqrt(power_over_2n * (seg_power - frac)) # Eq. 19 dphi = np.sqrt( power_over_2n * (seg_power / (Gsq - bsq) - 1 / (ref_power - ref_power_noise)) ) else: PrPs = ref_power * seg_power dRe = np.sqrt((PrPs + cross_power.real**2 - cross_power.imag**2) / two_n_ave) dIm = np.sqrt((PrPs - cross_power.real**2 + cross_power.imag**2) / two_n_ave) gsq = raw_coherence( cross_power, seg_power, ref_power, seg_power_noise, ref_power_noise, n_ave, return_uncertainty=False, ) dphi = np.sqrt((1 - gsq) / (2 * gsq * n_ave)) dG = np.sqrt(PrPs / n_ave) return dRe, dIm, dphi, dG def cross_to_covariance(cross_power, ref_power, ref_power_noise, delta_nu): """ Convert a cross spectrum into a covariance spectrum. Covariance: Wilkinson & Uttley 2009, MNRAS, 397, 666 Complex covariance: Mastroserio et al. 2018, MNRAS, 475, 4027 Parameters ---------- cross_power : complex `np.array` cross spectrum ref_power : float `np.array` reference-band periodogram ref_power_noise : float Poisson noise level of the reference-band periodogram delta_nu : float or `np.array` spectral resolution. Can be a float, or an array if the spectral resolution is not constant throughout the periodograms Returns ------- covariance : complex `np.array` The cross spectrum, normalized as a covariance. """ return cross_power * np.sqrt(delta_nu / (ref_power - ref_power_noise)) def _which_segment_idx_fun(binned=False, dt=None): """ Select which segment index function from ``gti.py`` to use. If ``binned`` is ``False``, call the unbinned function. If ``binned`` is not ``True``, call the binned function. Note that in the binned function ``dt`` is an optional parameter. We pass it if the user specifies it. """ # Make function interface equal (fluxes gets ignored) if not binned: # Define a new function, make sure that, by default, the sort check # is disabled. def fun(*args, **kwargs): check_sorted = kwargs.pop("check_sorted", False) return generate_indices_of_segment_boundaries_unbinned( *args, check_sorted=check_sorted, **kwargs ) else: # Define a new function, so that we can pass the correct dt as an # argument. def fun(*args, **kwargs): return generate_indices_of_segment_boundaries_binned(*args, dt=dt, **kwargs) return fun def get_average_ctrate(times, gti, segment_size, counts=None): """ Calculate the average count rate during the observation. This function finds the same segments that the averaged periodogram functions (``avg_cs_from_iterables``, ``avg_pds_from_iterables`` etc) will use, and returns the mean count rate. If ``counts`` is ``None``, the input times are interpreted as events. Otherwise, the number of events is taken from ``counts`` Parameters ---------- times : float `np.array` Array of times gti : [[gti00, gti01], [gti10, gti11], ...] good time intervals segment_size : float length of segments Other parameters ---------------- counts : float `np.array`, default None Array of counts per bin Returns ------- ctrate : float The average count rate in the segments that are used for the analysis. Examples -------- >>> times = np.sort(np.random.uniform(0, 1000, 1000)) >>> gti = np.asanyarray([[0, 1000]]) >>> counts, _ = np.histogram(times, bins=np.linspace(0, 1000, 11)) >>> bin_times = np.arange(50, 1000, 100) >>> assert get_average_ctrate(bin_times, gti, 1000, counts=counts) == 1.0 >>> assert get_average_ctrate(times, gti, 1000) == 1.0 """ n_ph = 0 n_intvs = 0 binned = counts is not None func = _which_segment_idx_fun(binned) for _, _, idx0, idx1 in func(times, gti, segment_size): if not binned: n_ph += idx1 - idx0 else: n_ph += np.sum(counts[idx0:idx1]) n_intvs += 1 return n_ph / (n_intvs * segment_size) def get_flux_iterable_from_segments( times, gti, segment_size, n_bin=None, dt=None, fluxes=None, errors=None ): """ Get fluxes from different segments of the observation. If ``fluxes`` is ``None``, the input times are interpreted as events, and they are split into many binned series of length ``segment_size`` with ``n_bin`` bins. If ``fluxes`` is an array, the number of events corresponding to each time bin is taken from ``fluxes`` Therefore, at least one of either ``n_bin`` and ``fluxes`` needs to be specified. Parameters ---------- times : float `np.array` Array of times gti : [[gti00, gti01], [gti10, gti11], ...] good time intervals segment_size : float length of segments. If ``None``, the full light curve is used. Other parameters ---------------- n_bin : int, default None Number of bins to divide the ``segment_size`` in fluxes : float `np.array`, default None Array of fluxes. errors : float `np.array`, default None Array of error bars corresponding to the flux values above. dt : float, default None Time resolution of the light curve used to produce periodograms Yields ------ flux : `np.array` Array of fluxes err : `np.array` (optional) if ``errors`` is None, an array of errors in the segment """ if fluxes is None and n_bin is None: raise ValueError( "At least one between fluxes (if light curve) and " "n_bin (if events) has to be set" ) binned = fluxes is not None cast_kind = float if dt is None and binned: dt = np.median(np.diff(times[:100])) if binned: fluxes = np.asanyarray(fluxes) if np.iscomplexobj(fluxes): cast_kind = complex if segment_size is None: segment_size = gti[-1, 1] - gti[0, 0] def fun(times, gti, segment_size): return [[gti[0, 0], gti[-1, 1], 0, times.size]] else: fun = _which_segment_idx_fun(binned, dt) for s, e, idx0, idx1 in fun(times, gti, segment_size): if idx1 - idx0 < 2: yield None continue if not binned: # astype here serves to avoid integer rounding issues in Windows, # where long is a 32-bit integer. cts = histogram( (times[idx0:idx1] - s).astype(float), bins=n_bin, range=[0, segment_size] ).astype(float) cts = np.array(cts) else: cts = fluxes[idx0:idx1].astype(cast_kind) if errors is not None: cts = cts, errors[idx0:idx1].astype(cast_kind) yield cts @njit() def _safe_array_slice_indices(input_size, input_center_idx, nbins): """Calculate the indices needed to extract a n-bin slice of an array, centered at an index. Let us say we have an array of size ``input_size`` and we want to extract a slice of ``nbins`` centered at index ``input_center_idx``. We should be robust when the slice goes beyond the edges of the input array, possibly leaving missing values in the output array. This function calculates the indices needed to extract the slice from the input array, and the indices in the output array that will be filled. In the most common case, the slice is entirely contained within the input array, so that the output slice will just be ``[0:nbins]`` and the input slice ``[input_center_idx - nbins // 2: input_center_idx - nbins // 2 + nbins]``. Parameters ---------- input_size : int Input array size center_idx : int Index of the center of the slice nbins : int Number of bins to extract Returns ------- input_slice : list Indices to extract the slice from the input array output_slice : list Indices to fill the output array Examples -------- >>> _safe_array_slice_indices(input_size=10, input_center_idx=5, nbins=3) ([4, 7], [0, 3]) If the slice goes beyond the right edge: the output slice will only cover the first two bins of the output array, and up to the end of the input array. >>> _safe_array_slice_indices(input_size=6, input_center_idx=5, nbins=3) ([4, 6], [0, 2]) """ minbin = input_center_idx - nbins // 2 maxbin = minbin + nbins if minbin < 0: output_slice = [-minbin, min(nbins, input_size - minbin)] input_slice = [0, minbin + nbins] elif maxbin > input_size: output_slice = [0, nbins - (maxbin - input_size)] input_slice = [minbin, input_size] else: output_slice = [0, nbins] input_slice = [minbin, maxbin] return input_slice, output_slice @njit() def extract_pds_slice_around_freq(freqs, powers, f0, nbins=100): """Extract a slice of PDS around a given frequency. This function extracts a slice of the power spectrum around a given frequency. The slice has a length of ``nbins``. If the slice goes beyond the edges of the power spectrum, the missing values are filled with NaNs. Parameters ---------- freqs : np.array Array of frequencies, the same for all powers powers : np.array Array of powers f0 : float Central frequency Other parameters ---------------- nbins : int, default 100 Number of bins to extract Examples -------- >>> freqs = np.arange(1, 100) * 0.1 >>> powers = 10 / freqs >>> f0 = 0.3 >>> p = extract_pds_slice_around_freq(freqs, powers, f0) >>> assert np.isnan(p[0]) >>> assert not np.any(np.isnan(p[48:])) """ powers = np.asarray(powers) chunk = np.zeros(nbins) + np.nan # fchunk = np.zeros(nbins) start_f_idx = np.searchsorted(freqs, f0) input_slice, output_slice = _safe_array_slice_indices(powers.size, start_f_idx, nbins) chunk[output_slice[0] : output_slice[1]] = powers[input_slice[0] : input_slice[1]] return chunk @njit() def _shift_and_average_core(input_array_list, weight_list, center_indices, nbins): """Core function to shift_and_add, JIT-compiled for your convenience. Parameters ---------- input_array_list : list of np.array List of input arrays weight_list : list of float List of weights for each input array center_indices : list of int Central indices of the slice of each input array to be summed nbins : int Number of bins to extract around the central index of each input array Returns ------- output_array : np.array Average of the input arrays, weighted by the weights sum_of_weights : np.array Sum of the weights at each output bin """ input_size = input_array_list[0].size output_array = np.zeros(nbins) sum_of_weights = np.zeros(nbins) for idx, array, weight in zip(center_indices, input_array_list, weight_list): input_slice, output_slice = _safe_array_slice_indices(input_size, idx, nbins) for i in range(input_slice[1] - input_slice[0]): output_array[output_slice[0] + i] += array[input_slice[0] + i] * weight sum_of_weights[output_slice[0] + i] += weight output_array = output_array / sum_of_weights return output_array, sum_of_weights def shift_and_add(freqs, power_list, f0_list, nbins=100, rebin=None, df=None, M=None): """Add a list of power spectra, centered on different frequencies. This is the basic operation for the shift-and-add operation used to track kHz QPOs in X-ray binaries (e.g. Méndez et al. 1998, ApJ, 494, 65). Parameters ---------- freqs : np.array Array of frequencies, the same for all powers. Must be sorted and on a uniform grid. power_list : list of np.array List of power spectra. Each power spectrum must have the same length as the frequency array. f0_list : list of float List of central frequencies Other parameters ---------------- nbins : int, default 100 Number of bins to extract rebin : int, default None Rebin the final spectrum by this factor. At the moment, the rebinning is linear. df : float, default None Frequency resolution of the power spectra. If not given, it is calculated from the input frequencies. M : int or list of int, default None Number of segments used to calculate each power spectrum. If a list is given, it must have the same length as the power list. Returns ------- f : np.array Array of output frequencies p : np.array Array of output powers n : np.array Number of contributing power spectra at each frequency Examples -------- >>> power_list = [[2, 5, 2, 2, 2], [1, 1, 5, 1, 1], [3, 3, 3, 5, 3]] >>> freqs = np.arange(5) * 0.1 >>> f0_list = [0.1, 0.2, 0.3, 0.4] >>> f, p, n = shift_and_add(freqs, power_list, f0_list, nbins=5) >>> assert np.array_equal(n, [2, 3, 3, 3, 2]) >>> assert np.array_equal(p, [2. , 2. , 5. , 2. , 1.5]) >>> assert np.allclose(f, [0.05, 0.15, 0.25, 0.35, 0.45]) """ # Check if the input list of power contains numpy arrays if not hasattr(power_list[0], "size"): power_list = np.asarray(power_list) # input_size = np.size(power_list[0]) freqs = np.asarray(freqs) # mid_idx = np.searchsorted(freqs, np.mean(f0_list)) if M is None: M = 1 if not isinstance(M, Iterable): M = np.ones(len(power_list)) * M center_f_indices = np.searchsorted(freqs, f0_list) final_powers, count = _shift_and_average_core(power_list, M, center_f_indices, nbins) if df is None: df = freqs[1] - freqs[0] final_freqs = np.arange(-nbins // 2, nbins // 2 + 1)[:nbins] * df final_freqs = final_freqs - (final_freqs[0] + final_freqs[-1]) / 2 + np.mean(f0_list) if rebin is not None: _, count, _, _ = rebin_data(final_freqs, count, rebin * df) final_freqs, final_powers, _, _ = rebin_data(final_freqs, final_powers, rebin * df) final_powers = final_powers / rebin return final_freqs, final_powers, count def avg_pds_from_iterable( flux_iterable, dt, norm="frac", use_common_mean=True, silent=False, return_subcs=False ): """ Calculate the average periodogram from an iterable of light curves Parameters ---------- flux_iterable : `iterable` of `np.array`s or of tuples (`np.array`, `np.array`) Iterable providing either equal-length series of count measurements, or of tuples (fluxes, errors). They must all be of the same length. dt : float Time resolution of the light curves used to produce periodograms Other Parameters ---------------- norm : str, default "frac" The normalization of the periodogram. "abs" is absolute rms, "frac" is fractional rms, "leahy" is Leahy+83 normalization, and "none" is the unnormalized periodogram use_common_mean : bool, default True The mean of the light curve can be estimated in each interval, or on the full light curve. This gives different results (Alston+2013). Here we assume the mean is calculated on the full light curve, but the user can set ``use_common_mean`` to False to calculate it on a per-segment basis. silent : bool, default False Silence the progress bars return_subcs : bool, default False Return all sub spectra from each light curve chunk Returns ------- results : :class:`astropy.table.Table` Table containing the following columns: freq : `np.array` The periodogram frequencies power : `np.array` The normalized periodogram powers unnorm_power : `np.array` The unnormalized periodogram powers And a number of other useful diagnostics in the metadata, including all attributes needed to allocate Powerspectrum objects, such as all the input arguments of this function (``dt``, ``segment_size``), and, e.g. n : int the number of bins in the light curves used in each segment m : int the number of averaged periodograms mean : float the mean flux """ local_show_progress = show_progress if silent: def local_show_progress(a): return a # Initialize stuff cross = unnorm_cross = None n_ave = 0 sum_of_photons = 0 common_variance = None if return_subcs: subcs = [] unnorm_subcs = [] for flux in local_show_progress(flux_iterable): if flux is None or np.all(flux == 0): continue # If the iterable returns the uncertainty, use it to calculate the # variance. variance = None if isinstance(flux, tuple): flux, err = flux variance = np.mean(err) ** 2 # Calculate the FFT n_bin = flux.size ft = fft(flux) # This will only be used by the Leahy normalization, so only if # the input light curve is in units of counts/bin n_ph = flux.sum() unnorm_power = (ft * ft.conj()).real # Accumulate the sum of means and variances, to get the final mean and # variance the end sum_of_photons += n_ph if variance is not None: common_variance = sum_if_not_none_or_initialize(common_variance, variance) # In the first loop, define the frequency and the freq. interval > 0 if cross is None: fgt0 = positive_fft_bins(n_bin) freq = fftfreq(n_bin, dt)[fgt0] # No need for the negative frequencies unnorm_power = unnorm_power[fgt0] # If the user wants to normalize using the mean of the total # lightcurve, normalize it here cs_seg = copy.deepcopy(unnorm_power) if not use_common_mean: mean = n_ph / n_bin cs_seg = normalize_periodograms( unnorm_power, dt, n_bin, mean, n_ph=n_ph, norm=norm, variance=variance, ) # Accumulate the total sum cross spectrum cross = sum_if_not_none_or_initialize(cross, cs_seg) unnorm_cross = sum_if_not_none_or_initialize(unnorm_cross, unnorm_power) if return_subcs: subcs.append(cs_seg) unnorm_subcs.append(unnorm_power) n_ave += 1 # If there were no good intervals, return None if cross is None: return None # Calculate the mean number of photons per chunk n_ph = sum_of_photons / n_ave # Calculate the mean number of photons per bin common_mean = n_ph / n_bin if common_variance is not None: # Note: the variances we summed were means, not sums. # Hence M, not M*n_bin common_variance /= n_ave # Transform a sum into the average unnorm_cross = unnorm_cross / n_ave cross = cross / n_ave # Final normalization (If not done already!) if use_common_mean: factor = normalize_periodograms( 1, dt, n_bin, common_mean, n_ph=n_ph, norm=norm, variance=common_variance ) cross = unnorm_cross * factor if return_subcs: for i in range(len(subcs)): subcs[i] *= factor results = Table() results["freq"] = freq results["power"] = cross results["unnorm_power"] = unnorm_cross results.meta.update( { "n": n_bin, "m": n_ave, "dt": dt, "norm": norm, "df": 1 / (dt * n_bin), "nphots": n_ph, "mean": common_mean, "variance": common_variance, "segment_size": dt * n_bin, } ) if return_subcs: results.meta["subcs"] = subcs results.meta["unnorm_subcs"] = unnorm_subcs return results def avg_cs_from_iterables_quick(flux_iterable1, flux_iterable2, dt, norm="frac"): """Like `avg_cs_from_iterables`, with default options that make it quick. Assumes that: * the flux iterables return counts/bin, no other units * the mean is calculated over the whole light curve, and normalization is done at the end * no auxiliary PDSs are returned * only positive frequencies are returned * the spectrum is complex, no real parts or absolutes * no progress bars Parameters ---------- flux_iterable1 : `iterable` of `np.array`s or of tuples (`np.array`, `np.array`) Iterable providing either equal-length series of count measurements, or of tuples (fluxes, errors). They must all be of the same length. flux_iterable2 : `iterable` of `np.array`s or of tuples (`np.array`, `np.array`) Same as ``flux_iterable1``, for the reference channel dt : float Time resolution of the light curves used to produce periodograms Other Parameters ---------------- norm : str, default "frac" The normalization of the periodogram. "abs" is absolute rms, "frac" is fractional rms, "leahy" is Leahy+83 normalization, and "none" is the unnormalized periodogram Returns ------- results : :class:`astropy.table.Table` Table containing the following columns: freq : `np.array` The periodogram frequencies power : `np.array` The normalized periodogram powers unnorm_power : `np.array` The unnormalized periodogram powers And a number of other useful diagnostics in the metadata, including all attributes needed to allocate Crossspectrum objects, such as all the input arguments of this function (``dt``, ``segment_size``), and, e.g. n : int the number of bins in the light curves used in each segment m : int the number of averaged periodograms mean : float the mean flux (geometrical average of the mean fluxes in the two channels) """ # Initialize stuff unnorm_cross = None n_ave = 0 sum_of_photons1 = sum_of_photons2 = 0 for flux1, flux2 in zip(flux_iterable1, flux_iterable2): if flux1 is None or flux2 is None or np.all(flux1 == 0) or np.all(flux2 == 0): continue n_bin = flux1.size # Calculate the sum of each light curve, to calculate the mean n_ph1 = flux1.sum() n_ph2 = flux2.sum() # At the first loop, we define the frequency array and the range of # positive frequency bins (after the first loop, cross will not be # None anymore) if unnorm_cross is None: freq = fftfreq(n_bin, dt) fgt0 = positive_fft_bins(n_bin) # Calculate the FFTs ft1 = fft(flux1) ft2 = fft(flux2) # Calculate the unnormalized cross spectrum unnorm_power = ft1.conj() * ft2 # Accumulate the sum to calculate the total mean of the lc sum_of_photons1 += n_ph1 sum_of_photons2 += n_ph2 # Take only positive frequencies unnorm_power = unnorm_power[fgt0] # Initialize or accumulate final averaged spectrum unnorm_cross = sum_if_not_none_or_initialize(unnorm_cross, unnorm_power) n_ave += 1 # If no valid intervals were found, return only `None`s if unnorm_cross is None: return None # Calculate the mean number of photons per chunk n_ph1 = sum_of_photons1 / n_ave n_ph2 = sum_of_photons2 / n_ave n_ph = np.sqrt(n_ph1 * n_ph2) # Calculate the mean number of photons per bin common_mean1 = n_ph1 / n_bin common_mean2 = n_ph2 / n_bin common_mean = n_ph / n_bin # Transform the sums into averages unnorm_cross /= n_ave # Finally, normalize the cross spectrum (only if not already done on an # interval-to-interval basis) cross = normalize_periodograms( unnorm_cross, dt, n_bin, common_mean, n_ph=n_ph, norm=norm, variance=None, power_type="all", ) # No negative frequencies freq = freq[fgt0] results = Table() results["freq"] = freq results["power"] = cross results["unnorm_power"] = unnorm_cross results.meta.update( { "n": n_bin, "m": n_ave, "dt": dt, "norm": norm, "df": 1 / (dt * n_bin), "nphots": n_ph, "nphots1": n_ph1, "nphots2": n_ph2, "variance": None, "mean": common_mean, "mean1": common_mean1, "mean2": common_mean2, "power_type": "all", "fullspec": False, "segment_size": dt * n_bin, } ) return results def avg_cs_from_iterables( flux_iterable1, flux_iterable2, dt, norm="frac", use_common_mean=True, silent=False, fullspec=False, power_type="all", return_auxil=False, return_subcs=False, ): """Calculate the average cross spectrum from an iterable of light curves Parameters ---------- flux_iterable1 : `iterable` of `np.array`s or of tuples (`np.array`, `np.array`) Iterable providing either equal-length series of count measurements, or of tuples (fluxes, errors). They must all be of the same length. flux_iterable2 : `iterable` of `np.array`s or of tuples (`np.array`, `np.array`) Same as ``flux_iterable1``, for the reference channel dt : float Time resolution of the light curves used to produce periodograms Other Parameters ---------------- norm : str, default "frac" The normalization of the periodogram. "abs" is absolute rms, "frac" is fractional rms, "leahy" is Leahy+83 normalization, and "none" is the unnormalized periodogram use_common_mean : bool, default True The mean of the light curve can be estimated in each interval, or on the full light curve. This gives different results (Alston+2013). Here we assume the mean is calculated on the full light curve, but the user can set ``use_common_mean`` to False to calculate it on a per-segment basis. fullspec : bool, default False Return the full periodogram, including negative frequencies silent : bool, default False Silence the progress bars power_type : str, default 'all' If 'all', give complex powers. If 'abs', the absolute value; if 'real', the real part return_auxil : bool, default False Return the auxiliary unnormalized PDSs from the two separate channels return_subcs : bool, default False Return all sub spectra from each light curve chunk Returns ------- results : :class:`astropy.table.Table` Table containing the following columns: freq : `np.array` The frequencies. power : `np.array` The normalized cross spectral powers. unnorm_power : `np.array` The unnormalized cross spectral power. unnorm_pds1 : `np.array` The unnormalized auxiliary PDS from channel 1. Only returned if ``return_auxil`` is ``True``. unnorm_pds2 : `np.array` The unnormalized auxiliary PDS from channel 2. Only returned if ``return_auxil`` is ``True``. And a number of other useful diagnostics in the metadata, including all attributes needed to allocate Crossspectrum objects, such as all the input arguments of this function (``dt``, ``segment_size``), and, e.g. n : int The number of bins in the light curves used in each segment. m : int The number of averaged periodograms. mean : float The mean flux (geometrical average of the mean fluxes in the two channels). """ local_show_progress = show_progress if silent: def local_show_progress(a): return a # Initialize stuff cross = unnorm_cross = unnorm_pds1 = unnorm_pds2 = pds1 = pds2 = None n_ave = 0 sum_of_photons1 = sum_of_photons2 = 0 common_variance1 = common_variance2 = common_variance = None if return_subcs: subcs = [] unnorm_subcs = [] for flux1, flux2 in local_show_progress(zip(flux_iterable1, flux_iterable2)): if flux1 is None or flux2 is None or np.all(flux1 == 0) or np.all(flux2 == 0): continue # Does the flux iterable return the uncertainty? # If so, define the variances variance1 = variance2 = None if isinstance(flux1, tuple): flux1, err1 = flux1 variance1 = np.mean(err1) ** 2 if isinstance(flux2, tuple): flux2, err2 = flux2 variance2 = np.mean(err2) ** 2 # Only use the variance if both flux iterables define it. if variance1 is None or variance2 is None: variance1 = variance2 = None else: common_variance1 = sum_if_not_none_or_initialize(common_variance1, variance1) common_variance2 = sum_if_not_none_or_initialize(common_variance2, variance2) n_bin = flux1.size # At the first loop, we define the frequency array and the range of # positive frequency bins (after the first loop, cross will not be # None anymore) if cross is None: freq = fftfreq(n_bin, dt) fgt0 = positive_fft_bins(n_bin) # Calculate the FFTs ft1 = fft(flux1) ft2 = fft(flux2) # Calculate the sum of each light curve chunk, to calculate the mean n_ph1 = flux1.sum() n_ph2 = flux2.sum() n_ph = np.sqrt(n_ph1 * n_ph2) # Calculate the unnormalized cross spectrum unnorm_power = ft1.conj() * ft2 unnorm_pd1 = unnorm_pd2 = 0 # If requested, calculate the auxiliary PDSs if return_auxil: unnorm_pd1 = (ft1 * ft1.conj()).real unnorm_pd2 = (ft2 * ft2.conj()).real # Accumulate the sum to calculate the total mean of the lc sum_of_photons1 += n_ph1 sum_of_photons2 += n_ph2 # Take only positive frequencies unless the user wants the full # spectrum if not fullspec: unnorm_power = unnorm_power[fgt0] if return_auxil: unnorm_pd1 = unnorm_pd1[fgt0] unnorm_pd2 = unnorm_pd2[fgt0] cs_seg = copy.deepcopy(unnorm_power) p1_seg = unnorm_pd1 p2_seg = unnorm_pd2 # If normalization has to be done interval by interval, do it here. if not use_common_mean: mean1 = n_ph1 / n_bin mean2 = n_ph2 / n_bin mean = n_ph / n_bin variance = None if variance1 is not None: variance = np.sqrt(variance1 * variance2) cs_seg = normalize_periodograms( unnorm_power, dt, n_bin, mean, n_ph=n_ph, norm=norm, power_type=power_type, variance=variance, ) if return_auxil: p1_seg = normalize_periodograms( unnorm_pd1, dt, n_bin, mean1, n_ph=n_ph1, norm=norm, power_type=power_type, variance=variance1, ) p2_seg = normalize_periodograms( unnorm_pd2, dt, n_bin, mean2, n_ph=n_ph2, norm=norm, power_type=power_type, variance=variance2, ) if return_subcs: subcs.append(cs_seg) unnorm_subcs.append(unnorm_power) # Initialize or accumulate final averaged spectra cross = sum_if_not_none_or_initialize(cross, cs_seg) unnorm_cross = sum_if_not_none_or_initialize(unnorm_cross, unnorm_power) if return_auxil: unnorm_pds1 = sum_if_not_none_or_initialize(unnorm_pds1, unnorm_pd1) unnorm_pds2 = sum_if_not_none_or_initialize(unnorm_pds2, unnorm_pd2) pds1 = sum_if_not_none_or_initialize(pds1, p1_seg) pds2 = sum_if_not_none_or_initialize(pds2, p2_seg) n_ave += 1 # If no valid intervals were found, return only `None`s if cross is None: return None # Calculate the mean number of photons per chunk n_ph1 = sum_of_photons1 / n_ave n_ph2 = sum_of_photons2 / n_ave n_ph = np.sqrt(n_ph1 * n_ph2) # Calculate the common mean number of photons per bin common_mean1 = n_ph1 / n_bin common_mean2 = n_ph2 / n_bin common_mean = n_ph / n_bin if common_variance1 is not None: # Note: the variances we summed were means, not sums. Hence M, not M*N common_variance1 /= n_ave common_variance2 /= n_ave common_variance = np.sqrt(common_variance1 * common_variance2) # Transform the sums into averages cross /= n_ave unnorm_cross /= n_ave if return_auxil: unnorm_pds1 /= n_ave unnorm_pds2 /= n_ave # Finally, normalize the cross spectrum (only if not already done on an # interval-to-interval basis) if use_common_mean: cross = normalize_periodograms( unnorm_cross, dt, n_bin, common_mean, n_ph=n_ph, norm=norm, variance=common_variance, power_type=power_type, ) if return_subcs: for i in range(len(subcs)): subcs[i] = normalize_periodograms( unnorm_subcs[i], dt, n_bin, common_mean, n_ph=n_ph, norm=norm, variance=common_variance, power_type=power_type, ) if return_auxil: pds1 = normalize_periodograms( unnorm_pds1, dt, n_bin, common_mean1, n_ph=n_ph1, norm=norm, variance=common_variance1, power_type=power_type, ) pds2 = normalize_periodograms( unnorm_pds2, dt, n_bin, common_mean2, n_ph=n_ph2, norm=norm, variance=common_variance2, power_type=power_type, ) # If the user does not want negative frequencies, don't give them if not fullspec: freq = freq[fgt0] results = Table() results["freq"] = freq results["power"] = cross results["unnorm_power"] = unnorm_cross results.meta.update( { "n": n_bin, "m": n_ave, "dt": dt, "norm": norm, "df": 1 / (dt * n_bin), "segment_size": dt * n_bin, "nphots": n_ph, "nphots1": n_ph1, "nphots2": n_ph2, "countrate1": common_mean1 / dt, "countrate2": common_mean2 / dt, "mean": common_mean, "mean1": common_mean1, "mean2": common_mean2, "power_type": power_type, "fullspec": fullspec, "variance": common_variance, "variance1": common_variance1, "variance2": common_variance2, } ) if return_auxil: results["pds1"] = pds1 results["pds2"] = pds2 results["unnorm_pds1"] = unnorm_pds1 results["unnorm_pds2"] = unnorm_pds2 if return_subcs: results.meta["subcs"] = np.array(subcs) results.meta["unnorm_subcs"] = np.array(unnorm_subcs) return results def avg_pds_from_events(*args, **kwargs): warnings.warn( "avg_pds_from_events is deprecated, use avg_cs_from_timeseries instead", DeprecationWarning ) return avg_pds_from_timeseries(*args, **kwargs) def avg_pds_from_timeseries( times, gti, segment_size, dt, norm="frac", use_common_mean=True, silent=False, fluxes=None, errors=None, return_subcs=False, ): """ Calculate the average periodogram from a list of event times or a light curve. If the input is a light curve, the time array needs to be uniformly sampled inside GTIs (it can have gaps outside), and the fluxes need to be passed through the ``fluxes`` array. Otherwise, times are interpreted as photon arrival times. Parameters ---------- times : float `np.array` Array of times. gti : [[gti00, gti01], [gti10, gti11], ...] Good time intervals. segment_size : float Length of segments. If ``None``, the full light curve is used. dt : float Time resolution of the light curves used to produce periodograms. Other Parameters ---------------- norm : str, default "frac" The normalization of the periodogram. "abs" is absolute rms, "frac" is fractional rms, "leahy" is Leahy+83 normalization, and "none" is the unnormalized periodogram use_common_mean : bool, default True The mean of the light curve can be estimated in each interval, or on the full light curve. This gives different results (Alston+2013). Here we assume the mean is calculated on the full light curve, but the user can set ``use_common_mean`` to False to calculate it on a per-segment basis. silent : bool, default False Silence the progress bars fluxes : float `np.array`, default None Array of counts per bin or fluxes errors : float `np.array`, default None Array of errors on the fluxes above return_subcs : bool, default False Return all sub spectra from each light curve chunk Returns ------- freq : `np.array` The periodogram frequencies power : `np.array` The normalized periodogram powers n_bin : int the number of bins in the light curves used in each segment n_ave : int or float the number of averaged periodograms mean : float the mean flux """ binned = fluxes is not None if segment_size is not None: segment_size, n_bin = fix_segment_size_to_integer_samples(segment_size, dt) elif binned and segment_size is None: n_bin = fluxes.size else: _, n_bin = fix_segment_size_to_integer_samples(gti.max() - gti.min(), dt) flux_iterable = get_flux_iterable_from_segments( times, gti, segment_size, n_bin, dt=dt, fluxes=fluxes, errors=errors ) cross = avg_pds_from_iterable( flux_iterable, dt, norm=norm, use_common_mean=use_common_mean, silent=silent, return_subcs=return_subcs, ) if cross is not None: cross.meta["gti"] = gti return cross def avg_cs_from_events(*args, **kwargs): warnings.warn( "avg_cs_from_events is deprecated, use avg_cs_from_timeseries instead", DeprecationWarning ) return avg_cs_from_timeseries(*args, **kwargs) def avg_cs_from_timeseries( times1, times2, gti, segment_size, dt, norm="frac", use_common_mean=True, fullspec=False, silent=False, power_type="all", fluxes1=None, fluxes2=None, errors1=None, errors2=None, return_auxil=False, return_subcs=False, ): """ Calculate the average cross spectrum from a list of event times or a light curve. If the input is a light curve, the time arrays need to be uniformly sampled inside GTIs (they can have gaps outside), and the fluxes need to be passed through the ``fluxes1`` and ``fluxes2`` arrays. Otherwise, times are interpreted as photon arrival times Parameters ---------- times1 : float `np.array` Array of times in the sub-band times2 : float `np.array` Array of times in the reference band gti : [[gti00, gti01], [gti10, gti11], ...] common good time intervals segment_size : float length of segments. If ``None``, the full light curve is used. dt : float Time resolution of the light curves used to produce periodograms Other Parameters ---------------- norm : str, default "frac" The normalization of the periodogram. "abs" is absolute rms, "frac" is fractional rms, "leahy" is Leahy+83 normalization, and "none" is the unnormalized periodogram use_common_mean : bool, default True The mean of the light curve can be estimated in each interval, or on the full light curve. This gives different results (Alston+2013). Here we assume the mean is calculated on the full light curve, but the user can set ``use_common_mean`` to False to calculate it on a per-segment basis. fullspec : bool, default False Return the full periodogram, including negative frequencies silent : bool, default False Silence the progress bars power_type : str, default 'all' If 'all', give complex powers. If 'abs', the absolute value; if 'real', the real part fluxes1 : float `np.array`, default None Array of fluxes or counts per bin for channel 1 fluxes2 : float `np.array`, default None Array of fluxes or counts per bin for channel 2 errors1 : float `np.array`, default None Array of errors on the fluxes on channel 1 errors2 : float `np.array`, default None Array of errors on the fluxes on channel 2 return_subcs : bool, default False Return all sub spectra from each light curve chunk Returns ------- freq : `np.array` The periodogram frequencies pds : `np.array` The normalized periodogram powers n_bin : int the number of bins in the light curves used in each segment n_ave : int or float the number of averaged periodograms """ binned = fluxes1 is not None and fluxes2 is not None if segment_size is not None: segment_size, n_bin = fix_segment_size_to_integer_samples(segment_size, dt) elif binned and segment_size is None: n_bin = fluxes1.size else: _, n_bin = fix_segment_size_to_integer_samples(gti.max() - gti.min(), dt) flux_iterable1 = get_flux_iterable_from_segments( times1, gti, segment_size, n_bin=n_bin, dt=dt, fluxes=fluxes1, errors=errors1 ) flux_iterable2 = get_flux_iterable_from_segments( times2, gti, segment_size, n_bin=n_bin, dt=dt, fluxes=fluxes2, errors=errors2 ) is_events = np.all([val is None for val in (fluxes1, fluxes2, errors1, errors2)]) if ( is_events and silent and use_common_mean and power_type == "all" and not fullspec and not return_auxil and not return_subcs ): results = avg_cs_from_iterables_quick(flux_iterable1, flux_iterable2, dt, norm=norm) else: results = avg_cs_from_iterables( flux_iterable1, flux_iterable2, dt, norm=norm, use_common_mean=use_common_mean, silent=silent, fullspec=fullspec, power_type=power_type, return_auxil=return_auxil, return_subcs=return_subcs, ) if results is not None: results.meta["gti"] = gti return results def lsft_fast( y: npt.ArrayLike, t: npt.ArrayLike, freqs: npt.ArrayLike, sign: Optional[int] = 1, oversampling: Optional[int] = 5, ) -> npt.NDArray: """ Calculates the Lomb-Scargle Fourier transform of a light curve. Only considers non-negative frequencies. Subtracts mean from data as it is required for the working of the algorithm. Adapted from original Matlab code by J. D. Scargle, using Astropy's ``trig_sum`` for speed. Parameters ---------- y : a :class:`numpy.array` of floats Observations to be transformed. t : :class:`numpy.array` of floats Times of the observations freqs : :class:`numpy.array` An array of frequencies at which the transform is sampled. sign : int, optional, default: 1 The sign of the fourier transform. 1 implies positive sign and -1 implies negative sign. fullspec : bool, optional, default: False Return LSFT values for full frequency array (True) or just positive frequencies (False). oversampling : float, optional, default: 5 Interpolation Oversampling Factor Returns ------- ft_res : numpy.ndarray An array of Fourier transformed data. """ y_ = copy.deepcopy(y) - np.mean(y) freqs = freqs[freqs >= 0] # Constants initialization sum_xx = np.sum(y_) num_xt = len(y_) num_ww = len(freqs) # Arrays initialization ft_real = ft_imag = np.zeros((num_ww)) f0, df, N = freqs[0], freqs[1] - freqs[0], len(freqs) # Sum (y_i * cos(wt - wtau)) Sh, Ch = trig_sum(t, y_, df, N, f0, oversampling=oversampling) # Angular Frequency w = freqs * 2 * np.pi # Summation of cos(2wt) and sin(2wt) csum, ssum = trig_sum( t, np.ones_like(y_) / len(y_), df, N, f0, freq_factor=2, oversampling=oversampling ) wtau = 0.5 * np.arctan2(ssum, csum) S2, C2 = trig_sum( t, np.ones_like(y_), df, N, f0, freq_factor=2, oversampling=oversampling, ) const = np.sqrt(0.5) * np.sqrt(num_ww) coswtau = np.cos(wtau) sinwtau = np.sin(wtau) sumr = Ch * coswtau + Sh * sinwtau sumi = Sh * coswtau - Ch * sinwtau cos2wtau = np.cos(2 * wtau) sin2wtau = np.sin(2 * wtau) scos2 = 0.5 * (N + C2 * cos2wtau + S2 * sin2wtau) ssin2 = 0.5 * (N - C2 * cos2wtau - S2 * sin2wtau) ft_real = const * sumr / np.sqrt(scos2) ft_imag = const * np.sign(sign) * sumi / np.sqrt(ssin2) ft_real[freqs == 0] = sum_xx / np.sqrt(num_xt) ft_imag[freqs == 0] = 0 phase = wtau - w * t[0] ft_res = np.complex128(ft_real + (1j * ft_imag)) * np.exp(-1j * phase) return ft_res def lsft_slow( y: npt.ArrayLike, t: npt.ArrayLike, freqs: npt.ArrayLike, sign: Optional[int] = 1, ) -> npt.NDArray: """ Calculates the Lomb-Scargle Fourier transform of a light curve. Only considers non-negative frequencies. Subtracts mean from data as it is required for the working of the algorithm. Adapted from original Matlab code by J. D. Scargle. Parameters ---------- y : a `:class:numpy.array` of floats Observations to be transformed. t : `:class:numpy.array` of floats Times of the observations freqs : numpy.ndarray An array of frequencies at which the transform is sampled. sign : int, optional, default: 1 The sign of the fourier transform. 1 implies positive sign and -1 implies negative sign. fullspec : bool, optional, default: False Return LSFT values for full frequency array (True) or just positive frequencies (False). Returns ------- ft_res : numpy.ndarray An array of Fourier transformed data. """ y_ = y - np.mean(y) freqs = np.asanyarray(freqs[np.asanyarray(freqs) >= 0]) ft_real = np.zeros_like(freqs) ft_imag = np.zeros_like(freqs) ft_res = np.zeros_like(freqs, dtype=np.complex128) num_y = y_.shape[0] num_freqs = freqs.shape[0] sum_y = np.sum(y_) const1 = np.sqrt(0.5 * num_y) const2 = const1 * np.sign(sign) ft_real = ft_imag = np.zeros(num_freqs) ft_res = np.zeros(num_freqs, dtype=np.complex128) for i in range(num_freqs): wrun = freqs[i] * 2 * np.pi if wrun == 0: ft_real = sum_y / np.sqrt(num_y) ft_imag = 0 phase_this = 0 else: # Calculation of \omega \tau (II.5) -- csum = np.sum(np.cos(2.0 * wrun * t)) ssum = np.sum(np.sin(2.0 * wrun * t)) watan = np.arctan2(ssum, csum) wtau = 0.5 * watan # -- # In the following, instead of t'_n we are using \omega t'_n = \omega t - \omega\tau # Terms of kind X_n * cos or sin (II.1) -- sumr = np.sum(y_ * np.cos(wrun * t - wtau)) sumi = np.sum(y_ * np.sin(wrun * t - wtau)) # -- # A and B before the square root and inversion in (II.3) -- scos2 = np.sum(np.power(np.cos(wrun * t - wtau), 2)) ssin2 = np.sum(np.power(np.sin(wrun * t - wtau), 2)) # const2 is const1 times the sign. # It's the F0 in II.2 without the phase factor # The sign decides whether we are calculating the direct or inverse transform ft_real = const1 * sumr / np.sqrt(scos2) ft_imag = const2 * sumi / np.sqrt(ssin2) phase_this = wtau - wrun * t[0] ft_res[i] = np.complex128(ft_real + (1j * ft_imag)) * np.exp(-1j * phase_this) return ft_res def impose_symmetry_lsft( ft_res: npt.ArrayLike, sum_y: float, len_y: int, freqs: npt.ArrayLike, ) -> npt.ArrayLike: """ Impose symmetry on the input fourier transform. Parameters ---------- ft_res : np.array The Fourier transform of the signal. sum_y : float The sum of the values of the signal. len_y : int The length of the signal. freqs : np.array An array of frequencies at which the transform is sampled. Returns ------- lsft_res : np.array The Fourier transform of the signal with symmetry imposed. freqs_new : np.array The new frequencies """ ft_res_new = np.concatenate([np.conjugate(np.flip(ft_res)), [0.0], ft_res]) freqs_new = np.concatenate([np.flip(-freqs), [0.0], freqs]) return ft_res_new, freqs_new