Parameter Estimation of Black-Hole Binary Waveforms — BayesFlow (SBI)

End-to-end simulation-based inference for gravitational-wave signals. I simulate BBH waveforms with PyCBC, add PSD-colored noise, whiten/decimate, and train a BayesFlow model (Conv1D+GRU summary → CouplingFlow posterior) to infer [m1, m2, χ1, χ2, D, ι] with calibrated uncertainty.

Because public, labeled GW data are limited, I generate realistic waveforms from first-principles simulators under reasonable astrophysical priors and detector noise models. Classic ML regressors only provide point estimates; BayesFlow SBI learns full posteriors—capturing uncertainty, multi-modality, and parameter correlations efficiently.

Stack: Python · Deep Learning · TensorFlow/Keras · NumPy · Google Colab
[Figure: artistic visualization of a black-hole binary]

Code Map (project structure)

.
├── priors.py                      # Physics-informed parameter priors (m1, m2, spins, D, ι)
├── simulator.py                   # PyCBC waveform + PSD-colored noise + whitening
├── generate_shard.py              # Decimate 4096→1024 Hz + write NPZ shards
├── merge_shards.py                # Merge shards → split → scale θ → save datasets + scaler
├── train_gw_bayesflow_down4_pf2_float32.py
│   └─ Conv1D+GRU summary → CouplingFlow posterior + AdamW + warmup→cosine
├── gw_bayesflow_diagnostics_down4_pf2.py
│   └─ Posterior sampling + rank/ECDF calibration + z-score QQ/hist
├── plot_loss_trajectory.py        # CSV/NPY → smooth loss curves (moving averages)
└── SBI_gw.ipynb                   # Colab: data → training → diagnostics end-to-end

Data Pipeline (simulate → preprocess → dataset)

I generate paired examples (x, θ) end-to-end: sample physics-informed priors for θ = [m1, m2, χ1, χ2, D, ι], simulate BBH strain with PyCBC, inject PSD-colored noise, then whiten (freq-domain), decimate 4096→1024 Hz, and serialize to NPZ with fixed splits for reproducible experiments.

  • Priors → PyCBC waveform → PSD-colored noise
  • Whitening → anti-alias decimation → standardization
  • NPZ dataset + metadata; train/val split; fixed RNG seeds

A) Prior sampler

Physics-informed priors (power-law masses, spins uniform, D uniform in volume, isotropic ι).

import numpy as np, pandas as pd

def sample_prior(n_samples, *, alpha=0.0,
                 m_min=5.0, m_max=80.0,
                 D_min=100.0, D_max=2000.0,
                 rng_seed=None):
    rng = np.random.default_rng(rng_seed)

    # Primary mass m1: uniform or power-law p(m1) ∝ m1^{-α}
    if np.isclose(alpha, 0.0):
        m1 = rng.uniform(m_min, m_max, n_samples)
    else:
        expo = 1.0 - alpha
        u = rng.random(n_samples)
        m1 = (u*(m_max**expo - m_min**expo) + m_min**expo) ** (1.0/expo)

    # Secondary mass m2 ≤ m1
    m2 = rng.uniform(m_min, m1)

    # Spins, distance (uniform in volume), isotropic inclination
    chi1 = rng.uniform(0.0, 0.99, n_samples)
    chi2 = rng.uniform(0.0, 0.99, n_samples)
    uV   = rng.random(n_samples)
    D    = (D_min**3 + uV*(D_max**3 - D_min**3)) ** (1.0/3.0)   # ∝ D^2
    inc  = np.arccos(rng.uniform(-1.0, 1.0, n_samples))

    return pd.DataFrame({
        "m1": m1.astype(np.float32), "m2": m2.astype(np.float32),
        "chi1": chi1.astype(np.float32), "chi2": chi2.astype(np.float32),
        "D": D.astype(np.float32), "inc": inc.astype(np.float32)
    })
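
For example, a reproducible batch of prior draws (the α = 2.3 mass slope here is only an illustrative choice, not a project default):

# Hypothetical usage: 10k prior draws with a steeper, Salpeter-like mass slope
df = sample_prior(10_000, alpha=2.3, rng_seed=0)
print(df.describe().loc[["mean", "min", "max"]])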

B) Simulator & whitening

IMRPhenomD waveform + aLIGO PSD noise; min-SNR control; circular shift; PSD whitening.

import numpy as np, pandas as pd
from pycbc.waveform import get_td_waveform
from pycbc.noise    import noise_from_psd
from pycbc.psd      import aLIGOZeroDetHighPower
from pycbc.types    import TimeSeries, FrequencySeries

DELTA_T  = np.float32(1/4096)
DURATION = np.float32(8.0)
F_LOWER  = 40.0
N        = int(DURATION/DELTA_T)

def _make_psd(nfft, delta_f):
    # aLIGOZeroDetHighPower(length, delta_f, low_freq_cutoff)
    psd = aLIGOZeroDetHighPower(nfft, float(delta_f), 20.0)
    return psd.astype(np.float64)

def _whiten(x: TimeSeries, sigma=1.0) -> TimeSeries:
    Xf = x.to_frequencyseries()
    psd = _make_psd(len(Xf), Xf.delta_f)
    Wf  = FrequencySeries(Xf.numpy() / np.sqrt(psd.numpy()/2.0), delta_f=Xf.delta_f)
    y   = Wf.to_timeseries()[:len(x)]
    y   = y / (float(y.std())/float(sigma))
    y._delta_t = x.delta_t
    return y.astype(np.float32)

_CAL = 1.0  # one-time calibration so whitened noise has σ≈1

def simulate_event(theta: pd.Series, seed=None, *, min_snr=8.0, snr_jitter=None) -> TimeSeries:
    rng = np.random.default_rng(seed)

    # 1) Clean signal (preserve distance amplitude)
    hp, _ = get_td_waveform(
        approximant="IMRPhenomD",
        mass1=float(theta.m1), mass2=float(theta.m2),
        spin1z=float(theta.chi1), spin2z=float(theta.chi2),
        distance=float(theta.D), inclination=float(theta.inc),
        delta_t=float(DELTA_T), f_lower=F_LOWER
    )
    h = hp.astype(np.float32)
    h = (h[:N] if len(h) >= N else TimeSeries(np.pad(h.numpy(), (0, N-len(h))), delta_t=DELTA_T))

    # 2) PSD-colored noise; enforce min SNR by scaling NOISE (not the signal)
    noise = noise_from_psd(N, DELTA_T, _make_psd(N//2+1, 1.0/float(DURATION)), seed=seed).astype(np.float32)
    def _opt_snr(sig):
        # crude SNR proxy: time-domain amplitude ratio with an empirical ×10 scale
        return float(np.sqrt((sig.numpy()**2).sum() / (noise.numpy()**2).sum()) * 10.0)

    clean_snr = _opt_snr(h)
    noise_scale = 1.0
    if (min_snr is not None) and (clean_snr > 0) and (clean_snr < min_snr):
        noise_scale = clean_snr / float(min_snr)
    if snr_jitter is not None and clean_snr > 0:
        lo, hi = snr_jitter
        target = float(rng.uniform(lo, hi))
        noise_scale = min(noise_scale, clean_snr / target)
    noise *= np.float32(noise_scale)

    # 3) Mix, random circular shift, whiten
    x = h + noise
    shift = int(rng.integers(0, N))
    x = TimeSeries(np.roll(x.numpy(), shift).astype(np.float32), delta_t=DELTA_T)
    return _whiten(x, _CAL)
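
A quick smoke test tying A) and B) together (a sketch; the shard step in C) does this at scale):

# Sketch: one whitened 8-s strain at 4096 Hz from a single prior draw
theta = sample_prior(1, rng_seed=7).iloc[0]
x4096 = simulate_event(theta, seed=7, min_snr=8.0)
print(len(x4096), float(x4096.numpy().std()))   # expect 32768 samples, std ≈ 1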

C) Anti-alias decimation & sharding

Two-stage ×2 decimation 4096→1024 Hz via resample_poly; shard NPZ writer.

import numpy as np
from scipy.signal import resample_poly, windows

KAISER_BETA = 12.0
PAD_SAMPLES = 1024
TAPER_ALPHA = 0.02
FS_IN, FS_OUT, DURATION = 4096, 1024, 8.0
T = int(FS_OUT * DURATION)  # 8192

def _taper_ends(x, alpha=TAPER_ALPHA):
    if alpha <= 0.0: return x
    return (x * windows.tukey(len(x), alpha=alpha, sym=True)).astype(np.float64, copy=False)

def _pad_edges(x, pad=PAD_SAMPLES):
    return np.pad(x, (pad, pad), mode="constant") if pad > 0 else x

def _trim_edges(y, down, pad=PAD_SAMPLES):
    s = (pad // down) if pad > 0 else 0
    return y[s:-s] if s > 0 else y

def _decimate_x2(x):
    y = _taper_ends(_pad_edges(x))
    y = resample_poly(y, up=1, down=2, window=("kaiser", KAISER_BETA))
    y = _trim_edges(y, down=2)
    return y

def decimate_4096_to_1024(x4096: np.ndarray) -> np.ndarray:
    y = _decimate_x2(x4096)  # 4096 → 2048
    y = _decimate_x2(y)      # 2048 → 1024
    return y.astype(np.float32)

# simulate_event(θ) → x4096
# y = decimate_4096_to_1024(x4096) → shape (8192,)
# save shards: parameters (N,6), waveforms (N,1,8192)
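
Putting those comments together, a minimal shard writer might look like the sketch below, building on the pieces defined in A)–C). This is only an illustration of what generate_shard.py does; the shard size, seeding scheme, and the write_shard name are my assumptions, and the filename simply follows the pattern merge_shards.py globs for.

import os

def write_shard(shard_id, n_events=2000, out_dir="datasets/raw", seed=0):
    """Sketch: draw priors, simulate, decimate, and write one NPZ shard."""
    thetas = sample_prior(n_events, rng_seed=seed + shard_id)
    waves  = np.empty((n_events, 1, T), dtype=np.float32)          # (N, 1, 8192)
    for i in range(n_events):
        x4096 = simulate_event(thetas.iloc[i], seed=shard_id * 100_000 + i)
        waves[i, 0] = decimate_4096_to_1024(x4096.numpy())
    os.makedirs(out_dir, exist_ok=True)
    np.savez_compressed(f"{out_dir}/gw_bbh_down4_shard{shard_id:03d}.npz",
                        parameters=thetas.to_numpy(dtype=np.float32),  # (N, 6)
                        waveforms=waves)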

D) Merge → split → scale → save

Concatenate shards, fixed split, StandardScaler(θ), save datasets + fitted scaler.

from pathlib import Path
import numpy as np, pickle
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

RAW_DIR, OUT_DIR = Path("datasets/raw"), Path("datasets")
PREFIX = "gw_bbh_down4"

def load_all():
    xs, thetas = [], []
    for p in sorted(RAW_DIR.glob(f"{PREFIX}_shard*.npz")):
        data = np.load(p)
        xs.append(data["waveforms"])      # (N, 1, 8192) float32
        thetas.append(data["parameters"]) # (N, 6)       float32
    return np.concatenate(xs, 0), np.concatenate(thetas, 0)

def main():
    X, θ = load_all()
    X = X.astype(np.float32, copy=False)
    θ = θ.astype(np.float32, copy=False)

    X_train, X_val, θ_train, θ_val = train_test_split(X, θ, test_size=4000, random_state=42, shuffle=True)

    scaler = StandardScaler().fit(θ_train)
    θ_train = scaler.transform(θ_train).astype(np.float32)
    θ_val   = scaler.transform(θ_val).astype(np.float32)

    OUT_DIR.mkdir(parents=True, exist_ok=True)
    np.savez_compressed(OUT_DIR / f"{PREFIX}_train.npz", parameters=θ_train, waveforms=X_train)
    np.savez_compressed(OUT_DIR / f"{PREFIX}_val.npz",   parameters=θ_val,   waveforms=X_val)
    with open(OUT_DIR / f"{PREFIX}_param_scaler.pkl", "wb") as f:
        pickle.dump(scaler, f)

if __name__ == "__main__":
    main()

Training & Validation

Conv1D+GRU summary network encodes x; a CouplingFlow models p(θ|x). NLL + AdamW, warm-up → cosine LR, early stopping, gradient clipping. Validate with rank/ECDF calibration, coverage, and NRMSE.

1) Mean-pool ×2 + standardize (save scaler)

Downsample by mean-pool ×2; standardize with TRAIN stats; persist scaler artifact.
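
The snippets below assume the merged datasets from D) are already in memory; a minimal loading sketch (paths match merge_shards.py, while the channels-last transpose and the SCALER_NPZ path are my choices):

import numpy as np

# Sketch: load merged datasets and move to channels-last for the Keras summary net
train = np.load("datasets/gw_bbh_down4_train.npz")
val   = np.load("datasets/gw_bbh_down4_val.npz")
theta_train, theta_val = train["parameters"], val["parameters"]
x_train = np.transpose(train["waveforms"], (0, 2, 1))   # (N, 1, 8192) → (N, 8192, 1)
x_val   = np.transpose(val["waveforms"],   (0, 2, 1))
SCALER_NPZ = "datasets/gw_bbh_down4_waveform_scaler.npz"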

import json, numpy as np
POOL_FACTOR = 2

def mean_pool_1d(arr, k):
    N, T, C = arr.shape
    cut = (T // k) * k
    if cut != T: arr = arr[:, :cut, :]
    return arr.reshape(N, cut // k, k, C).mean(axis=2, dtype=np.float32)

x_train = mean_pool_1d(x_train, POOL_FACTOR)
x_val   = mean_pool_1d(x_val,   POOL_FACTOR)

def per_channel_standardise(x, mean=None, std=None):
    if mean is None or std is None:
        mean = x.mean(axis=(0,1), keepdims=True, dtype=np.float64)
        std  = x.std(axis=(0,1),  keepdims=True, dtype=np.float64) + 1e-8
    x = (x - mean) / std
    return x.astype(np.float32), mean.squeeze(), std.squeeze()

x_train, wmean, wstd = per_channel_standardise(x_train)
x_val,   _,    _     = per_channel_standardise(x_val, wmean[None,None,...], wstd[None,None,...])
np.savez(SCALER_NPZ, mean=np.atleast_1d(wmean).astype(np.float32), std=np.atleast_1d(wstd).astype(np.float32))  # keep ≥1-D so later [None,None,:] indexing works

2) Model → Conv1D + GRU → CouplingFlow

TimeSeriesNetwork + CouplingFlow (depth 4, affine, permute, actnorm).

from bayesflow.networks import TimeSeriesNetwork, CouplingFlow

summary_net = TimeSeriesNetwork(
    summary_dim   = 64,
    filters       = (48, 64, 96, 128),
    kernel_sizes  = (5, 5, 3, 3),
    recurrent_dim = 128,
    bidirectional = False,
    dropout       = 0.35,
)

invertible_net = CouplingFlow(
    depth       = 4,
    transform   = "affine",
    permutation = "random",
    use_actnorm = True,
)

3) AdamW + warm-up → cosine (clipnorm)

AdamW, weight decay, clipnorm=1.0; WarmUpCosine schedule (~5% warm-up).

import tensorflow as tf
from tensorflow import keras

EPOCHS, BATCH_SIZE = 80, 64
steps_per_epoch = max(1, len(x_train) // BATCH_SIZE)
total_steps     = EPOCHS * steps_per_epoch
warmup_steps    = int(0.05 * total_steps)

class WarmUpCosine(tf.keras.optimizers.schedules.LearningRateSchedule):
    def __init__(self, base_lr, warmup_steps, total_steps, final_lr_ratio=5e-3):
        super().__init__()
        self.base_lr = tf.convert_to_tensor(float(base_lr), dtype=tf.float32)
        self.warmup_steps = tf.cast(int(warmup_steps), tf.float32)
        self.total_steps  = tf.cast(int(total_steps),  tf.float32)
        self.cosine_decay = tf.keras.optimizers.schedules.CosineDecay(
            initial_learning_rate=float(base_lr),
            decay_steps=int(total_steps) - int(warmup_steps),
            alpha=float(final_lr_ratio),
        )
    def __call__(self, step):
        step = tf.cast(step, tf.float32)
        warm = self.base_lr * (step / tf.maximum(1.0, self.warmup_steps))
        cos  = self.cosine_decay(tf.maximum(step - self.warmup_steps, 0.0))  # clamp: no negative steps during warm-up
        return tf.where(step < self.warmup_steps, warm, cos)

lr_schedule = WarmUpCosine(base_lr=3e-4, warmup_steps=warmup_steps, total_steps=total_steps)
optimizer   = keras.optimizers.AdamW(learning_rate=lr_schedule, weight_decay=1e-4, clipnorm=1.0)

4) Workflow, callbacks, and fitting

BasicWorkflow + EarlyStopping, Checkpoint(best .keras), CSVLogger, TerminateOnNaN.

import bayesflow as bf, numpy as np
from bayesflow.adapters import Adapter

adapter = Adapter().rename("waveforms", "summary_variables").to_array()
workflow = bf.workflows.BasicWorkflow(
    adapter             = adapter,
    summary_network     = summary_net,
    invertible_network  = invertible_net,
    optimizer           = optimizer,
    inference_variables = ["inference_variables"],
    summary_variables   = ["summary_variables"],
)

early = keras.callbacks.EarlyStopping(monitor="val_loss", patience=8, restore_best_weights=True)
ckpt  = keras.callbacks.ModelCheckpoint(filepath=SAVE_PATH, monitor="val_loss", save_best_only=True, verbose=1)
csv   = keras.callbacks.CSVLogger(HIST_CSV, append=False)
nan   = keras.callbacks.TerminateOnNaN()

history = workflow.fit_offline(
    data            = {"waveforms": x_train, "inference_variables": theta_train},
    validation_data = {"waveforms": x_val,   "inference_variables": theta_val},
    epochs          = EPOCHS, batch_size = BATCH_SIZE,
    callbacks       = [early, ckpt, csv, nan], verbose = 1,
)
np.save(HIST_NPY, {k: np.array(v) for k, v in history.history.items()}, allow_pickle=True)

Results + How to Run & Skills

On held-out examples, posteriors contract around the ground truth as SNR rises. Calibration looks healthy: rank histograms are approximately flat, the ECDF stays near the diagonal, and z-score QQ plots/histograms are close to N(0,1). Loss curves are smooth.

Figures: rank-histogram calibration, ECDF calibration (difference plot with 95% bands), z-score histograms, z-score QQ plots, and training/validation loss (raw + moving averages).

A) Diagnostics pipeline

Restore model, apply preprocessing, sample posteriors, then render rank hist/ECDF and z-score QQ/hist plots.

# gw_bayesflow_diagnostics_down4_pf2.py — highlights
import numpy as np
from tensorflow import keras
from bayesflow.adapters import Adapter
from bayesflow.workflows import BasicWorkflow
from bayesflow.diagnostics import plots

val       = np.load(VAL_PATH)
theta_val = val["parameters"].astype(np.float32)
x_val     = val["waveforms"].astype(np.float32)

# channels-last & mean-pool ×2
if x_val.ndim == 3 and x_val.shape[1] == 1: x_val = np.transpose(x_val, (0, 2, 1))
elif x_val.ndim == 2: x_val = x_val[:, :, None]
def mean_pool_1d(arr, k):
    N, T, C = arr.shape; cut = (T // k) * k
    if cut != T: arr = arr[:, :cut, :]
    return arr.reshape(N, cut // k, k, C).mean(axis=2, dtype=np.float32)
x_val = mean_pool_1d(x_val, k=2)

# standardize with TRAIN scaler
sc = np.load(SCALER_NPZ)
mean, std = np.atleast_1d(sc["mean"])[None,None,:], np.atleast_1d(sc["std"])[None,None,:]
x_val = ((x_val - mean) / std).astype(np.float32)

# restore & sample
adapter  = Adapter().rename("waveforms", "summary_variables").to_array()
workflow = BasicWorkflow(adapter=adapter, optimizer=None)
workflow.approximator = keras.saving.load_model(MODEL_PATH, compile=False)
idx     = np.random.default_rng(42).choice(len(x_val), 300, replace=False)
samples = workflow.sample(num_samples=1_000, conditions={"waveforms": x_val[idx]})["inference_variables"]

# diagnostics
plots.calibration_histogram(samples, theta_val[idx])
figs = workflow.plot_default_diagnostics(
  test_data={"waveforms": x_val[idx], "inference_variables": theta_val[idx]},
  calibration_ecdf_kwargs={"difference": True, "figsize": (15,3)}
)
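
The Training & Validation section also mentions coverage and NRMSE; neither is produced by plot_default_diagnostics above, so here is a hedged sketch of how they could be computed from the same posterior samples (variable handling is my own, assuming `samples` has shape (n_events, num_samples, 6)):

# Sketch: empirical 95% central-interval coverage and normalized RMSE per parameter
truths = theta_val[idx]                                       # (n_events, 6), scaled θ
lo, hi = np.percentile(samples, [2.5, 97.5], axis=1)          # central 95% intervals
coverage = ((truths >= lo) & (truths <= hi)).mean(axis=0)     # target ≈ 0.95 per parameter

post_mean = samples.mean(axis=1)
rmse  = np.sqrt(((post_mean - truths) ** 2).mean(axis=0))
nrmse = rmse / (truths.max(axis=0) - truths.min(axis=0))      # normalized by parameter range
print("95% coverage:", np.round(coverage, 3))
print("NRMSE:       ", np.round(nrmse, 3))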

B) Loss trajectory (raw points + moving averages)

Read CSVLogger / .npy history and draw a clean loss trajectory with centered moving averages.

import numpy as np, pandas as pd
import matplotlib.pyplot as plt

def moving_average(x, window):
    x = np.asarray(x, dtype=float)
    if window < 2: return x
    w = int(window) | 1
    mask = ~np.isnan(x)
    num = np.convolve(np.where(mask, x, 0.0), np.ones(w), mode="same")
    den = np.convolve(mask.astype(float), np.ones(w), mode="same")
    return np.divide(num, np.maximum(den, 1e-12))

# load loss + val_loss; pick ~5% window
df = pd.read_csv(csv_path)  # fallback to npy if needed
loss, val_loss = df["loss"].to_numpy(), df.get("val_loss", pd.Series(np.nan)).to_numpy()
win = max(5, int(round(0.05 * len(loss))) | 1)

plt.figure(figsize=(16,4))
plt.plot(loss, "o", ms=3, lw=1, alpha=.35, label="Training")
if val_loss.size: plt.plot(val_loss, "o--", ms=3, lw=1, alpha=.25, label="Validation")
plt.plot(moving_average(loss, win), lw=2.5, label="Training (Moving Average)")
if val_loss.size: plt.plot(moving_average(val_loss, win), "--", lw=2.5, label="Validation (Moving Average)")
plt.title("Loss Trajectory"); plt.xlabel("Training epoch #"); plt.grid(axis="y", alpha=.25); plt.legend(); plt.tight_layout()

How to run

# 1) Build dataset shards → merge
python generate_shard.py
python merge_shards.py

# 2) Train BayesFlow model
python train_gw_bayesflow_down4_pf2_float32.py

# 3) Diagnostics & plots
python gw_bayesflow_diagnostics_down4_pf2.py
python plot_loss_trajectory.py

Skills shown: Simulation-Based Inference, normalizing flows (CouplingFlow), TensorFlow/Keras, PSD whitening & anti-alias decimation, uncertainty calibration (rank/ECDF, z-normality), reproducible ML ops.