"""
Definition of :class:`EventList`.
:class:`EventList` is used to handle photon arrival times.
"""
import copy
import logging
import pickle
import warnings
from collections.abc import Iterable
import numpy as np
import numpy.random as ra
from astropy.table import Table
from .base import StingrayObject, StingrayTimeseries
from .filters import get_deadtime_mask
from .gti import append_gtis, check_separate, cross_gtis, generate_indices_of_boundaries
from .io import load_events_and_gtis
from .lightcurve import Lightcurve
from .utils import assign_value_if_none, simon, interpret_times, njit
__all__ = ["EventList"]
@njit
def _from_lc_numba(times, counts, empty_times):
last = 0
for t, c in zip(times, counts):
val = c + last
empty_times[last:val] = t
last = val
return empty_times
def simple_events_from_lc(lc):
"""
Create an :class:`EventList` from a :class:`stingray.Lightcurve` object. Note that all
events in a given time bin will have the same time stamp.
Parameters
----------
lc: :class:`stingray.Lightcurve` object
Light curve to use for creation of the event list.
Returns
-------
ev: :class:`EventList` object
The resulting list of photon arrival times generated from the light curve.
Examples
--------
>>> from stingray import Lightcurve
>>> lc = Lightcurve([0, 1], [2, 3], dt=1)
>>> ev = simple_events_from_lc(lc)
>>> np.allclose(ev.time, [0, 0, 1, 1, 1])
True
"""
times = _from_lc_numba(
lc.time, lc.counts.astype(int), np.zeros(np.sum(lc.counts).astype(int), dtype=float)
)
return EventList(time=times, gti=lc.gti)
[docs]
class EventList(StingrayTimeseries):
"""
Basic class for event list data. Event lists generally correspond to individual events (e.g. photons)
recorded by the detector, and their associated properties. For X-ray data where this type commonly occurs,
events are time stamps of when a photon arrived in the detector, and (optionally) the photon energy associated
with the event.
Parameters
----------
time: iterable
A list or array of time stamps
Other Parameters
----------------
dt: float
The time resolution of the events. Only relevant when using events
to produce light curves with similar bin time.
energy: iterable
A list of array of photon energy values in keV
mjdref : float
The MJD used as a reference for the time array.
ncounts: int
Number of desired data points in event list.
gtis: ``[[gti0_0, gti0_1], [gti1_0, gti1_1], ...]``
Good Time Intervals
pi : integer, numpy.ndarray
PI channels
notes : str
Any useful annotations
high_precision : bool
Change the precision of self.time to float128. Useful while dealing with fast pulsars.
mission : str
Mission that recorded the data (e.g. NICER)
instr : str
Instrument onboard the mission
header : str
The full header of the original FITS file, if relevant
detector_id : iterable
The detector that recorded each photon (if the instrument has more than
one, e.g. XMM/EPIC-pn)
timeref : str
The time reference, as recorded in the FITS file (e.g. SOLARSYSTEM)
timesys : str
The time system, as recorded in the FITS file (e.g. TDB)
ephem : str
The JPL ephemeris used to barycenter the data, if any (e.g. DE430)
**other_kw :
Used internally. Any other keyword arguments will be ignored
Attributes
----------
time: numpy.ndarray
The array of event arrival times, in seconds from the reference
MJD defined in ``mjdref``
energy: numpy.ndarray
The array of photon energy values
ncounts: int
The number of data points in the event list
dt: float
The time resolution of the events. Only relevant when using events
to produce light curves with similar bin time.
mjdref : float
The MJD used as a reference for the time array.
gtis: ``[[gti0_0, gti0_1], [gti1_0, gti1_1], ...]``
Good Time Intervals
pi : integer, numpy.ndarray
PI channels
high_precision : bool
Change the precision of self.time to float128. Useful while dealing with fast pulsars.
mission : str
Mission that recorded the data (e.g. NICER)
instr : str
Instrument onboard the mission
detector_id : iterable
The detector that recoded each photon, if relevant (e.g. XMM, Chandra)
header : str
The full header of the original FITS file, if relevant
"""
main_array_attr = "time"
def __init__(
self,
time=None,
energy=None,
ncounts=None,
mjdref=0,
dt=0,
notes="",
gti=None,
pi=None,
high_precision=False,
mission=None,
instr=None,
header=None,
detector_id=None,
ephem=None,
timeref=None,
timesys=None,
**other_kw,
):
StingrayObject.__init__(self)
self.energy = None if energy is None else np.asarray(energy)
self.notes = notes
self.dt = dt
self.mjdref = mjdref
self.gti = np.asarray(gti) if gti is not None else None
self.pi = None if pi is None else np.asarray(pi)
self.ncounts = ncounts
self.mission = mission
self.instr = instr
self.detector_id = detector_id
self.header = header
self.ephem = ephem
self.timeref = timeref
self.timesys = timesys
if other_kw != {}:
warnings.warn(f"Unrecognized keywords: {list(other_kw.keys())}")
if time is not None:
time, mjdref = interpret_times(time, mjdref)
if not high_precision:
self.time = np.asarray(time)
else:
self.time = np.asarray(time, dtype=np.longdouble)
self.ncounts = self.time.size
else:
self.time = None
if (self.time is not None) and (self.energy is not None):
if self.time.size != self.energy.size:
raise ValueError("Lengths of time and energy must be equal.")
[docs]
def to_lc(self, dt, tstart=None, tseg=None):
"""
Convert event list to a :class:`stingray.Lightcurve` object.
Parameters
----------
dt: float
Binning time of the light curve
Other Parameters
----------------
tstart : float
Start time of the light curve
tseg: float
Total duration of light curve
Returns
-------
lc: :class:`stingray.Lightcurve` object
"""
if tstart is None and self.gti is not None:
tstart = self.gti[0][0]
tseg = self.gti[-1][1] - tstart
return Lightcurve.make_lightcurve(
self.time, dt, tstart=tstart, gti=self.gti, tseg=tseg, mjdref=self.mjdref
)
[docs]
def to_lc_iter(self, dt, segment_size=None):
"""Convert event list to a generator of Lightcurves.
Parameters
----------
dt: float
Binning time of the light curves
Other parameters
----------------
segment_size : float, default None
Optional segment size. If None, use the GTI boundaries
Returns
-------
lc_gen: `generator`
Generates one :class:`stingray.Lightcurve` object for each GTI or segment
"""
segment_iter = generate_indices_of_boundaries(
self.time, self.gti, segment_size=segment_size, dt=0
)
for st, end, idx_st, idx_end in segment_iter:
tseg = end - st
lc = Lightcurve.make_lightcurve(
self.time[idx_st : idx_end + 1],
dt,
tstart=st,
gti=np.asarray([[st, end]]),
tseg=tseg,
mjdref=self.mjdref,
use_hist=True,
)
yield lc
[docs]
def to_lc_list(self, dt, segment_size=None):
"""Convert event list to a list of Lightcurves.
Parameters
----------
dt: float
Binning time of the light curves
Other parameters
----------------
segment_size : float, default None
Optional segment size. If None, use the GTI boundaries
Returns
-------
lc_list: `List`
List containig one :class:`stingray.Lightcurve` object for each GTI or segment
"""
return list(self.to_lc_iter(dt, segment_size))
[docs]
@staticmethod
def from_lc(lc):
"""
Create an :class:`EventList` from a :class:`stingray.Lightcurve` object. Note that all
events in a given time bin will have the same time stamp.
Parameters
----------
lc: :class:`stingray.Lightcurve` object
Light curve to use for creation of the event list.
Returns
-------
ev: :class:`EventList` object
The resulting list of photon arrival times generated from the light curve.
"""
return simple_events_from_lc(lc)
[docs]
def simulate_times(self, lc, use_spline=False, bin_time=None):
"""Simulate times from an input light curve.
Randomly simulate photon arrival times to an :class:`EventList` from a
:class:`stingray.Lightcurve` object, using the inverse CDF method.
..note::
Preferably use model light curves containing **no Poisson noise**,
as this method will intrinsically add Poisson noise to them.
Parameters
----------
lc: :class:`stingray.Lightcurve` object
Other Parameters
----------------
use_spline : bool
Approximate the light curve with a spline to avoid binning effects
bin_time : float default None
Ignored and deprecated, maintained for backwards compatibility.
Returns
-------
times : array-like
Simulated photon arrival times
"""
# Need import here, or there will be a circular import
from .simulator.base import simulate_times
if bin_time is not None:
warnings.warn("Bin time will be ignored in simulate_times", DeprecationWarning)
self.time = simulate_times(lc, use_spline=use_spline)
self.gti = lc.gti
self.ncounts = len(self.time)
[docs]
def simulate_energies(self, spectrum, use_spline=False):
"""
Assign (simulate) energies to event list from a spectrum.
Parameters
----------
spectrum: 2-d array or list [energies, spectrum]
Energies versus corresponding fluxes. The 2-d array or list must
have energies across the first dimension and fluxes across the
second one. If the dimension of the energies is the same as
spectrum, they are interpreted as bin centers.
If it is longer by one, they are interpreted as proper bin edges
(similarly to the bins of `np.histogram`).
Note that for non-uniformly binned spectra, it is advisable to pass
the exact edges.
"""
from .simulator.base import simulate_with_inverse_cdf
if self.ncounts is None:
simon("Either set time values or explicity provide counts.")
return
if isinstance(spectrum, list) or isinstance(spectrum, np.ndarray):
energy = np.asarray(spectrum)[0]
fluxes = np.asarray(spectrum)[1]
if not isinstance(energy, np.ndarray):
raise IndexError("Spectrum must be a 2-d array or list")
else:
raise TypeError("Spectrum must be a 2-d array or list")
if energy.size == fluxes.size:
de = energy[1] - energy[0]
energy = np.concatenate([energy - de / 2, [energy[-1] + de / 2]])
self.energy = simulate_with_inverse_cdf(
fluxes, self.ncounts, edges=energy, sorted=False, interp_kind="linear"
)
[docs]
def sort(self, inplace=False):
"""Sort the event list in time.
Other parameters
----------------
inplace : bool, default False
Sort in place. If False, return a new event list.
Returns
-------
eventlist : `EventList`
The sorted event list. If ``inplace=True``, it will be a shallow copy
of ``self``.
Examples
--------
>>> events = EventList(time=[0, 2, 1], energy=[0.3, 2, 0.5], pi=[3, 20, 5])
>>> e1 = events.sort()
>>> np.allclose(e1.time, [0, 1, 2])
True
>>> np.allclose(e1.energy, [0.3, 0.5, 2])
True
>>> np.allclose(e1.pi, [3, 5, 20])
True
But the original event list has not been altered (``inplace=False`` by
default):
>>> np.allclose(events.time, [0, 2, 1])
True
Let's do it in place instead
>>> e2 = events.sort(inplace=True)
>>> np.allclose(e2.time, [0, 1, 2])
True
In this case, the original event list has been altered.
>>> np.allclose(events.time, [0, 1, 2])
True
"""
order = np.argsort(self.time)
return self.apply_mask(order, inplace=inplace)
[docs]
def join(self, other):
"""
Join two :class:`EventList` objects into one.
If both are empty, an empty :class:`EventList` is returned.
GTIs are crossed if the event lists are over a common time interval,
and appended otherwise.
Standard attributes such as ``pi`` and ``energy`` remain ``None`` if they are ``None``
in both. Otherwise, ``np.nan`` is used as a default value for the :class:`EventList` where
they were None. Arbitrary attributes (e.g., Stokes parameters in polarimetric data) are
created and joined using the same convention.
Multiple checks are done on the joined event lists. If the time array of the event list
being joined is empty, it is ignored. If the time resolution is different, the final
event list will have the rougher time resolution. If the MJDREF is different, the time
reference will be changed to the one of the first event list. An empty event list will
be ignored.
Parameters
----------
other : :class:`EventList` object or class:`list` of :class:`EventList` objects
The other :class:`EventList` object which is supposed to be joined with.
If ``other`` is a list, it is assumed to be a list of :class:`EventList` objects
and they are all joined, one by one.
Returns
-------
`ev_new` : :class:`EventList` object
The resulting :class:`EventList` object.
"""
ev_new = EventList()
if isinstance(other, EventList):
others = [other]
else:
others = other
# First of all, check if there are empty event lists
for obj in others:
if getattr(obj, "time", None) is None or np.size(obj.time) == 0:
warnings.warn("One of the event lists you are joining is empty.")
others.remove(obj)
if len(others) == 0:
return copy.deepcopy(self)
for i, other in enumerate(others):
# Tolerance for MJDREF:1 microsecond
if not np.isclose(self.mjdref, other.mjdref, atol=1e-6 / 86400):
warnings.warn("Attribute mjdref is different in the event lists being merged.")
others[i] = other.change_mjdref(self.mjdref)
all_objs = [self] + others
dts = list(set([getattr(obj, "dt", None) for obj in all_objs]))
if len(dts) != 1:
warnings.warn("The time resolution is different. Using the rougher by default")
ev_new.dt = np.max(dts)
all_time_arrays = [obj.time for obj in all_objs if obj.time is not None]
ev_new.time = np.concatenate(all_time_arrays)
order = np.argsort(ev_new.time)
ev_new.time = ev_new.time[order]
def _get_set_from_many_lists(lists):
"""Make a single set out of many lists."""
all_vals = []
for l in lists:
all_vals += l
return set(all_vals)
def _get_all_array_attrs(objs):
"""Get all array attributes from the event lists being merged. Do not include time."""
all_attrs = []
for obj in objs:
if obj.time is not None and len(obj.time) > 0:
all_attrs += obj.array_attrs()
all_attrs = list(set(all_attrs))
if "time" in all_attrs:
all_attrs.remove("time")
return all_attrs
for attr in _get_all_array_attrs(all_objs):
# if it's here, it means that it's an array attr in at least one object.
# So, everywhere it's None, it needs to be set to 0s of the same length as time
new_attr_values = []
for obj in all_objs:
if getattr(obj, attr, None) is None:
warnings.warn(
f"The {attr} array is empty in one of the event lists being merged. Setting it to NaN for the affected events"
)
new_attr_values.append(np.zeros_like(obj.time) + np.nan)
else:
new_attr_values.append(getattr(obj, attr))
new_attr = np.concatenate(new_attr_values)[order]
setattr(ev_new, attr, new_attr)
if np.all([obj.gti is None for obj in all_objs]):
ev_new.gti = None
else:
all_gti_lists = []
for obj in all_objs:
if obj.gti is None and len(obj.time) > 0:
obj.gti = assign_value_if_none(
obj.gti,
np.asarray([[obj.time[0] - obj.dt / 2, obj.time[-1] + obj.dt / 2]]),
)
if obj.gti is not None:
all_gti_lists.append(obj.gti)
new_gtis = all_gti_lists[0]
for gti in all_gti_lists[1:]:
if check_separate(new_gtis, gti):
new_gtis = append_gtis(new_gtis, gti)
warnings.warn(
"GTIs in these two event lists do not overlap at all."
"Merging instead of returning an overlap."
)
else:
new_gtis = cross_gtis([new_gtis, gti])
ev_new.gti = new_gtis
all_meta_attrs = _get_set_from_many_lists([obj.meta_attrs() for obj in all_objs])
# The attributes being treated separately are removed from the standard treatment
# When energy, pi etc. are None, they might appear in the meta_attrs, so we
# also add them to the list of attributes to be removed if present.
to_remove = ["gti", "header", "ncounts", "dt"] + ev_new.array_attrs()
for attrs in to_remove:
if attrs in all_meta_attrs:
all_meta_attrs.remove(attrs)
logging.info("The header attribute will be removed from the output event list.")
def _safe_concatenate(a, b):
if isinstance(a, str) and isinstance(b, str):
return a + "," + b
else:
if isinstance(a, tuple):
return a + (b,)
return (a, b)
for attr in all_meta_attrs:
self_attr = getattr(self, attr, None)
new_val = self_attr
for other in others:
other_attr = getattr(other, attr, None)
if self_attr != other_attr:
warnings.warn(
"Attribute " + attr + " is different in the event lists being merged."
)
new_val = _safe_concatenate(new_val, other_attr)
setattr(ev_new, attr, new_val)
ev_new.mjdref = self.mjdref
return ev_new
[docs]
@classmethod
def read(cls, filename, fmt=None, **kwargs):
r"""Read a :class:`EventList` object from file.
Currently supported formats are
* pickle (not recommended for long-term storage)
* hea or ogip : FITS Event files from (well, some) HEASARC-supported missions.
* any other formats compatible with the writers in
:class:`astropy.table.Table` (ascii.ecsv, hdf5, etc.)
Files that need the :class:`astropy.table.Table` interface MUST contain
at least a ``time`` column. Other recognized columns are ``energy`` and
``pi``.
The default ascii format is enhanced CSV (ECSV). Data formats
supporting the serialization of metadata (such as ECSV and HDF5) can
contain all eventlist attributes such as ``mission``, ``gti``, etc with
no significant loss of information. Other file formats might lose part
of the metadata, so must be used with care.
Parameters
----------
filename: str
Path and file name for the file to be read.
fmt: str
Available options are 'pickle', 'hea', and any `Table`-supported
format such as 'hdf5', 'ascii.ecsv', etc.
Other parameters
----------------
kwargs : dict
Any further keyword arguments to be passed to `load_events_and_gtis`
for reading in event lists in OGIP/HEASOFT format
Returns
-------
ev: :class:`EventList` object
The :class:`EventList` object reconstructed from file
"""
if fmt is not None and fmt.lower() in ("hea", "ogip"):
evtdata = load_events_and_gtis(filename, **kwargs)
evt = EventList(
time=evtdata.ev_list,
gti=evtdata.gti_list,
pi=evtdata.pi_list,
energy=evtdata.energy_list,
mjdref=evtdata.mjdref,
instr=evtdata.instr,
mission=evtdata.mission,
header=evtdata.header,
detector_id=evtdata.detector_id,
ephem=evtdata.ephem,
timeref=evtdata.timeref,
timesys=evtdata.timesys,
)
if "additional_columns" in kwargs:
for key in evtdata.additional_data:
if not hasattr(evt, key.lower()):
setattr(evt, key.lower(), evtdata.additional_data[key])
return evt
return super().read(filename=filename, fmt=fmt)
[docs]
def filter_energy_range(self, energy_range, inplace=False, use_pi=False):
"""Filter the event list from a given energy range.
Parameters
----------
energy_range: [float, float]
Energy range in keV, or in PI channel (if ``use_pi`` is True)
Other Parameters
----------------
inplace : bool, default False
Do the change in place (modify current event list). Otherwise, copy
to a new event list.
use_pi : bool, default False
Use PI channel instead of energy in keV
Examples
--------
>>> events = EventList(time=[0, 1, 2], energy=[0.3, 0.5, 2], pi=[3, 5, 20])
>>> e1 = events.filter_energy_range([0, 1])
>>> np.allclose(e1.time, [0, 1])
True
>>> np.allclose(events.time, [0, 1, 2])
True
>>> e2 = events.filter_energy_range([0, 10], use_pi=True, inplace=True)
>>> np.allclose(e2.time, [0, 1])
True
>>> np.allclose(events.time, [0, 1])
True
"""
if use_pi:
energies = self.pi
else:
energies = self.energy
mask = (energies >= energy_range[0]) & (energies < energy_range[1])
return self.apply_mask(mask, inplace=inplace)
[docs]
def apply_mask(self, mask, inplace=False):
"""Apply a mask to all array attributes of the event list
Parameters
----------
mask : array of ``bool``
The mask. Has to be of the same length as ``self.time``
Other parameters
----------------
inplace : bool
If True, overwrite the current event list. Otherwise, return a new one.
Examples
--------
>>> evt = EventList(time=[0, 1, 2], mission="nustar")
>>> evt.bubuattr = [222, 111, 333]
>>> newev0 = evt.apply_mask([True, True, False], inplace=False);
>>> newev1 = evt.apply_mask([True, True, False], inplace=True);
>>> newev0.mission == "nustar"
True
>>> np.allclose(newev0.time, [0, 1])
True
>>> np.allclose(newev0.bubuattr, [222, 111])
True
>>> np.allclose(newev1.time, [0, 1])
True
>>> evt is newev1
True
"""
array_attrs = self.array_attrs()
if inplace:
new_ev = self
else:
new_ev = EventList()
for attr in self.meta_attrs():
setattr(new_ev, attr, copy.deepcopy(getattr(self, attr)))
for attr in array_attrs:
if hasattr(self, attr) and getattr(self, attr) is not None:
setattr(new_ev, attr, copy.deepcopy(np.asarray(getattr(self, attr))[mask]))
return new_ev
[docs]
def apply_deadtime(self, deadtime, inplace=False, **kwargs):
"""Apply deadtime filter to this event list.
Additional arguments in ``kwargs`` are passed to `get_deadtime_mask`
Parameters
----------
deadtime : float
Value of dead time to apply to data
inplace : bool, default False
If True, apply the deadtime to the current event list. Otherwise,
return a new event list.
Returns
-------
new_event_list : `EventList` object
Filtered event list. if `inplace` is True, this is the input object
filtered for deadtime, otherwise this is a new object.
additional_output : object
Only returned if `return_all` is True. See `get_deadtime_mask` for
more details.
Examples
--------
>>> events = np.array([1, 1.05, 1.07, 1.08, 1.1, 2, 2.2, 3, 3.1, 3.2])
>>> events = EventList(events)
>>> events.pi=np.array([1, 2, 2, 2, 2, 1, 1, 1, 2, 1])
>>> events.energy=np.array([1, 2, 2, 2, 2, 1, 1, 1, 2, 1])
>>> events.mjdref = 10
>>> filt_events, retval = events.apply_deadtime(0.11, inplace=False,
... verbose=False,
... return_all=True)
>>> filt_events is events
False
>>> expected = np.array([1, 2, 2.2, 3, 3.2])
>>> np.allclose(filt_events.time, expected)
True
>>> np.allclose(filt_events.pi, 1)
True
>>> np.allclose(filt_events.energy, 1)
True
>>> np.allclose(events.pi, 1)
False
>>> filt_events = events.apply_deadtime(0.11, inplace=True,
... verbose=False)
>>> filt_events is events
True
"""
local_retall = kwargs.pop("return_all", False)
mask, retall = get_deadtime_mask(self.time, deadtime, return_all=True, **kwargs)
new_ev = self.apply_mask(mask, inplace=inplace)
if local_retall:
new_ev = [new_ev, retall]
return new_ev