Parameter Estimation of Black-Hole Binary Waveforms — BayesFlow (SBI)
End-to-end simulation-based inference for gravitational-wave signals. I simulate BBH
waveforms with PyCBC, add PSD-colored noise, whiten/decimate, and train a
BayesFlow model (Conv1D+GRU summary → CouplingFlow posterior) to infer
[m1, m2, χ1, χ2, D, ι] with calibrated uncertainty.
Because public, labeled GW data are limited, I generate realistic waveforms from first-principles simulators under reasonable astrophysical priors and detector noise models. Classic ML regressors only provide point estimates; BayesFlow SBI learns full posteriors—capturing uncertainty, multi-modality, and parameter correlations efficiently.
Code Map (project structure)
.
├── priors.py # Physics-informed parameter priors (m1, m2, spins, D, ι)
├── simulator.py # PyCBC waveform + PSD-colored noise + whitening
├── generate_shard.py # Decimate 4096→1024 Hz + write NPZ shards
├── merge_shards.py # Merge shards → split → scale θ → save datasets + scaler
├── train_gw_bayesflow_down4_pf2_float32.py
│ └─ Conv1D+GRU summary → CouplingFlow posterior + AdamW + warmup→cosine
├── gw_bayesflow_diagnostics_down4_pf2.py
│ └─ Posterior sampling + rank/ECDF calibration + z-score QQ/hist
├── plot_loss_trajectory.py # CSV/NPY → smooth loss curves (moving averages)
└── SBI_gw.ipynb # Colab: data → training → diagnostics end-to-end
Data Pipeline (simulate → preprocess → dataset)
I generate paired examples (x, θ) end-to-end: sample physics-informed priors for
θ = [m1, m2, χ1, χ2, D, ι], simulate BBH strain with PyCBC, inject PSD-colored
noise, then whiten (freq-domain), decimate 4096→1024 Hz, and serialize to
NPZ with fixed splits for reproducible experiments.
- Priors → PyCBC waveform → PSD-colored noise
- Whitening → anti-alias decimation → standardization
- NPZ dataset + metadata; train/val split; fixed RNG seeds
A) Prior sampler
Physics-informed priors (power-law masses, spins uniform, D uniform in volume, isotropic ι).
import numpy as np, pandas as pd
def sample_prior(n_samples, *, alpha=0.0,
                 m_min=5.0, m_max=80.0,
                 D_min=100.0, D_max=2000.0,
                 rng_seed=None):
    rng = np.random.default_rng(rng_seed)
    # Primary mass m1: uniform or power-law p(m1) ∝ m1^{-α}
    if np.isclose(alpha, 0.0):
        m1 = rng.uniform(m_min, m_max, n_samples)
    else:
        expo = 1.0 - alpha
        u = rng.random(n_samples)
        m1 = (u*(m_max**expo - m_min**expo) + m_min**expo) ** (1.0/expo)
    # Secondary mass m2 ≤ m1
    m2 = rng.uniform(m_min, m1)
    # Spins, distance (uniform in volume), isotropic inclination
    chi1 = rng.uniform(0.0, 0.99, n_samples)
    chi2 = rng.uniform(0.0, 0.99, n_samples)
    uV = rng.random(n_samples)
    D = (D_min**3 + uV*(D_max**3 - D_min**3)) ** (1.0/3.0)  # p(D) ∝ D^2
    inc = np.arccos(rng.uniform(-1.0, 1.0, n_samples))
    return pd.DataFrame({
        "m1": m1.astype(np.float32), "m2": m2.astype(np.float32),
        "chi1": chi1.astype(np.float32), "chi2": chi2.astype(np.float32),
        "D": D.astype(np.float32), "inc": inc.astype(np.float32)
    })
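# Usage sketch (illustrative values, not the project defaults): draw 10k samples
# with a Salpeter-like primary-mass slope and sanity-check the prior ranges.
df = sample_prior(10_000, alpha=2.3, rng_seed=0)
print(df.describe().loc[["mean", "min", "max"]])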
B) Simulator & whitening
IMRPhenomD waveform + aLIGO PSD noise; min-SNR control; circular shift; PSD whitening.
import numpy as np, pandas as pd
from pycbc.waveform import get_td_waveform
from pycbc.noise import noise_from_psd
from pycbc.psd import aLIGOZeroDetHighPower
from pycbc.types import TimeSeries, FrequencySeries
DELTA_T = np.float32(1/4096)
DURATION = np.float32(8.0)
F_LOWER = 40.0
N = int(DURATION/DELTA_T)
def _make_psd(nfft):
    # Analytic aLIGO PSD; delta_f must match the 8 s segment (1/DURATION), not the sample rate
    psd = aLIGOZeroDetHighPower(nfft, 1.0/float(DURATION), 20.0)
    return psd.astype(np.float64)
def _whiten(x: TimeSeries, sigma=1.0) -> TimeSeries:
    Xf = x.to_frequencyseries()
    psd = _make_psd(len(Xf))
    Wf = FrequencySeries(Xf.numpy() / np.sqrt(psd.numpy()/2.0), delta_f=Xf.delta_f)
    y = Wf.to_timeseries()[:len(x)]
    y = y / (float(y.std())/float(sigma))
    y._delta_t = x.delta_t
    return y.astype(np.float32)
_CAL = 1.0 # one-time calibration so whitened noise has σ≈1
def simulate_event(theta: pd.Series, seed=None, *, min_snr=8.0, snr_jitter=None) -> TimeSeries:
    rng = np.random.default_rng(seed)
    # 1) Clean signal (preserve distance amplitude)
    hp, _ = get_td_waveform(
        approximant="IMRPhenomD",
        mass1=float(theta.m1), mass2=float(theta.m2),
        spin1z=float(theta.chi1), spin2z=float(theta.chi2),
        distance=float(theta.D), inclination=float(theta.inc),
        delta_t=float(DELTA_T), f_lower=F_LOWER
    )
    h = hp.astype(np.float32)
    h = (h[:N] if len(h) >= N else TimeSeries(np.pad(h.numpy(), (0, N-len(h))), delta_t=DELTA_T))
    # 2) PSD-colored noise; enforce min SNR by scaling NOISE (not the signal)
    noise = noise_from_psd(N, DELTA_T, _make_psd(N//2+1), seed=seed).astype(np.float32)
    def _opt_snr(sig):
        return float(np.sqrt((sig.numpy()**2).sum() / (noise.numpy()**2).sum()) * 10.0)
    clean_snr = _opt_snr(h)
    noise_scale = 1.0
    if (min_snr is not None) and (clean_snr > 0) and (clean_snr < min_snr):
        noise_scale = clean_snr / float(min_snr)
    if snr_jitter is not None and clean_snr > 0:
        lo, hi = snr_jitter
        target = float(rng.uniform(lo, hi))
        noise_scale = min(noise_scale, clean_snr / target)
    noise *= np.float32(noise_scale)
    # 3) Mix, random circular shift, whiten
    x = h + noise
    shift = int(rng.integers(0, N))
    x = TimeSeries(np.roll(x.numpy(), shift).astype(np.float32), delta_t=DELTA_T)
    return _whiten(x, _CAL)
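# Smoke-test sketch (assumes sample_prior from priors.py is importable): simulate one
# whitened 8 s example at 4096 Hz; with _CAL = 1.0 its std should come out ≈ 1.
theta = sample_prior(1, rng_seed=0).iloc[0]
x4096 = simulate_event(theta, seed=0, min_snr=8.0)
print(len(x4096), float(x4096.numpy().std()))  # expect 32768 samples, std ≈ 1.0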
C) Anti-alias decimation & sharding
Two-stage ×2 decimation 4096→1024 Hz via resample_poly; shard NPZ writer.
import numpy as np
from scipy.signal import resample_poly, windows
KAISER_BETA = 12.0
PAD_SAMPLES = 1024
TAPER_ALPHA = 0.02
FS_IN, FS_OUT, DURATION = 4096, 1024, 8.0
T = int(FS_OUT * DURATION) # 8192
def _taper_ends(x, alpha=TAPER_ALPHA):
    if alpha <= 0.0: return x
    return (x * windows.tukey(len(x), alpha=alpha, sym=True)).astype(np.float64, copy=False)
def _pad_edges(x, pad=PAD_SAMPLES):
    return np.pad(x, (pad, pad), mode="constant") if pad > 0 else x
def _trim_edges(y, down, pad=PAD_SAMPLES):
    s = (pad // down) if pad > 0 else 0
    return y[s:-s] if s > 0 else y
def _decimate_x2(x):
    y = _taper_ends(_pad_edges(x))
    y = resample_poly(y, up=1, down=2, window=("kaiser", KAISER_BETA))
    y = _trim_edges(y, down=2)
    return y
def decimate_4096_to_1024(x4096: np.ndarray) -> np.ndarray:
    y = _decimate_x2(x4096)  # 4096 → 2048
    y = _decimate_x2(y)      # 2048 → 1024
    return y.astype(np.float32)
# simulate_event(θ) → x4096
# y = decimate_4096_to_1024(x4096) → shape (8192,)
# save shards: parameters (N,6), waveforms (N,1,8192)
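# Minimal shard-writer sketch tying the pieces together (the real generate_shard.py is
# not reproduced here; the helper name, shard size, and file path below are illustrative).
from priors import sample_prior
from simulator import simulate_event
def write_shard(path, n_events=2000, seed=0):
    thetas = sample_prior(n_events, rng_seed=seed)
    waves = np.empty((n_events, 1, T), dtype=np.float32)
    for i, (_, row) in enumerate(thetas.iterrows()):
        x4096 = simulate_event(row, seed=seed * n_events + i, min_snr=8.0)
        waves[i, 0, :] = decimate_4096_to_1024(x4096.numpy())
    np.savez_compressed(path, parameters=thetas.to_numpy(np.float32), waveforms=waves)
# write_shard("datasets/raw/gw_bbh_down4_shard000.npz", n_events=2000, seed=0)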
D) Merge → split → scale → save
Concatenate shards, fixed split, StandardScaler(θ), save datasets + fitted scaler.
from pathlib import Path
import numpy as np, pickle
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
RAW_DIR, OUT_DIR = Path("datasets/raw"), Path("datasets")
PREFIX = "gw_bbh_down4"
def load_all():
    xs, thetas = [], []
    for p in sorted(RAW_DIR.glob(f"{PREFIX}_shard*.npz")):
        data = np.load(p)
        xs.append(data["waveforms"])       # (N, 1, 8192) float32
        thetas.append(data["parameters"])  # (N, 6) float32
    return np.concatenate(xs, 0), np.concatenate(thetas, 0)
def main():
    X, θ = load_all()
    X = X.astype(np.float32, copy=False)
    θ = θ.astype(np.float32, copy=False)
    X_train, X_val, θ_train, θ_val = train_test_split(X, θ, test_size=4000, random_state=42, shuffle=True)
    scaler = StandardScaler().fit(θ_train)
    θ_train = scaler.transform(θ_train).astype(np.float32)
    θ_val = scaler.transform(θ_val).astype(np.float32)
    OUT_DIR.mkdir(parents=True, exist_ok=True)
    np.savez_compressed(OUT_DIR / f"{PREFIX}_train.npz", parameters=θ_train, waveforms=X_train)
    np.savez_compressed(OUT_DIR / f"{PREFIX}_val.npz", parameters=θ_val, waveforms=X_val)
    with open(OUT_DIR / f"{PREFIX}_param_scaler.pkl", "wb") as f:
        pickle.dump(scaler, f)
if __name__ == "__main__":
    main()
Training & Validation
Conv1D+GRU summary network encodes x; a CouplingFlow models p(θ|x). NLL + AdamW, warm-up → cosine LR, early stopping, gradient clipping. Validate with rank/ECDF calibration, coverage, and NRMSE.
1) Mean-pool ×2 + standardize (save scaler)
Downsample by mean-pool ×2; standardize with TRAIN stats; persist scaler artifact.
import json, numpy as np
POOL_FACTOR = 2
def mean_pool_1d(arr, k):
    # Assumes channels-last input (N, T, C); the (N, 1, 8192) shards are transposed to
    # (N, 8192, 1) before this step, as in the diagnostics script below.
    N, T, C = arr.shape
    cut = (T // k) * k
    if cut != T: arr = arr[:, :cut, :]
    return arr.reshape(N, cut // k, k, C).mean(axis=2, dtype=np.float32)
x_train = mean_pool_1d(x_train, POOL_FACTOR)
x_val = mean_pool_1d(x_val, POOL_FACTOR)
def per_channel_standardise(x, mean=None, std=None):
    if mean is None or std is None:
        mean = x.mean(axis=(0,1), keepdims=True, dtype=np.float64)
        std = x.std(axis=(0,1), keepdims=True, dtype=np.float64) + 1e-8
    x = (x - mean) / std
    return x.astype(np.float32), mean.squeeze(), std.squeeze()
x_train, wmean, wstd = per_channel_standardise(x_train)
x_val, _, _ = per_channel_standardise(x_val, wmean[None,None,...], wstd[None,None,...])
np.savez(SCALER_NPZ, mean=wmean.astype(np.float32), std=wstd.astype(np.float32))
2) Model → Conv1D + GRU → CouplingFlow
TimeSeriesNetwork + CouplingFlow (depth 4, affine, permute, actnorm).
from bayesflow.networks import TimeSeriesNetwork, CouplingFlow
summary_net = TimeSeriesNetwork(
    summary_dim   = 64,
    filters       = (48, 64, 96, 128),
    kernel_sizes  = (5, 5, 3, 3),
    recurrent_dim = 128,
    bidirectional = False,
    dropout       = 0.35,
)
invertible_net = CouplingFlow(
    depth       = 4,
    transform   = "affine",
    permutation = "random",
    use_actnorm = True,
)
3) AdamW + warm-up → cosine (clipnorm)
AdamW, weight decay, clipnorm=1.0; WarmUpCosine schedule (~5% warm-up).
import tensorflow as tf
from tensorflow import keras
EPOCHS, BATCH_SIZE = 80, 64
steps_per_epoch = max(1, len(x_train) // BATCH_SIZE)
total_steps = EPOCHS * steps_per_epoch
warmup_steps = int(0.05 * total_steps)
class WarmUpCosine(tf.keras.optimizers.schedules.LearningRateSchedule):
    def __init__(self, base_lr, warmup_steps, total_steps, final_lr_ratio=5e-3):
        super().__init__()
        self.base_lr = tf.convert_to_tensor(float(base_lr), dtype=tf.float32)
        self.warmup_steps = tf.cast(int(warmup_steps), tf.float32)
        self.total_steps = tf.cast(int(total_steps), tf.float32)
        self.cosine_decay = tf.keras.optimizers.schedules.CosineDecay(
            initial_learning_rate=float(base_lr),
            decay_steps=int(total_steps) - int(warmup_steps),
            alpha=float(final_lr_ratio),
        )
    def __call__(self, step):
        step = tf.cast(step, tf.float32)
        warm = self.base_lr * (step / tf.maximum(1.0, self.warmup_steps))
        cos = self.cosine_decay(tf.cast(step - self.warmup_steps, tf.int64))
        return tf.where(step < self.warmup_steps, warm, cos)
lr_schedule = WarmUpCosine(base_lr=3e-4, warmup_steps=warmup_steps, total_steps=total_steps)
optimizer = keras.optimizers.AdamW(learning_rate=lr_schedule, weight_decay=1e-4, clipnorm=1.0)
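# Optional sanity check (illustrative): the LR should ramp linearly to base_lr over the
# warm-up steps, then follow the cosine decay down to final_lr_ratio * base_lr.
for s in (0, warmup_steps // 2, warmup_steps, total_steps - 1):
    print(int(s), float(lr_schedule(tf.constant(s))))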
4) Workflow, callbacks, and fitting
BasicWorkflow + EarlyStopping, Checkpoint(best .keras), CSVLogger, TerminateOnNaN.
import bayesflow as bf, numpy as np
from bayesflow.adapters import Adapter
adapter = Adapter().rename("waveforms", "summary_variables").to_array()
workflow = bf.workflows.BasicWorkflow(
    adapter             = adapter,
    summary_network     = summary_net,
    invertible_network  = invertible_net,
    optimizer           = optimizer,
    inference_variables = ["inference_variables"],
    summary_variables   = ["summary_variables"],
)
early = keras.callbacks.EarlyStopping(monitor="val_loss", patience=8, restore_best_weights=True)
ckpt = keras.callbacks.ModelCheckpoint(filepath=SAVE_PATH, monitor="val_loss", save_best_only=True, verbose=1)
csv = keras.callbacks.CSVLogger(HIST_CSV, append=False)
nan = keras.callbacks.TerminateOnNaN()
history = workflow.fit_offline(
    data            = {"waveforms": x_train, "inference_variables": theta_train},
    validation_data = {"waveforms": x_val, "inference_variables": theta_val},
    epochs          = EPOCHS, batch_size = BATCH_SIZE,
    callbacks       = [early, ckpt, csv, nan], verbose = 1,
)
np.save(HIST_NPY, {k: np.array(v) for k, v in history.history.items()}, allow_pickle=True)
Results + How to Run & Skills
On held-out examples, posteriors contract around the ground-truth parameters as SNR rises. Calibration looks healthy: rank histograms are approximately flat, ECDFs stay near the diagonal, and z-score QQ plots and histograms are close to N(0,1). Loss curves are smooth.
A) Diagnostics pipeline
Restore model, apply preprocessing, sample posteriors, then render rank hist/ECDF and z-score QQ/hist plots.
# gw_bayesflow_diagnostics_down4_pf2.py — highlights
import numpy as np
from tensorflow import keras
from bayesflow.adapters import Adapter
from bayesflow.workflows import BasicWorkflow
from bayesflow.diagnostics import plots  # plotting helpers (import path assumed)
val = np.load(VAL_PATH)
theta_val = val["parameters"].astype(np.float32)
x_val = val["waveforms"].astype(np.float32)
# channels-last & mean-pool ×2
if x_val.ndim == 3 and x_val.shape[1] == 1: x_val = np.transpose(x_val, (0, 2, 1))
elif x_val.ndim == 2: x_val = x_val[:, :, None]
def mean_pool_1d(arr, k):
    N, T, C = arr.shape
    cut = (T // k) * k
    if cut != T: arr = arr[:, :cut, :]
    return arr.reshape(N, cut // k, k, C).mean(axis=2, dtype=np.float32)
x_val = mean_pool_1d(x_val, k=2)
# standardize with TRAIN scaler
sc = np.load(SCALER_NPZ)
mean, std = sc["mean"][None,None,:], sc["std"][None,None,:]
x_val = ((x_val - mean) / std).astype(np.float32)
# restore & sample
adapter = Adapter().rename("waveforms", "summary_variables").to_array()
workflow = BasicWorkflow(adapter=adapter, optimizer=None)
workflow.approximator = keras.saving.load_model(MODEL_PATH, compile=False)
idx = np.random.default_rng(42).choice(len(x_val), 300, replace=False)
samples = workflow.sample(num_samples=1_000, conditions={"waveforms": x_val[idx]})["inference_variables"]
# diagnostics
plots.calibration_histogram(samples, theta_val[idx])
figs = workflow.plot_default_diagnostics(
    test_data={"waveforms": x_val[idx], "inference_variables": theta_val[idx]},
    calibration_ecdf_kwargs={"difference": True, "figsize": (15, 3)},
)
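# Sketch of the coverage / NRMSE / z-score numbers referenced above (array shapes assumed:
# samples (n_events, n_draws, 6), truth (n_events, 6), both in scaled θ-space).
truth = theta_val[idx]
post_mean, post_std = samples.mean(axis=1), samples.std(axis=1)
rmse = np.sqrt(((post_mean - truth) ** 2).mean(axis=0))
nrmse = rmse / (truth.max(axis=0) - truth.min(axis=0))     # normalized by the truth range
lo, hi = np.quantile(samples, [0.05, 0.95], axis=1)
coverage90 = ((truth >= lo) & (truth <= hi)).mean(axis=0)  # central 90% interval coverage
z = (truth - post_mean) / post_std                         # ≈ N(0,1) if well calibrated
print("NRMSE:", np.round(nrmse, 3))
print("90% coverage:", np.round(coverage90, 3))
print("z mean/std:", np.round(z.mean(axis=0), 3), np.round(z.std(axis=0), 3))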
B) Loss trajectory (raw points + moving averages)
Read CSVLogger / .npy history and draw a clean loss trajectory with centered moving averages.
import numpy as np, pandas as pd
import matplotlib.pyplot as plt
def moving_average(x, window):
    x = np.asarray(x, dtype=float)
    if window < 2: return x
    w = int(window) | 1  # force an odd, centered window
    mask = ~np.isnan(x)
    num = np.convolve(np.where(mask, x, 0.0), np.ones(w), mode="same")
    den = np.convolve(mask.astype(float), np.ones(w), mode="same")
    return np.divide(num, np.maximum(den, 1e-12))
# load loss + val_loss; pick ~5% window
df = pd.read_csv(csv_path) # fallback to npy if needed
loss, val_loss = df["loss"].to_numpy(), df.get("val_loss", pd.Series(np.nan)).to_numpy()
win = max(5, int(round(0.05 * len(loss))) | 1)
plt.figure(figsize=(16,4))
plt.plot(loss, "o", ms=3, lw=1, alpha=.35, label="Training")
if val_loss.size: plt.plot(val_loss, "o--", ms=3, lw=1, alpha=.25, label="Validation")
plt.plot(moving_average(loss, win), lw=2.5, label="Training (Moving Average)")
if val_loss.size: plt.plot(moving_average(val_loss, win), "--", lw=2.5, label="Validation (Moving Average)")
plt.title("Loss Trajectory"); plt.xlabel("Training epoch #"); plt.grid(axis="y", alpha=.25); plt.legend(); plt.tight_layout()
How to run
# 1) Build dataset shards → merge
python generate_shard.py
python merge_shards.py
# 2) Train BayesFlow model
python train_gw_bayesflow_down4_pf2_float32.py
# 3) Diagnostics & plots
python gw_bayesflow_diagnostics_down4_pf2.py
python plot_loss_trajectory.py
Skills shown: Simulation-Based Inference, normalizing flows (CouplingFlow), TensorFlow/Keras, PSD whitening & anti-alias decimation, uncertainty calibration (rank/ECDF, z-normality), reproducible ML ops.