Add cn_mode ('offset'/'normalized') and alpha parameters to
min_sum_cn_update() in ldpc_sim.py and generic_decode() in
ldpc_analysis.py. Normalized mode scales magnitudes by alpha
(default 0.75) instead of subtracting a fixed offset, which
is better suited for low-rate codes.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
252 lines
12 KiB
Python
#!/usr/bin/env python3
|
|
"""Tests for density evolution optimizer."""
|
|
|
|
import numpy as np
|
|
import pytest
|
|
import sys
|
|
import os
|
|
|
|
sys.path.insert(0, os.path.dirname(__file__))
|
|
|
|
|
|
class TestDensityEvolution:
    """Tests for the Monte Carlo DE engine."""

    def test_de_known_good_converges(self):
        """DE with original staircase profile at lam_s=10 should converge easily."""
        from density_evolution import run_de, ORIGINAL_STAIRCASE_PROFILE
        np.random.seed(42)
        converged, error_frac = run_de(
            ORIGINAL_STAIRCASE_PROFILE, lam_s=10.0, lam_b=0.1,
            z_pop=10000, max_iter=50
        )
        assert converged, f"DE should converge at lam_s=10, error_frac={error_frac}"

    def test_de_known_bad_fails(self):
        """DE at very low lam_s=0.1 should not converge."""
        from density_evolution import run_de, ORIGINAL_STAIRCASE_PROFILE
        np.random.seed(42)
        converged, error_frac = run_de(
            ORIGINAL_STAIRCASE_PROFILE, lam_s=0.1, lam_b=0.1,
            z_pop=10000, max_iter=50
        )
        assert not converged, f"DE should NOT converge at lam_s=0.1, error_frac={error_frac}"

    def test_de_population_shape(self):
        """Verify belief arrays have correct shapes after one step."""
        # Single import for everything this test needs (the profile was
        # previously imported in a redundant second statement).
        from density_evolution import (
            de_channel_init,
            density_evolution_step,
            ORIGINAL_STAIRCASE_PROFILE,
        )
        np.random.seed(42)
        n_base = 8   # columns in the original staircase base matrix
        z_pop = 1000
        beliefs, msg_memory = de_channel_init(
            ORIGINAL_STAIRCASE_PROFILE, z_pop, lam_s=5.0, lam_b=0.1
        )
        # beliefs should be (n_base, z_pop)
        assert beliefs.shape == (n_base, z_pop), f"Expected ({n_base}, {z_pop}), got {beliefs.shape}"
        # Run one step; the population shape must be preserved.
        beliefs = density_evolution_step(beliefs, msg_memory, ORIGINAL_STAIRCASE_PROFILE, z_pop)
        assert beliefs.shape == (n_base, z_pop), f"Shape changed after step: {beliefs.shape}"
|
|
|
|
|
|
class TestThresholdComputation:
    """Tests for threshold binary search."""

    def test_threshold_original_staircase(self):
        """Threshold for original staircase [7,2,2,2,2,2,2,1] should be ~3-6 photons."""
        from density_evolution import compute_threshold_for_profile
        np.random.seed(42)
        staircase = [7, 2, 2, 2, 2, 2, 2, 1]
        threshold = compute_threshold_for_profile(
            staircase, m_base=7, lam_b=0.1, z_pop=10000, tol=0.5
        )
        # Accept a generous (2, 8) window around the expected ~3-6 photons.
        assert 2.0 < threshold < 8.0, f"Expected threshold ~3-6, got {threshold}"

    def test_threshold_peg_ring(self):
        """PEG ring [7,3,3,3,2,2,2,2] should have lower or equal threshold than original."""
        from density_evolution import compute_threshold_for_profile

        def seeded_threshold(profile, seed):
            # Re-seed per profile so each search is reproducible on its own.
            np.random.seed(seed)
            return compute_threshold_for_profile(
                profile, m_base=7, lam_b=0.1, z_pop=15000, tol=0.25
            )

        thresh_orig = seeded_threshold([7, 2, 2, 2, 2, 2, 2, 1], 42)
        thresh_peg = seeded_threshold([7, 3, 3, 3, 2, 2, 2, 2], 123)
        assert thresh_peg <= thresh_orig, (
            f"PEG threshold {thresh_peg} should be <= original {thresh_orig}"
        )

    def test_profile_to_hbase(self):
        """build_de_profile should produce valid profile with correct column degrees."""
        from density_evolution import build_de_profile
        degrees = [7, 3, 2, 2, 2, 2, 2, 2]
        profile = build_de_profile(degrees, m_base=7)
        assert profile['n_base'] == 8
        assert profile['m_base'] == 7
        assert profile['vn_degrees'] == [7, 3, 2, 2, 2, 2, 2, 2]
        # Every row should have at least 2 connections
        for r, conns in enumerate(profile['connections']):
            assert len(conns) >= 2, f"Row {r} has only {len(conns)} connections"
