#!/usr/bin/env python3
"""Tests for density evolution optimizer.

The modules under test (``density_evolution``, ``ldpc_sim``,
``ldpc_analysis``) are imported inside each test so that a failure to import
one of them only fails the tests that actually need it.
"""

import os
import sys

import numpy as np
import pytest

# Make the sibling modules importable no matter which directory the tests are
# launched from.  abspath() guards against __file__ being a bare relative
# name, in which case dirname() would be '' (i.e. "whatever cwd is").
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))


class TestDensityEvolution:
    """Tests for the Monte Carlo DE engine."""

    def test_de_known_good_converges(self):
        """DE with original staircase profile at lam_s=10 should converge easily."""
        from density_evolution import run_de, ORIGINAL_STAIRCASE_PROFILE

        np.random.seed(42)
        converged, error_frac = run_de(
            ORIGINAL_STAIRCASE_PROFILE, lam_s=10.0, lam_b=0.1,
            z_pop=10000, max_iter=50
        )
        assert converged, f"DE should converge at lam_s=10, error_frac={error_frac}"

    def test_de_known_bad_fails(self):
        """DE at very low lam_s=0.1 should not converge."""
        from density_evolution import run_de, ORIGINAL_STAIRCASE_PROFILE

        np.random.seed(42)
        converged, error_frac = run_de(
            ORIGINAL_STAIRCASE_PROFILE, lam_s=0.1, lam_b=0.1,
            z_pop=10000, max_iter=50
        )
        assert not converged, f"DE should NOT converge at lam_s=0.1, error_frac={error_frac}"

    def test_de_population_shape(self):
        """Verify belief arrays have correct shapes after one step."""
        from density_evolution import (
            ORIGINAL_STAIRCASE_PROFILE,
            de_channel_init,
            density_evolution_step,
        )

        np.random.seed(42)
        n_base = 8
        z_pop = 1000

        # Initialise from the original staircase H_base profile.
        beliefs, msg_memory = de_channel_init(
            ORIGINAL_STAIRCASE_PROFILE, z_pop, lam_s=5.0, lam_b=0.1
        )
        # beliefs should be (n_base, z_pop)
        assert beliefs.shape == (n_base, z_pop), f"Expected ({n_base}, {z_pop}), got {beliefs.shape}"

        # Run one step; the population shape must be preserved.
        beliefs = density_evolution_step(
            beliefs, msg_memory, ORIGINAL_STAIRCASE_PROFILE, z_pop
        )
        assert beliefs.shape == (n_base, z_pop), f"Shape changed after step: {beliefs.shape}"


class TestThresholdComputation:
    """Tests for threshold binary search."""

    def test_threshold_original_staircase(self):
        """Threshold for original staircase [7,2,2,2,2,2,2,1] should be ~3-6 photons."""
        from density_evolution import compute_threshold_for_profile

        np.random.seed(42)
        threshold = compute_threshold_for_profile(
            [7, 2, 2, 2, 2, 2, 2, 1], m_base=7, lam_b=0.1, z_pop=10000, tol=0.5
        )
        # The accepted window is wider than the nominal ~3-6 range quoted above.
        assert 2.0 < threshold < 8.0, f"Expected threshold ~3-6, got {threshold}"

    def test_threshold_peg_ring(self):
        """PEG ring [7,3,3,3,2,2,2,2] should have lower or equal threshold than original."""
        from density_evolution import compute_threshold_for_profile

        np.random.seed(42)
        thresh_orig = compute_threshold_for_profile(
            [7, 2, 2, 2, 2, 2, 2, 1], m_base=7, lam_b=0.1, z_pop=15000, tol=0.25
        )
        np.random.seed(123)
        thresh_peg = compute_threshold_for_profile(
            [7, 3, 3, 3, 2, 2, 2, 2], m_base=7, lam_b=0.1, z_pop=15000, tol=0.25
        )
        assert thresh_peg <= thresh_orig, (
            f"PEG threshold {thresh_peg} should be <= original {thresh_orig}"
        )

    def test_profile_to_hbase(self):
        """build_de_profile should produce valid profile with correct column degrees."""
        from density_evolution import build_de_profile

        profile = build_de_profile([7, 3, 2, 2, 2, 2, 2, 2], m_base=7)
        assert profile['n_base'] == 8
        assert profile['m_base'] == 7
        assert profile['vn_degrees'] == [7, 3, 2, 2, 2, 2, 2, 2]
        # Every row should have at least 2 connections
        for r, conns in enumerate(profile['connections']):
            assert len(conns) >= 2, f"Row {r} has only {len(conns)} connections"


