Source code for phazap.phazap

import numpy as np
import os

from . import gwphase
from . import utils
from . import gw_utils as gwutils
from . import tension_utils as tension
from .postprocess_phase import postprocess_phase, PostprocessedPhase, _variables_event_1, _variables_event_2

[docs] def phases_events(event1_postprocessed_phase, event2_postprocessed_phase): """ Compute the detector, time-delay and :math:`\Delta\phi_f` phases of the two events. The procedure is described in Sec. 2 of https://arxiv.org/pdf/2308.06616.pdf Parameters ---------- event1_postprocessed_phase : PostprocessedPhase The first event event2_postprocessed_phase : PostprocessedPhase The second event Returns ------- parameters_1 : np.ndarray The parameters (det_phase, tau_phase, Dphi_f) of the first event parameters_2 : np.ndarray The parameters (det_phase, tau_phase, Dphi_f) of the second event det_phases_1 : np.ndarray The detector phases of the first event det_phases_2 : np.ndarray The detector phases of the second event tau_phases_1 : np.ndarray The time delay phases of the first event tau_phases_2 : np.ndarray The time delay phases of the second event Dphi_f_1 : np.ndarray The :math:`\Delta\phi_f` of the first event Dphi_f_2 : np.ndarray The :math:`\Delta\phi_f` of the second event above_below : np.ndarray The dot product of :math:`\\vec{n}` with :math:`\\vec{r}_{d_1 d_2} \\times \\vec{r}_{d_1 d_3}` the second event where its sign indicates whether the source is above or below the plane of the detectors """ assert event1_postprocessed_phase.fbest == event2_postprocessed_phase.fbest, "The two sets of phases have different fbest" fbest = event1_postprocessed_phase.fbest # Either one should be fine assert event1_postprocessed_phase.flow == event2_postprocessed_phase.flow, "The two sets of phases have different flow" flow = event1_postprocessed_phase.flow assert event1_postprocessed_phase.fhigh == event2_postprocessed_phase.fhigh, "The two sets of phases have different fhigh" fhigh = event1_postprocessed_phase.fhigh # Define variables *explicitly* so that linter will be satisfied # Read from memory geocent_time_1 = event1_postprocessed_phase.dataset['geocent_time'] phase_H_1 = event1_postprocessed_phase.dataset['phase_H'] phase_L_1 = event1_postprocessed_phase.dataset['phase_L'] phase_V_1 = event1_postprocessed_phase.dataset['phase_V'] tau_HL_1 = event1_postprocessed_phase.dataset['tau_HL'] tau_HV_1 = event1_postprocessed_phase.dataset['tau_HV'] Dphi_f_1 = event1_postprocessed_phase.dataset['Dphi_f'] geocent_time_2 = event2_postprocessed_phase.dataset['geocent_time'] ra_2 = event2_postprocessed_phase.dataset['ra'] dec_2 = event2_postprocessed_phase.dataset['dec'] psi_2 = event2_postprocessed_phase.dataset['psi'] phase_fbest_2 = event2_postprocessed_phase.dataset['phase_fbest'] a22_fbest_2 = event2_postprocessed_phase.dataset['a22_fbest'] zeta_fbest_2 = event2_postprocessed_phase.dataset['zeta_fbest'] Dphi_f_2 = event2_postprocessed_phase.dataset['Dphi_f'] Nsamples_2 = len(phase_fbest_2) tc_shift_21 = np.mean(geocent_time_2) - np.mean(geocent_time_1) above_below = np.zeros(Nsamples_2) tau_H_2_wrt_1 = np.zeros(Nsamples_2) tau_L_2_wrt_1 = np.zeros(Nsamples_2) tau_V_2_wrt_1 = np.zeros(Nsamples_2) for l in range(Nsamples_2): tau_H_2_wrt_1[l] = 2.*np.pi*fbest*gwutils.time_delay_det("H1", ra_2[l], dec_2[l], geocent_time_2[l]-tc_shift_21) tau_L_2_wrt_1[l] = 2.*np.pi*fbest*gwutils.time_delay_det("L1", ra_2[l], dec_2[l], geocent_time_2[l]-tc_shift_21) tau_V_2_wrt_1[l] = 2.*np.pi*fbest*gwutils.time_delay_det("V1", ra_2[l], dec_2[l], geocent_time_2[l]-tc_shift_21) #Define quadrant of the source. #See Appendix C of https://arxiv.org/pdf/2308.06616.pdf above_below[l] = gwutils.N_dot_cross_d123(ra_2[l],dec_2[l],geocent_time_2[l],gwutils.H1_vertex_meters,gwutils.L1_vertex_meters,gwutils.V1_vertex_meters) #Arrival time phase difference between detectors tau_HL_2 = tau_H_2_wrt_1-tau_L_2_wrt_1 tau_HV_2 = tau_H_2_wrt_1-tau_V_2_wrt_1 tau_LV_2 = tau_L_2_wrt_1-tau_V_2_wrt_1 #Detector phases at reference frame of event 1 Fp_H_2, Fx_H_2 = gwutils.FpFx("H1",ra_2, dec_2, psi_2, geocent_time_2 - tc_shift_21) Fp_L_2, Fx_L_2 = gwutils.FpFx("L1",ra_2, dec_2, psi_2, geocent_time_2 - tc_shift_21) Fp_V_2, Fx_V_2 = gwutils.FpFx("V1",ra_2, dec_2, psi_2, geocent_time_2 - tc_shift_21) phase_H_2 = gwphase.phase_d(phase_fbest_2,a22_fbest_2,zeta_fbest_2,Fp_H_2,Fx_H_2) phase_L_2 = gwphase.phase_d(phase_fbest_2,a22_fbest_2,zeta_fbest_2,Fp_L_2,Fx_L_2) phase_V_2 = gwphase.phase_d(phase_fbest_2,a22_fbest_2,zeta_fbest_2,Fp_V_2,Fx_V_2) #Sets of parameters #All phases parameters_1 = np.stack( ( phase_H_1, phase_L_1, phase_V_1, tau_HL_1, tau_HV_1, Dphi_f_1 ), axis=1 ) parameters_2 = np.stack( ( phase_H_2, phase_L_2, phase_V_2, tau_HL_2, tau_HV_2, Dphi_f_2 ), axis=1 ) #Detector phases det_phases_1 = np.stack( ( phase_H_1, phase_L_1, phase_V_1 ), axis=1 ) det_phases_2 = np.stack( ( phase_H_2, phase_L_2, phase_V_2 ), axis=1 ) #Time delay phases tau_phases_1 = np.stack( ( tau_HL_1, tau_HV_1 ), axis=1 ) tau_phases_2 = np.stack( ( tau_HL_2, tau_HV_2 ), axis=1 ) return parameters_1, parameters_2, det_phases_1, det_phases_2, tau_phases_1, tau_phases_2, Dphi_f_1, Dphi_f_2, above_below
[docs] def phazap_all_metrics_one_ordering(event1_postprocessed_phase, event2_postprocessed_phase): """ Compute all metrics for one ordering of the two events The method is described in Sec. 4 of https://arxiv.org/pdf/2308.06616.pdf Parameters ---------- event1_postprocessed_phase : PostprocessedPhase The first event event2_postprocessed_phase : PostprocessedPhase The second event Returns ------- neff : float The number of effective phases vol_1 : float The volume of the posterior distribution of the first event, considering all phases vol_2 : float The volume of the posterior distribution of the second event, considering all phases vol_phases_1 : float The volume of the posterior distribution of the first event, only considering the detector phases vol_phases_2 : float The volume of the posterior distribution of the second event, only considering the detector phases dist_all : float The distance of the posterior distribution between the two events, considering all phases dist_phases : float The distance of the posterior distribution between the two events, only considering the detector phases dist_Tphases : float The distance of the posterior distribution between the two events, only considering the time delay phases dist_Dphi : float The distance of the posterior distribution between the two events, only considering :math:`\Delta\phi_f` """ parameters_1, parameters_2, det_phases_1, det_phases_2, tau_phases_1, tau_phases_2, Dphi_f_1, Dphi_f_2, above_below = phases_events(event1_postprocessed_phase, event2_postprocessed_phase) nphases = 3 #Record number of effective phases prior_range = np.pi/2 neff = tension.n_eff_pair(det_phases_1,det_phases_2,nphases,prior_range) #Compute volumes vol_1 = tension.volume_phases(parameters_1,nphases) vol_2 = tension.volume_phases(parameters_2,nphases) vol_phases_1, vol_phases_2 = tension.volume_only_phases(det_phases_1,det_phases_2) #Compute distances #We divide the samples in two groups, above and below the plane of the detectors #only if the fraction of samples in each group is between 0.05 and 0.95 frac_samples_below = len(above_below[above_below<0]) / len(above_below) frac_limit = 0.05 if (frac_samples_below>frac_limit) & (frac_samples_below < 1 - frac_limit): #All phases dist_M = tension.distance_phases_with_shift(parameters_1,parameters_2[above_below<0],nphases) dist_P = tension.distance_phases_with_shift(parameters_1,parameters_2[above_below>0],nphases) dist_all = np.minimum(dist_M,dist_P) #Time delay phases dist_Tphases_M = tension.distance(tau_phases_1,tau_phases_2[above_below<0]) dist_Tphases_P = tension.distance(tau_phases_1,tau_phases_2[above_below>0]) dist_Tphases = np.minimum(dist_Tphases_M,dist_Tphases_P) #Detector phases dist_phases_M = tension.distance_only_phases_with_shift(det_phases_1,det_phases_2[above_below<0]) dist_phases_P = tension.distance_only_phases_with_shift(det_phases_1,det_phases_2[above_below>0]) dist_phases = np.minimum(dist_phases_M,dist_phases_P) else: dist_all = tension.distance_phases_with_shift(parameters_1,parameters_2,nphases) dist_phases = tension.distance_only_phases_with_shift(det_phases_1,det_phases_2) dist_Tphases = tension.distance(tau_phases_1,tau_phases_2) dist_Dphi = tension.distance1D(Dphi_f_1,Dphi_f_2) return neff, vol_1, vol_2, vol_phases_1, vol_phases_2, dist_all, dist_phases, dist_Tphases, dist_Dphi
[docs] def phazap_one_ordering(event1_postprocessed_phase, event2_postprocessed_phase): """ Compute the volume, distance and number of effective phases for one ordering of the two events. The method is described in Sec. 4 of https://arxiv.org/pdf/2308.06616.pdf Parameters ---------- event1_postprocessed_phase : PostprocessedPhase The first event event2_postprocessed_phase : PostprocessedPhase The second event Returns ------- vol_phases_1 : float The volume of the posterior distribution of the first event, only considering the detector phases dist_all : float The distance of the posterior distribution between the two events, considering all phases neff : float The number of effective phases """ parameters_1, parameters_2, det_phases_1, det_phases_2, tau_phases_1, tau_phases_2, Dphi_f_1, Dphi_f_2, above_below = phases_events(event1_postprocessed_phase, event2_postprocessed_phase) nphases = 3 #Compute volume phases vol_phases_1, vol_phases_2 = tension.volume_only_phases(det_phases_1,det_phases_2) #Compute distances #We divide the samples in two groups, above and below the plane #only if the fraction of samples in each group is between 0.05 and 0.95 frac_samples_below = len(above_below[above_below<0]) / len(above_below) frac_limit = 0.05 if (frac_samples_below>frac_limit) & (frac_samples_below < 1 - frac_limit): #All phases dist_M = tension.distance_phases_with_shift(parameters_1,parameters_2[above_below<0],nphases) dist_P = tension.distance_phases_with_shift(parameters_1,parameters_2[above_below>0],nphases) dist_all = np.minimum(dist_M,dist_P) else: dist_all = tension.distance_phases_with_shift(parameters_1,parameters_2,nphases) #Compute neff prior_range = np.pi/2 neff = tension.n_eff_pair(det_phases_1,det_phases_2,nphases,prior_range) return vol_phases_1, dist_all, neff
def _phazap(event1_postprocessed_phase, event2_postprocessed_phase): """ Compute the :math:`D_J` statistic, the :math:`V_J` statistic, the phase shift, and the :math:`p`-value Parameters ---------- event1_postprocessed_phase : PostprocessedPhase The first event event2_postprocessed_phase : PostprocessedPhase The second event Returns ------- D_J : float The :math:`D_J` statistic vol_J : float The :math:`V_J` statistic phase_shift : float The phase shift D_J_n : np.ndarray The :math:`D_J` statistic for each allowed phase shift p_value : float The :math:`p`-value """ #Compute volumes and distances for both orderings vol_phases_12, dist_12, neff_12 = phazap_one_ordering(event1_postprocessed_phase, event2_postprocessed_phase) vol_phases_21, dist_21, neff_21 = phazap_one_ordering(event2_postprocessed_phase, event1_postprocessed_phase) """ dist_12 is [0, +pi/2, +pi, -pi/2] this should corresponds to dist_21 of [0, -pi/2, +pi, +pi/2] """ dist_21_swap = np.array([dist_21[0],dist_21[3],dist_21[2],dist_21[1]]) D_J_n = np.maximum(dist_12,dist_21_swap) D_J = np.min(D_J_n) vol_J = vol_phases_12 + vol_phases_21 #Phase shift phase_shifts = np.array([0,1,2,-1])*np.pi/2 phase_shift = phase_shifts[np.argmin(D_J_n)] #Compute p-value neff = neff_12 if D_J in dist_12 else neff_21 p_value = utils.p_sigma(D_J, neff+3) return D_J, vol_J, phase_shift, D_J_n, p_value
[docs] def phazap(event_1, event_2, plot=False, output_dir="./", output_filename=None): """ Compute the :math:`D_J` statistic, the :math:`V_J` statistic, the phase shift, and the :math:`p`-value Parameters ---------- event_1 : str or PostprocessedPhase The first event. If it is a string, it should be a file path to either a postprocessed phase file or a PE result file event_2 : str or PostprocessedPhase The second event. If it is a string, it should be a file path to either a postprocessed phase file or a PE result file plot : bool, optional Whether to plot the results, by default False output_dir : str, optional The output directory, by default "./" output_filename : str, optional The output filename, by default None Returns ------- D_J : float The :math:`D_J` statistic vol_J : float The :math:`V_J` statistic phase_shift : float The phase shift D_J_n : np.ndarray The :math:`D_J` statistic for each allowed phase shift p_value : float The :math:`p`-value """ def check_if_postprocessed(x): if type(x) is PostprocessedPhase: # x is indeed a PostprocessedPhase object, return it directly return x if type(x) is str: if os.path.exists(x): # x is a file path try: # Maybe it is already postprocessed? return PostprocessedPhase.from_file(x) except: try: # Maybe it is a summary file? return postprocess_phase(x) except: raise ValueError(f"Does not understand {x}") else: raise NotImplementedError("Currently only support a file path as the value") postprocessed_phase_1 = check_if_postprocessed(event_1) postprocessed_phase_2 = check_if_postprocessed(event_2) if plot: from .plot_utils import phazap_plot phazap_plot(postprocessed_phase_1, postprocessed_phase_2, output_dir=output_dir, output_filename=output_filename) return _phazap(postprocessed_phase_1, postprocessed_phase_2)
[docs] def phazap_summary(event_1, event_2, plot=False, output_dir="./", output_filename=None): """ Compute the :math:`D_J` statistic, the :math:`V_J` statistic, the phase shift, and the :math:`p`-value Parameters ---------- event_1 : str or PostprocessedPhase The first event. If it is a string, it should be a file path to either a postprocessed phase file or a PE result file event_2 : str or PostprocessedPhase The second event. If it is a string, it should be a file path to either a postprocessed phase file or a PE result file plot : bool, optional Whether to plot the results, by default False output_dir : str, optional The output directory, by default "./" output_filename : str, optional The output filename, by default None Returns ------- D_J : float The :math:`D_J` statistic vol_J : float The :math:`V_J` statistic phase_shift : float The phase shift p_value : float The :math:`p`-value """ D_J, vol_J, phase_shift, D_J_n, p_value = phazap(event_1, event_2, plot=plot, output_dir=output_dir, output_filename=output_filename) return D_J, vol_J, phase_shift, p_value