|
|
|
|
|
|
class TestDegreeDistributionOptimizer:
    """Tests for the exhaustive search optimizer."""

    def test_enumerate_candidates(self):
        """Enumeration should produce 3^7 = 2187 candidates."""
        from density_evolution import enumerate_vn_candidates
        candidates = enumerate_vn_candidates(m_base=7)
        assert len(candidates) == 3**7, f"Expected 2187, got {len(candidates)}"
        # Each candidate should have 8 elements (info col + 7 parity)
        for cand in candidates:
            assert len(cand) == 8
            assert cand[0] == 7  # info column always degree 7

    def test_filter_removes_invalid(self):
        """Filter should keep valid distributions and remove truly invalid ones."""
        from density_evolution import filter_by_row_degree
        all_2 = [7, 2, 2, 2, 2, 2, 2, 2]
        all_4 = [7, 4, 4, 4, 4, 4, 4, 4]
        all_1 = [7, 1, 1, 1, 1, 1, 1, 1]
        # All-dv=2 parity: parity_edges=14, dc_avg=3 -> valid for [3,6]
        assert filter_by_row_degree([all_2], m_base=7, dc_min=3, dc_max=6) == [all_2]
        # All-dv=4 parity: parity_edges=28, dc_avg=5 -> valid for [3,6]
        assert filter_by_row_degree([all_4], m_base=7, dc_min=3, dc_max=6) == [all_4]
        # A hypothetical all-dv=1 parity: parity_edges=7, total=14, avg dc=2 < 3 -> invalid
        assert filter_by_row_degree([all_1], m_base=7, dc_min=3, dc_max=6) == []
        # With tighter constraints (dc_min=4), all-dv=2 should be removed
        assert filter_by_row_degree([all_2], m_base=7, dc_min=4, dc_max=6) == []

    def test_optimizer_finds_better_than_original(self):
        """Optimizer should find a distribution with threshold <= original staircase."""
        # NOTE: compute_threshold_for_profile was imported here but never
        # used -- removed.
        from density_evolution import optimize_degree_distribution
        np.random.seed(42)
        results = optimize_degree_distribution(
            m_base=7, lam_b=0.1, top_k=5,
            z_pop_coarse=5000, z_pop_fine=10000, tol=0.5,
        )
        assert len(results) > 0, "Optimizer should return at least one result"
        # results[0] is the (degrees, threshold) pair ranked best.
        _best_degrees, best_threshold = results[0]
        # Original staircase threshold is ~3-5 photons
        assert best_threshold < 6.0, f"Best threshold {best_threshold} should be < 6.0"
|
|
|
|
|
|
class TestPEGBaseMatrixConstructor:
    """Tests for the PEG base matrix constructor."""

    # Target column-degree profile shared by every test in this class.
    TARGET_DEGREES = [7, 3, 3, 3, 2, 2, 2, 2]

    def _build(self):
        """Construct a base matrix from TARGET_DEGREES under a fixed seed.

        Re-seeding inside the helper keeps every test deterministic and
        independent of execution order (each test previously repeated this
        seed + construct call verbatim).  Returns (H_base, girth) exactly
        as construct_base_matrix does.
        """
        from density_evolution import construct_base_matrix
        np.random.seed(42)
        return construct_base_matrix(self.TARGET_DEGREES, z=32, n_trials=500)

    def test_construct_matches_target_degrees(self):
        """Constructed matrix should have the target column degrees."""
        H_base, _girth = self._build()
        target = self.TARGET_DEGREES
        # Column degree = count of non-empty cells (entries >= 0).
        for c in range(H_base.shape[1]):
            actual_deg = np.sum(H_base[:, c] >= 0)
            assert actual_deg == target[c], (
                f"Col {c}: expected degree {target[c]}, got {actual_deg}"
            )

    def test_construct_has_valid_rank(self):
        """Full H matrix should have full rank, parity submatrix should too."""
        from density_evolution import verify_matrix
        H_base, _girth = self._build()
        checks = verify_matrix(H_base, z=32)
        assert checks['full_rank'], f"Full matrix rank {checks['actual_rank']} < expected {checks['expected_rank']}"
        assert checks['parity_rank'], "Parity submatrix not full rank"

    def test_construct_encodable(self):
        """Encoding a random info word should produce zero syndrome."""
        from density_evolution import verify_matrix
        H_base, _girth = self._build()
        checks = verify_matrix(H_base, z=32)
        assert checks['encodable'], "Should be able to encode and verify syndrome=0"

    def test_construct_girth_at_least_4(self):
        """Constructed matrix should have girth >= 4."""
        _H_base, girth = self._build()
        assert girth >= 4, f"Girth {girth} should be >= 4"