class TestDegreeDistributionOptimizer:
    """Tests for the exhaustive search optimizer."""

    def test_enumerate_candidates(self):
        """Enumeration should produce 3^7 = 2187 candidates."""
        from density_evolution import enumerate_vn_candidates

        candidates = enumerate_vn_candidates(m_base=7)
        assert len(candidates) == 3**7, f"Expected 2187, got {len(candidates)}"
        # Each candidate should have 8 elements (info col + 7 parity)
        for c in candidates:
            assert len(c) == 8
            assert c[0] == 7  # info column always degree 7

    def test_filter_removes_invalid(self):
        """Filter should keep valid distributions and remove truly invalid ones."""
        from density_evolution import filter_by_row_degree

        # All-dv=2 parity: parity_edges=14, dc_avg=3 -> valid for [3,6]
        all_2 = [7, 2, 2, 2, 2, 2, 2, 2]
        assert filter_by_row_degree([all_2], m_base=7, dc_min=3, dc_max=6) == [all_2]

        # All-dv=4 parity: parity_edges=28, dc_avg=5 -> valid for [3,6]
        all_4 = [7, 4, 4, 4, 4, 4, 4, 4]
        assert filter_by_row_degree([all_4], m_base=7, dc_min=3, dc_max=6) == [all_4]

        # A hypothetical all-dv=1 parity: parity_edges=7, total=14, avg dc=2 < 3 -> invalid
        all_1 = [7, 1, 1, 1, 1, 1, 1, 1]
        assert filter_by_row_degree([all_1], m_base=7, dc_min=3, dc_max=6) == []

        # With tighter constraints (dc_min=4), all-dv=2 should be removed
        assert filter_by_row_degree([all_2], m_base=7, dc_min=4, dc_max=6) == []

    def test_optimizer_finds_better_than_original(self):
        """Optimizer should find a distribution with threshold <= original staircase."""
        from density_evolution import optimize_degree_distribution

        np.random.seed(42)
        results = optimize_degree_distribution(
            m_base=7, lam_b=0.1, top_k=5,
            z_pop_coarse=5000, z_pop_fine=10000, tol=0.5
        )
        assert len(results) > 0, "Optimizer should return at least one result"
        # Each result unpacks as (degrees, threshold); only the threshold is checked.
        _, best_threshold = results[0]
        # Original staircase threshold is ~3-5 photons
        assert best_threshold < 6.0, f"Best threshold {best_threshold} should be < 6.0"


class TestPEGBaseMatrixConstructor:
    """Tests for the PEG base matrix constructor."""

    def test_construct_matches_target_degrees(self):
        """Constructed matrix should have the target column degrees."""
        from density_evolution import construct_base_matrix

        np.random.seed(42)
        target = [7, 3, 3, 3, 2, 2, 2, 2]
        H_base, _ = construct_base_matrix(target, z=32, n_trials=500)
        # Check column degrees: an entry >= 0 is counted as a connection.
        for c in range(H_base.shape[1]):
            actual_deg = np.sum(H_base[:, c] >= 0)
            assert actual_deg == target[c], (
                f"Col {c}: expected degree {target[c]}, got {actual_deg}"
            )

    def test_construct_has_valid_rank(self):
        """Full H matrix should have full rank, parity submatrix should too."""
        from density_evolution import construct_base_matrix, verify_matrix

        np.random.seed(42)
        target = [7, 3, 3, 3, 2, 2, 2, 2]
        H_base, _ = construct_base_matrix(target, z=32, n_trials=500)
        checks = verify_matrix(H_base, z=32)
        assert checks['full_rank'], f"Full matrix rank {checks['actual_rank']} < expected {checks['expected_rank']}"
        assert checks['parity_rank'], "Parity submatrix not full rank"

    def test_construct_encodable(self):
        """Encoding a random info word should produce zero syndrome."""
        from density_evolution import construct_base_matrix, verify_matrix

        np.random.seed(42)
        target = [7, 3, 3, 3, 2, 2, 2, 2]
        H_base, _ = construct_base_matrix(target, z=32, n_trials=500)
        checks = verify_matrix(H_base, z=32)
        assert checks['encodable'], "Should be able to encode and verify syndrome=0"

    def test_construct_girth_at_least_4(self):
        """Constructed matrix should have girth >= 4."""
        from density_evolution import construct_base_matrix

        np.random.seed(42)
        target = [7, 3, 3, 3, 2, 2, 2, 2]
        _, girth = construct_base_matrix(target, z=32, n_trials=500)
        assert girth >= 4, f"Girth {girth} should be >= 4"


