import os
import numpy as np
import h5py
import bilby
from tqdm import tqdm
from . import logger, _default_postprocessed_phase_dir, _default_postprocessed_phase_filename_str
from . import gwphase
from . import gw_utils as gwutils
from .pe_input import ParameterEstimationInput
#Post-processed phases
_variables_event_1 = [
'phase_H',
'phase_L',
'phase_V',
'Dphi_f',
'tau_HL',
'tau_HV',
'tau_LV'
]
#For event 2 we need to compute the phases in the frame of event 1
_variables_event_2 = [
'phase_fbest',
'a22_fbest',
'zeta_fbest',
'r_fbest',
'phase_fhigh',
'phase_flow',
'tau_H',
'tau_L',
'tau_V',
'ra',
'dec',
'psi',
'geocent_time'
]
[docs]
class PostprocessedPhase:
[docs]
def __init__(
self,
dataset,
flow,
fhigh,
fbest,
superevent_name=None,
label=None,
):
"""
A class to store the postprocessed phase
Parameters
----------
dataset: dict
A dictionary containing the postprocessed phase data
flow: float
Lower frequency cutoff for computing :math:`\Delta \phi_f`
fhigh: float
Upper frequency cutoff for computing :math:`\Delta \phi_f`
fbest: float
Frequency at which the phase is best measured
superevent_name: str
Name of the superevent
label: str
Label for the postprocessed phase
Returns
-------
PostprocessedPhase
An instance of PostprocessedPhase class
"""
# Make a list of variables that should exist in dataset
variables = set([*_variables_event_1, *_variables_event_2])
missing_keys = [p for p in dataset.keys() if p not in variables]
assert missing_keys == [], f"Key(s) {missing_keys} is/are missing"
self.dataset = dataset
self.flow = flow
self.fhigh = fhigh
self.fbest = fbest
self.superevent_name = superevent_name
self.label = label
[docs]
@classmethod
def from_file(cls, hdf5_file):
"""
Load the postprocessed phase from a hdf5 file
Parameters
----------
hdf5_file: str
Path to the hdf5 file
Returns
-------
PostprocessedPhase
An instance of PostprocessedPhase class
"""
with h5py.File(hdf5_file, "r") as f:
dataset = {p: np.array(f[p]) for p in [*_variables_event_1, *_variables_event_2]}
flow = f.attrs['flow']
fhigh = f.attrs['fhigh']
fbest = f.attrs['fbest']
superevent_name = None
label = None
if "superevent_name" in f.attrs.keys():
superevent_name = f.attrs['superevent_name']
if "label" in f.attrs.keys():
label = f.attrs['label']
return cls(
dataset,
flow,
fhigh,
fbest,
superevent_name=superevent_name,
label=label
)
[docs]
def postprocess_phase(
pe_result,
flow=20.,
fhigh=100.,
fbest=40.,
superevent_name=None,
label=None,
output_dir=_default_postprocessed_phase_dir,
output_filename=None,
):
"""
Postprocess the phase of the GW signal.
The procedure is described in Sec. II of https://arxiv.org/pdf/2308.06616.pdf
Parameters
----------
pe_result: str or ParameterEstimationInput
Path to the bilby result/PESummary file or an instance of ParameterEstimationInput
flow: float
Lower frequency cutoff for computing :math:`\Delta \phi_f`
fhigh: float
Upper frequency cutoff for computing :math:`\Delta \phi_f`
fbest: float
Frequency at which the phase is best measured
superevent_name: str
Name of the superevent
label: str
Label for the postprocessed phase
output_dir: str
Path to the output directory
output_filename: str
Name of the output file
Returns
-------
PostprocessedPhase
An instance of PostprocessedPhase class
"""
if type(pe_result) is ParameterEstimationInput:
ip = pe_result # No need to do anything
elif type(pe_result) is str:
try:
# Maybe it is a bilby result file?
ip = ParameterEstimationInput.from_bilby_result_file(pe_result)
except:
try:
# Maybe it is a PESummary file?
ip = ParameterEstimationInput.from_PESummary_file(pe_result)
except:
raise ValueError(f"Does not understand {pe_result}")
posterior_samples = ip.posterior_samples
fref = ip.reference_frequency
duration = ip.duration
sampling_frequency = ip.sampling_frequency
ifos = ip.ifo_list
logger.info(f"Detectors online {ifos}")
approx = ip.waveform_approximant
logger.info(f"Waveform approximant {approx}")
N_posteriors = len(posterior_samples['mass_1'])
modes = [[2,2],[2,-2]]
waveform_arguments = dict(
waveform_approximant=approx,
reference_frequency=fref,
minimum_frequency=min(fref,flow) - 2./duration,
maximum_frequency=fhigh + 2./duration,
mode_array = modes
)
waveform_generator = bilby.gw.WaveformGenerator(
duration=duration,
sampling_frequency=sampling_frequency,
frequency_domain_source_model=bilby.gw.source.lal_binary_black_hole,
parameter_conversion=bilby.gw.conversion.convert_to_lal_binary_black_hole_parameters,
waveform_arguments=waveform_arguments,
)
ra = posterior_samples['ra']
dec = posterior_samples['dec']
psi = posterior_samples['psi']
geocent_time = posterior_samples['geocent_time']
phase_fbest = np.zeros(N_posteriors)
a22_fbest = np.zeros(N_posteriors)
zeta_fbest = np.zeros(N_posteriors)
r_fbest = np.zeros(N_posteriors)
phase_fhigh = np.zeros(N_posteriors)
phase_flow = np.zeros(N_posteriors)
fs = bilby.core.utils.series.create_frequency_series(sampling_frequency,duration)
rang_fs = (fs >= flow) & (fs <= fhigh)
i_best = np.argmin(abs(fs[rang_fs]-fbest))
for i in tqdm(range(N_posteriors)):
phase_fbest[i], a22_fbest[i], zeta_fbest[i], r_fbest[i], phase_fhigh[i], phase_flow[i] = gwphase.phases_fs(
waveform_generator,
detec = "H1",
rang_fs = rang_fs,
i_best = i_best,
mass_1=posterior_samples['mass_1'][i],
mass_2=posterior_samples['mass_2'][i],
a_1=posterior_samples['a_1'][i],
a_2=posterior_samples['a_2'][i],
tilt_1=posterior_samples['tilt_1'][i],
tilt_2=posterior_samples['tilt_2'][i],
phi_12=posterior_samples['phi_12'][i],
phi_jl=posterior_samples['phi_jl'][i],
luminosity_distance=posterior_samples['luminosity_distance'][i],
theta_jn=posterior_samples['theta_jn'][i],
psi=psi[i],
phase_ref=posterior_samples['phase'][i],
geocent_time=geocent_time[i],
ra=ra[i],
dec=dec[i],
duration = duration,
sampling_frequency = sampling_frequency
)
time_delay_H = np.zeros(N_posteriors)
time_delay_L = np.zeros(N_posteriors)
time_delay_V = np.zeros(N_posteriors)
for i in range(N_posteriors):
time_delay_H[i] = gwutils.time_delay_det("H1", ra[i], dec[i], geocent_time[i])
time_delay_L[i] = gwutils.time_delay_det("L1", ra[i], dec[i], geocent_time[i])
time_delay_V[i] = gwutils.time_delay_det("V1", ra[i], dec[i], geocent_time[i])
#Arrival time phase at each detector
tau_H = 2.*np.pi*fbest*(geocent_time + time_delay_H)
tau_L = 2.*np.pi*fbest*(geocent_time + time_delay_L)
tau_V = 2.*np.pi*fbest*(geocent_time + time_delay_V)
#Arrival time phase difference between detectors.
#See Eq. 4 of https://arxiv.org/pdf/2308.06616.pdf
tau_HL = tau_H-tau_L
tau_HV = tau_H-tau_V
tau_LV = tau_L-tau_V
#Antenna patterns at each detector
Fp_H, Fx_H = gwutils.FpFx("H1", ra, dec, psi, geocent_time)
Fp_L, Fx_L = gwutils.FpFx("L1", ra, dec, psi, geocent_time)
Fp_V, Fx_V = gwutils.FpFx("V1", ra, dec, psi, geocent_time)
#Detector phases.
#See Eq. 2 of https://arxiv.org/pdf/2308.06616.pdf
phase_H = gwphase.phase_d(phase_fbest,a22_fbest,zeta_fbest,Fp_H,Fx_H)
phase_L = gwphase.phase_d(phase_fbest,a22_fbest,zeta_fbest,Fp_L,Fx_L)
phase_V = gwphase.phase_d(phase_fbest,a22_fbest,zeta_fbest,Fp_V,Fx_V)
#Detector phase evolution in frequency
#See paragraph below Eq. 2 of https://arxiv.org/pdf/2308.06616.pdf
Dphi_f = phase_fhigh - phase_flow
# Saving the data
variables = [*_variables_event_1, *_variables_event_2]
if label is None:
if superevent_name is not None:
label_for_filename = superevent_name
else:
if type(pe_result) is str:
# Try to figure out the proper label from filename
label_for_filename = os.path.basename(pe_result).split('.')[0].split('_')[0]
else:
# Can't do much at this point......
label_for_filename = "event"
else:
if superevent_name is not None:
label_for_filename = f"{superevent_name}_{label}"
logger.info(f"Assigning {label} as the label")
if output_filename is None:
output_filename = _default_postprocessed_phase_filename_str.format(
label_for_filename,
fbest,
fhigh,
flow
)
output_fullpath = os.path.join(output_dir, output_filename)
with h5py.File(output_fullpath, "w") as f:
for var in variables:
_ = f.create_dataset(var, data=eval(var))
f.attrs["fbest"] = fbest
f.attrs["fhigh"] = fhigh
f.attrs["flow"] = flow
if label is not None:
f.attrs["label"] = label
if superevent_name is not None:
f.attrs["superevent_name"] = superevent_name
logger.info(f"Postprocessing completed and saved to {output_fullpath}")
return PostprocessedPhase.from_file(output_fullpath)