feat: add FER validation and CLI for density evolution

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
cah
2026-02-24 06:03:19 -07:00
parent f30f972dab
commit ca651f4f30
2 changed files with 258 additions and 3 deletions

View File

@@ -702,9 +702,240 @@ def verify_matrix(H_base, z=32):
# =============================================================================
# CLI placeholder (will be extended in later tasks)
# FER Validation
# =============================================================================
def validate_matrix(H_base, lam_s_points, n_frames=200, lam_b=0.1, max_iter=30):
"""
Run full Monte Carlo FER simulation for a base matrix.
Returns dict mapping lam_s -> {fer, ber, avg_iter}.
"""
from ldpc_analysis import generic_decode, peg_encode
z = Z
k = z
H_full = _build_full_h_from_base(H_base, z)
results = {}
for lam_s in lam_s_points:
bit_errors = 0
frame_errors = 0
total_bits = 0
total_iter = 0
for _ in range(n_frames):
info = np.random.randint(0, 2, k).astype(np.int8)
try:
codeword = peg_encode(info, H_base, H_full, z=z)
except Exception:
frame_errors += 1
total_bits += k
continue
from ldpc_sim import poisson_channel, quantize_llr
llr_float, _ = poisson_channel(codeword, lam_s, lam_b)
llr_q = quantize_llr(llr_float)
decoded, converged, iters, sw = generic_decode(
llr_q, H_base, z=z, max_iter=max_iter, q_bits=Q_BITS
)
total_iter += iters
errs = np.sum(decoded != info)
bit_errors += errs
total_bits += k
if errs > 0:
frame_errors += 1
ber = bit_errors / total_bits if total_bits > 0 else 0
fer = frame_errors / n_frames
avg_iter = total_iter / n_frames
results[lam_s] = {
'fer': float(fer),
'ber': float(ber),
'avg_iter': float(avg_iter),
}
return results
def run_full_pipeline(lam_b=0.1, seed=42):
"""
Full pipeline: optimize -> construct -> validate.
"""
np.random.seed(seed)
print("=" * 70)
print("DENSITY EVOLUTION FULL PIPELINE")
print("=" * 70)
# Step 1: Compute threshold for reference matrices
print("\n--- Step 1: Reference thresholds ---")
from ldpc_analysis import build_peg_matrix
ref_matrices = {
'Original staircase [7,2,2,2,2,2,2,1]': H_BASE,
'PEG ring [7,3,3,3,2,2,2,2]': build_peg_matrix(z=32)[0],
}
ref_thresholds = {}
for name, H_b in ref_matrices.items():
profile = make_profile(H_b)
threshold = compute_threshold(profile, lam_b=lam_b, z_pop=20000, tol=0.25)
ref_thresholds[name] = threshold
print(f" {name}: threshold = {threshold:.2f} photons/slot")
# Step 2: Run optimizer
print("\n--- Step 2: Degree distribution optimization ---")
opt_results = optimize_degree_distribution(
m_base=7, lam_b=lam_b, top_k=5,
z_pop_coarse=10000, z_pop_fine=20000, tol=0.25
)
if not opt_results:
print("ERROR: No valid distributions found")
return None
best_degrees, best_threshold = opt_results[0]
print(f"\nBest distribution: {best_degrees}")
print(f"Best threshold: {best_threshold:.2f} photons/slot")
# Step 3: Construct matrix
print("\n--- Step 3: Matrix construction ---")
H_opt, girth = construct_base_matrix(best_degrees, z=32, n_trials=2000)
checks = verify_matrix(H_opt, z=32)
print(f" Constructed matrix: girth={girth}, rank={checks['actual_rank']}/{checks['expected_rank']}")
print(f" Encodable: {checks['encodable']}, Full rank: {checks['full_rank']}")
print(f" Base matrix:\n{H_opt}")
# Step 4: FER validation
print("\n--- Step 4: FER validation ---")
lam_s_points = [2.0, 3.0, 4.0, 5.0, 7.0, 10.0]
print(f"\n{'lam_s':>8s} {'Optimized':>12s} {'Original':>12s} {'PEG ring':>12s}")
print("-" * 50)
fer_opt = validate_matrix(H_opt, lam_s_points, n_frames=200, lam_b=lam_b)
fer_orig = validate_matrix(H_BASE, lam_s_points, n_frames=200, lam_b=lam_b)
fer_peg = validate_matrix(ref_matrices['PEG ring [7,3,3,3,2,2,2,2]'],
lam_s_points, n_frames=200, lam_b=lam_b)
for lam_s in lam_s_points:
f_opt = fer_opt[lam_s]['fer']
f_orig = fer_orig[lam_s]['fer']
f_peg = fer_peg[lam_s]['fer']
print(f"{lam_s:8.1f} {f_opt:12.3f} {f_orig:12.3f} {f_peg:12.3f}")
# Save results
output = {
'reference_thresholds': ref_thresholds,
'optimizer_results': [(d, t) for d, t in opt_results],
'best_degrees': best_degrees,
'best_threshold': float(best_threshold),
'constructed_matrix': H_opt.tolist(),
'matrix_checks': {k: (v if not isinstance(v, (np.integer, np.floating, np.bool_)) else v.item())
for k, v in checks.items()},
'fer_comparison': {
'lam_s_points': lam_s_points,
'optimized': {str(k): v for k, v in fer_opt.items()},
'original': {str(k): v for k, v in fer_orig.items()},
'peg_ring': {str(k): v for k, v in fer_peg.items()},
},
}
out_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', 'data')
os.makedirs(out_dir, exist_ok=True)
out_path = os.path.join(out_dir, 'de_results.json')
with open(out_path, 'w') as f:
json.dump(output, f, indent=2, default=str)
print(f"\nResults saved to {out_path}")
return output
# =============================================================================
# CLI
# =============================================================================
def main():
parser = argparse.ArgumentParser(
description='Density Evolution Optimizer for QC-LDPC Codes',
)
subparsers = parser.add_subparsers(dest='command')
# threshold
p_thresh = subparsers.add_parser('threshold', help='Compute threshold for known matrices')
p_thresh.add_argument('--z-pop', type=int, default=20000)
p_thresh.add_argument('--tol', type=float, default=0.25)
p_thresh.add_argument('--seed', type=int, default=42)
# optimize
p_opt = subparsers.add_parser('optimize', help='Run optimizer, print top-10')
p_opt.add_argument('--top-k', type=int, default=10)
p_opt.add_argument('--seed', type=int, default=42)
# construct
p_con = subparsers.add_parser('construct', help='Build matrix for given degrees')
p_con.add_argument('degrees', type=str, help='Comma-separated degrees, e.g. 7,3,3,3,2,2,2,2')
p_con.add_argument('--n-trials', type=int, default=2000)
p_con.add_argument('--seed', type=int, default=42)
# validate
p_val = subparsers.add_parser('validate', help='FER comparison')
p_val.add_argument('--n-frames', type=int, default=200)
p_val.add_argument('--seed', type=int, default=42)
# full
p_full = subparsers.add_parser('full', help='Full pipeline')
p_full.add_argument('--seed', type=int, default=42)
args = parser.parse_args()
if args.command == 'threshold':
np.random.seed(args.seed)
from ldpc_analysis import build_peg_matrix
matrices = {
'Original staircase [7,2,2,2,2,2,2,1]': H_BASE,
'PEG ring [7,3,3,3,2,2,2,2]': build_peg_matrix(z=32)[0],
}
print("DE Threshold Computation")
print("-" * 50)
for name, H_b in matrices.items():
profile = make_profile(H_b)
threshold = compute_threshold(profile, lam_b=0.1, z_pop=args.z_pop, tol=args.tol)
print(f" {name}: {threshold:.2f} photons/slot")
elif args.command == 'optimize':
np.random.seed(args.seed)
optimize_degree_distribution(m_base=7, lam_b=0.1, top_k=args.top_k)
elif args.command == 'construct':
np.random.seed(args.seed)
degrees = [int(x) for x in args.degrees.split(',')]
H_base, girth = construct_base_matrix(degrees, z=32, n_trials=args.n_trials)
checks = verify_matrix(H_base, z=32)
print(f"Constructed matrix for degrees {degrees}:")
print(f" Girth: {girth}")
print(f" Rank: {checks['actual_rank']}/{checks['expected_rank']}")
print(f" Encodable: {checks['encodable']}")
print(f"Base matrix:")
print(H_base)
elif args.command == 'validate':
np.random.seed(args.seed)
lam_s_points = [2.0, 3.0, 4.0, 5.0, 7.0, 10.0]
print("FER Validation: Original staircase")
results = validate_matrix(H_BASE, lam_s_points, n_frames=args.n_frames)
for lam_s in lam_s_points:
r = results[lam_s]
print(f" lam_s={lam_s:.1f}: FER={r['fer']:.3f}, BER={r['ber']:.6f}")
elif args.command == 'full':
run_full_pipeline(seed=args.seed)
else:
parser.print_help()
if __name__ == '__main__':
print("Density Evolution Optimizer")
print("Run with subcommands: threshold, optimize, construct, validate, full")
main()