class TestFERValidationAndCLI:
    """Tests for FER validation and CLI."""

    def test_validate_returns_results(self):
        """validate_matrix should return FER results dict."""
        from density_evolution import validate_matrix
        from ldpc_sim import H_BASE

        np.random.seed(42)
        results = validate_matrix(H_BASE, lam_s_points=[10.0], n_frames=10, lam_b=0.1)
        assert 10.0 in results, f"Expected key 10.0, got {list(results.keys())}"
        assert 'fer' in results[10.0]
        assert 0.0 <= results[10.0]['fer'] <= 1.0

    def test_cli_threshold(self):
        """CLI threshold subcommand should exit 0."""
        import subprocess

        import density_evolution

        # Launch the very file the other tests import, using the interpreter
        # that is running this test.  This keeps the test independent of the
        # current working directory and of whichever "python3" is on PATH.
        script = os.path.abspath(density_evolution.__file__)
        result = subprocess.run(
            [sys.executable, script, 'threshold', '--z-pop', '5000', '--tol', '1.0'],
            capture_output=True, text=True, timeout=120,
        )
        assert result.returncode == 0, f"CLI failed: {result.stderr}"
        stdout = result.stdout.lower()
        assert 'threshold' in stdout or 'photon' in stdout


class TestNormalizedMinSum:
    """Tests for normalized min-sum CN update mode."""

    def test_normalized_minsun_output_smaller(self):
        """Normalized min-sum should scale magnitude by alpha, not subtract offset."""
        from ldpc_sim import min_sum_cn_update

        # Input: [10, -15, 20] -> min1=10 (idx0), min2=15
        #
        # Offset mode would give magnitudes max(0, mag - 1):
        #   idx0 gets min2=15 -> 14; idx1,2 get min1=10 -> 9
        # Normalized mode (alpha=0.75) gives magnitudes floor(mag * 0.75):
        #   idx0 gets min2=15 -> floor(11.25)=11; idx1,2 get min1=10 -> floor(7.5)=7
        #
        # Signs: input signs = [0, 1, 0], so sign_xor = 1 and each output's
        # extrinsic sign is sign_xor ^ own_sign:
        #   idx0: 1^0 = 1 -> -11
        #   idx1: 1^1 = 0 -> +7
        #   idx2: 1^0 = 1 -> -7
        result_norm = min_sum_cn_update([10, -15, 20], cn_mode='normalized', alpha=0.75)
        assert result_norm[0] == -11, f"Expected -11, got {result_norm[0]}"
        assert result_norm[1] == 7, f"Expected 7, got {result_norm[1]}"
        assert result_norm[2] == -7, f"Expected -7, got {result_norm[2]}"

    def test_normalized_decode_converges(self):
        """Decode a known codeword at lam_s=5 with normalized min-sum."""
        from ldpc_analysis import generic_decode, peg_encode, build_peg_matrix
        from ldpc_sim import poisson_channel, quantize_llr

        np.random.seed(42)
        H_base, H_full = build_peg_matrix(z=32)
        k = 32
        info = np.zeros(k, dtype=np.int8)
        codeword = peg_encode(info, H_base, H_full, z=32)
        llr_float, _ = poisson_channel(codeword, lam_s=5.0, lam_b=0.1)
        llr_q = quantize_llr(llr_float)
        decoded, converged, _iters, sw = generic_decode(
            llr_q, H_base, z=32, max_iter=30, cn_mode='normalized', alpha=0.75
        )
        assert converged, f"Normalized min-sum should converge at lam_s=5, sw={sw}"
        assert np.all(decoded == info), "Decoded bits don't match"

    def test_offset_mode_unchanged(self):
        """Default offset mode should produce identical results to before."""
        from ldpc_sim import min_sum_cn_update

        # Test with explicit cn_mode='offset' and without (default)
        msgs = [10, -15, 20, -5]
        result_default = min_sum_cn_update(msgs)
        result_explicit = min_sum_cn_update(msgs, cn_mode='offset')
        assert result_default == result_explicit, (
            f"Default and explicit offset should match: {result_default} vs {result_explicit}"
        )