|
|
|
|
|
|
class TestFERValidationAndCLI:
    """Tests for FER validation and CLI."""

    def test_validate_returns_results(self):
        """validate_matrix should return FER results dict keyed by lam_s."""
        from density_evolution import validate_matrix
        from ldpc_sim import H_BASE
        np.random.seed(42)
        results = validate_matrix(H_BASE, lam_s_points=[10.0], n_frames=10, lam_b=0.1)
        assert 10.0 in results, f"Expected key 10.0, got {list(results.keys())}"
        assert 'fer' in results[10.0]
        # FER is a frame-error fraction, so it must lie in [0, 1].
        assert 0.0 <= results[10.0]['fer'] <= 1.0

    def test_cli_threshold(self):
        """CLI threshold subcommand should exit 0."""
        import subprocess
        # Use sys.executable instead of a bare 'python3' so the subprocess
        # runs under the same interpreter/venv as pytest (and works on
        # platforms where 'python3' is not on PATH).
        # NOTE(review): the 'model/...' path assumes the test runs from the
        # repository root -- confirm against the CI working directory.
        result = subprocess.run(
            [sys.executable, 'model/density_evolution.py', 'threshold',
             '--z-pop', '5000', '--tol', '1.0'],
            capture_output=True, text=True, timeout=120,
        )
        assert result.returncode == 0, f"CLI failed: {result.stderr}"
        assert 'threshold' in result.stdout.lower() or 'photon' in result.stdout.lower()
|
|
|
|
|
|
class TestNormalizedMinSum:
    """Tests for normalized min-sum CN update mode."""

    def test_normalized_minsum_output_smaller(self):
        """Normalized min-sum should scale magnitude by alpha, not subtract offset."""
        # (Method renamed from "minsun" -- typo fix; pytest discovers either.)
        from ldpc_sim import min_sum_cn_update
        # Input: [10, -15, 20] -> min1=10 (at idx 0), min2=15.
        # Offset mode would give magnitudes max(0, mag - 1):
        #   idx0 takes min2=15 -> 14; idx1, idx2 take min1=10 -> 9.
        # Normalized mode (alpha=0.75) gives floor(mag * 0.75):
        #   idx0: floor(15 * 0.75) = 11; idx1, idx2: floor(10 * 0.75) = 7.
        result_norm = min_sum_cn_update([10, -15, 20], cn_mode='normalized', alpha=0.75)
        # Input sign bits are [0, 1, 0] (1 = negative), total XOR = 1.
        # Each extrinsic output excludes its own input's sign:
        #   idx0: 1 ^ 0 = 1 (negative) -> -11
        #   idx1: 1 ^ 1 = 0 (positive) ->  +7
        #   idx2: 1 ^ 0 = 1 (negative) ->  -7
        assert result_norm[0] == -11, f"Expected -11, got {result_norm[0]}"
        assert result_norm[1] == 7, f"Expected 7, got {result_norm[1]}"
        assert result_norm[2] == -7, f"Expected -7, got {result_norm[2]}"

    def test_normalized_decode_converges(self):
        """Decode a known codeword at lam_s=5 with normalized min-sum."""
        from ldpc_analysis import generic_decode, peg_encode, build_peg_matrix
        from ldpc_sim import poisson_channel, quantize_llr
        np.random.seed(42)
        H_base, H_full = build_peg_matrix(z=32)
        k = 32
        # All-zero info word: always a valid codeword of a linear code.
        info = np.zeros(k, dtype=np.int8)
        codeword = peg_encode(info, H_base, H_full, z=32)
        llr_float, _ = poisson_channel(codeword, lam_s=5.0, lam_b=0.1)
        llr_q = quantize_llr(llr_float)
        decoded, converged, _iters, sw = generic_decode(
            llr_q, H_base, z=32, max_iter=30, cn_mode='normalized', alpha=0.75
        )
        assert converged, f"Normalized min-sum should converge at lam_s=5, sw={sw}"
        assert np.all(decoded == info), "Decoded bits don't match"

    def test_offset_mode_unchanged(self):
        """Default offset mode should produce identical results to before."""
        from ldpc_sim import min_sum_cn_update
        # Calling with no cn_mode must be exactly equivalent to
        # cn_mode='offset' so pre-existing callers see unchanged behavior.
        msgs = [10, -15, 20, -5]
        result_default = min_sum_cn_update(msgs)
        result_explicit = min_sum_cn_update(msgs, cn_mode='offset')
        assert result_default == result_explicit, (
            f"Default and explicit offset should match: {result_default} vs {result_explicit}"
        )
|