View File

@@ -175,3 +175,27 @@ class TestPEGBaseMatrixConstructor:
target = [7, 3, 3, 3, 2, 2, 2, 2]
H_base, girth = construct_base_matrix(target, z=32, n_trials=500)
assert girth >= 4, f"Girth {girth} should be >= 4"
class TestFERValidationAndCLI:
"""Tests for FER validation and CLI."""
def test_validate_returns_results(self):
"""validate_matrix should return FER results dict."""
from density_evolution import validate_matrix
from ldpc_sim import H_BASE
np.random.seed(42)
results = validate_matrix(H_BASE, lam_s_points=[10.0], n_frames=10, lam_b=0.1)
assert 10.0 in results, f"Expected key 10.0, got {list(results.keys())}"
assert 'fer' in results[10.0]
assert 0.0 <= results[10.0]['fer'] <= 1.0
def test_cli_threshold(self):
"""CLI threshold subcommand should exit 0."""
import subprocess
result = subprocess.run(
['python3', 'model/density_evolution.py', 'threshold', '--z-pop', '5000', '--tol', '1.0'],
capture_output=True, text=True, timeout=120,
)
assert result.returncode == 0, f"CLI failed: {result.stderr}"
assert 'threshold' in result.stdout.lower() or 'photon' in result.stdout.lower()