class TestNormalizedMinSumDE:
    """Tests for normalized min-sum in the density evolution engine."""

    def test_de_normalized_converges(self):
        """DE at lam_s=10 with normalized mode should converge."""
        from density_evolution import run_de, ORIGINAL_STAIRCASE_PROFILE

        np.random.seed(42)
        converged, error_frac = run_de(
            ORIGINAL_STAIRCASE_PROFILE, lam_s=10.0, lam_b=0.1,
            z_pop=10000, max_iter=50, cn_mode='normalized', alpha=0.75
        )
        assert converged, f"DE normalized should converge at lam_s=10, error_frac={error_frac}"

    def test_de_normalized_threshold_different(self):
        """Normalized and offset modes should produce different thresholds."""
        from density_evolution import compute_threshold, ORIGINAL_STAIRCASE_PROFILE

        # Both runs start from the same seed so that any difference comes from
        # the CN update mode rather than from the random draw.
        np.random.seed(42)
        thresh_offset = compute_threshold(
            ORIGINAL_STAIRCASE_PROFILE, lam_b=0.1, z_pop=10000, tol=0.5,
            cn_mode='offset'
        )
        np.random.seed(42)
        thresh_norm = compute_threshold(
            ORIGINAL_STAIRCASE_PROFILE, lam_b=0.1, z_pop=10000, tol=0.5,
            cn_mode='normalized', alpha=0.75
        )
        assert thresh_norm != thresh_offset, (
            f"Thresholds should differ: offset={thresh_offset}, normalized={thresh_norm}"
        )


class TestZ128Support:
    """Tests for Z=128 matrix construction and optimization."""

    def test_construct_z128_valid(self):
        """Construct matrix for [7,4,4,4,4,3,3,3] with z=128.

        Verify full rank, girth >= 6, encodable.
        """
        from density_evolution import construct_base_matrix, verify_matrix

        np.random.seed(42)
        target = [7, 4, 4, 4, 4, 3, 3, 3]
        H_base, girth = construct_base_matrix(target, z=128, n_trials=500)
        checks = verify_matrix(H_base, z=128)
        assert checks['full_rank'], f"Full rank failed: {checks['actual_rank']}/{checks['expected_rank']}"
        assert girth >= 6, f"Girth {girth} should be >= 6"
        assert checks['encodable'], "Should be encodable"

    def test_validate_z128_runs(self):
        """Run validate_matrix with z=128, n_frames=5 at lam_s=5.

        Verify returns dict with FER.
        """
        from density_evolution import construct_base_matrix, validate_matrix

        np.random.seed(42)
        target = [7, 4, 4, 4, 4, 3, 3, 3]
        H_base, _ = construct_base_matrix(target, z=128, n_trials=200)
        results = validate_matrix(
            H_base, lam_s_points=[5.0], n_frames=5, lam_b=0.1, z=128
        )
        assert 5.0 in results, f"Expected key 5.0, got {list(results.keys())}"
        assert 'fer' in results[5.0]


class TestAlphaOptimization:
    """Tests for alpha sweep optimization."""

    def test_optimize_alpha_returns_best(self):
        """Alpha sweep for optimized degrees should find threshold < 4.0."""
        from density_evolution import optimize_alpha

        np.random.seed(42)
        best_alpha, best_threshold = optimize_alpha(
            [7, 4, 4, 4, 4, 3, 3, 3], m_base=7, lam_b=0.1,
            z_pop=10000, tol=0.5
        )
        assert best_alpha is not None, "Should find a best alpha"
        assert 0.5 <= best_alpha <= 1.0, f"Alpha {best_alpha} out of range"
        # Normalized should be competitive with offset threshold (3.05)
        assert best_threshold < 4.0, f"Best threshold {best_threshold} too high"

    def test_alpha_sweep_range(self):
        """All alpha values in range should produce valid thresholds."""
        from density_evolution import build_de_profile, compute_threshold

        np.random.seed(42)
        profile = build_de_profile([7, 4, 4, 4, 4, 3, 3, 3], m_base=7)
        alpha_range = [0.5, 0.75, 1.0]
        for alpha in alpha_range:
            t = compute_threshold(
                profile, lam_b=0.1, z_pop=5000, tol=1.0,
                cn_mode='normalized', alpha=alpha
            )
            assert 0.1 < t < 20.0, f"alpha={alpha}: threshold {t} out of valid range"