Files
ldpc_optical/model/density_evolution.py
hash 6bffc6cb5f feat: add Z=128 support for matrix construction and validation
Make validate_matrix() and run_full_pipeline() accept z parameter
instead of using hardcoded Z=32. Thread cn_mode/alpha to validation.
Add --z/--cn-mode/--alpha CLI options to full pipeline.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-24 16:49:55 -07:00

1031 lines
36 KiB
Python

#!/usr/bin/env python3
"""
Density Evolution Optimizer for QC-LDPC Codes
Monte Carlo density evolution to find optimal base matrix degree distributions
that lower the decoding threshold for photon-starved optical communication.
Usage:
python3 density_evolution.py threshold # Compute threshold for known matrices
python3 density_evolution.py optimize # Run optimizer, print top-10
python3 density_evolution.py construct DEGREES # Build matrix for given degrees
python3 density_evolution.py validate # FER comparison: optimized vs original
python3 density_evolution.py full # Full pipeline
"""
import numpy as np
import argparse
import json
import os
import sys
sys.path.insert(0, os.path.dirname(__file__))
from ldpc_sim import (
Q_BITS, Q_MAX, Q_MIN, OFFSET, Z, N_BASE, M_BASE, H_BASE,
build_full_h_matrix,
)
# =============================================================================
# H_base profile representation for density evolution
# =============================================================================
# A "profile" is a dict with:
# 'n_base': int - number of columns
# 'm_base': int - number of rows
# 'connections': list of lists - connections[row] = list of connected column indices
# 'vn_degrees': list of int - degree of each column
#
# For DE, we don't need circulant shifts - only the connectivity pattern matters.
def make_profile(H_base):
    """Convert a QC base matrix to a density-evolution profile.

    Args:
        H_base: 2-D array of circulant shifts; entries >= 0 mark an edge,
            negative entries mark an absent circulant.

    Returns:
        dict with keys:
            'n_base': number of base-matrix columns
            'm_base': number of base-matrix rows
            'connections': connections[row] = list of connected column indices
            'vn_degrees': per-column edge counts

    For DE only the connectivity pattern matters, so the shift values
    themselves are discarded.
    """
    mask = np.asarray(H_base) >= 0
    m_base, n_base = mask.shape
    # Vectorized edge extraction instead of per-entry Python loops.
    connections = [np.flatnonzero(mask[r]).tolist() for r in range(m_base)]
    vn_degrees = mask.sum(axis=0).astype(int).tolist()
    return {
        'n_base': n_base,
        'm_base': m_base,
        'connections': connections,
        'vn_degrees': vn_degrees,
    }
# DE profile of the baseline staircase base matrix (H_BASE from ldpc_sim);
# used as the reference design for threshold comparisons in this module.
ORIGINAL_STAIRCASE_PROFILE = make_profile(H_BASE)
# =============================================================================
# Monte Carlo Density Evolution Engine
# =============================================================================
def de_channel_init(profile, z_pop, lam_s, lam_b):
    """
    Generate the initial LLR population for each VN column group.

    DE assumes the all-zeros codeword, so every slot's photon count is
    Poisson(lam_b); the float LLR is then quantized to the decoder's
    6-bit signed grid [Q_MIN, Q_MAX].

    Args:
        profile: DE profile dict (see make_profile)
        z_pop: population size per column group
        lam_s: signal photons per slot
        lam_b: background photons per slot

    Returns:
        beliefs: ndarray (n_base, z_pop) - LLR beliefs per column group
        msg_memory: dict mapping (row, col) -> ndarray(z_pop) of CN->VN
            messages, initialized to zero
    """
    n_base = profile['n_base']
    m_base = profile['m_base']
    # LLR for bit=0: lam_s - y * log((lam_s + lam_b) / lam_b).
    # Guard lam_b == 0 with a large constant to avoid log/divide-by-zero.
    log_ratio = np.log((lam_s + lam_b) / lam_b) if lam_b > 0 else 100.0
    # Draw all channel observations in one call: (n_base, z_pop) counts.
    y = np.random.poisson(lam_b, size=(n_base, z_pop))
    llr_float = lam_s - y * log_ratio
    # Quantize to 6-bit signed: scale chosen so |LLR| = 5 hits saturation.
    scale = Q_MAX / 5.0
    llr_q = np.clip(np.round(llr_float * scale).astype(np.int32), Q_MIN, Q_MAX)
    beliefs = llr_q.astype(np.float64)
    # CN->VN message memory starts at zero for every base-matrix edge.
    msg_memory = {}
    for r in range(m_base):
        for c in profile['connections'][r]:
            msg_memory[(r, c)] = np.zeros(z_pop, dtype=np.float64)
    return beliefs, msg_memory
def de_cn_update_vectorized(vn_msgs_list, offset=OFFSET, cn_mode='offset', alpha=0.75):
    """
    Vectorized min-sum check-node update over a population of messages.

    Args:
        vn_msgs_list: list of (z_pop,) arrays, one per connected VN
        offset: subtraction constant for offset min-sum
        cn_mode: 'offset' or 'normalized'
        alpha: scaling factor used when cn_mode == 'normalized'

    Returns:
        List of (z_pop,) arrays holding extrinsic CN->VN messages,
        quantized to [Q_MIN, Q_MAX].
    """
    dc = len(vn_msgs_list)
    if dc < 2:
        # Degenerate check node: no extrinsic information to produce.
        return [np.zeros_like(vn_msgs_list[0])] if dc == 1 else []
    stacked = np.array(vn_msgs_list, dtype=np.float64)   # (dc, z_pop)
    neg = (stacked < 0).astype(np.int8)                  # sign bits
    magnitude = np.abs(stacked)
    # Overall sign parity across all incoming messages.
    parity = np.sum(neg, axis=0) % 2                     # (z_pop,)
    # Two smallest magnitudes per population sample.
    ordered = np.sort(magnitude, axis=0)
    min1, min2 = ordered[0], ordered[1]
    # Exactly one holder of the minimum per column (first occurrence via
    # the cumsum trick); that input receives min2 as extrinsic magnitude.
    is_min = magnitude == min1[np.newaxis, :]
    holder = is_min & (np.cumsum(is_min, axis=0) == 1)
    outputs = []
    for j in range(dc):
        mag = np.where(holder[j], min2, min1)
        if cn_mode == 'normalized':
            mag = np.floor(mag * alpha).astype(mag.dtype)
        else:
            mag = np.maximum(0, mag - offset)
        # Extrinsic sign excludes this input's own sign bit.
        sign_bit = parity ^ neg[j]
        signed = np.where(sign_bit, -mag, mag)
        outputs.append(np.clip(np.round(signed), Q_MIN, Q_MAX))
    return outputs
def density_evolution_step(beliefs, msg_memory, profile, z_pop,
                           cn_mode='offset', alpha=0.75):
    """
    One full DE iteration: process all rows (layers) of the base matrix.
    For DE, we randomly permute within each column group to simulate
    the random interleaving effect (circulant shifts are random for DE).
    Args:
        beliefs: ndarray (n_base, z_pop) - current VN beliefs
        msg_memory: dict (row, col) -> ndarray(z_pop) of old CN->VN messages
        profile: DE profile dict
        z_pop: population size
        cn_mode: 'offset' or 'normalized'
        alpha: scaling factor for normalized mode
    Returns:
        beliefs: updated beliefs (modified in-place and returned)
    """
    m_base = profile['m_base']
    # Layered schedule: rows are processed sequentially, so later rows see
    # beliefs already refreshed by earlier rows within the same iteration.
    for row in range(m_base):
        connected_cols = profile['connections'][row]
        dc = len(connected_cols)
        if dc < 2:
            # A degree-<2 check node produces no extrinsic information.
            continue
        # Step 1: Compute VN->CN messages (subtract old CN->VN)
        # Randomly permute within each column group for DE
        vn_to_cn = []
        permuted_indices = []
        for ci, col in enumerate(connected_cols):
            perm = np.random.permutation(z_pop)
            permuted_indices.append(perm)
            old_msg = msg_memory[(row, col)]
            # VN->CN = belief[col][permuted] - old_CN->VN
            vn_msg = beliefs[col][perm] - old_msg
            # Saturate to the quantizer range, matching the fixed-point decoder.
            vn_msg = np.clip(np.round(vn_msg), Q_MIN, Q_MAX)
            vn_to_cn.append(vn_msg)
        # Step 2: CN update (vectorized min-sum)
        cn_to_vn = de_cn_update_vectorized(vn_to_cn, offset=OFFSET,
                                           cn_mode=cn_mode, alpha=alpha)
        # Step 3: Update beliefs and store new messages
        for ci, col in enumerate(connected_cols):
            perm = permuted_indices[ci]
            new_msg = cn_to_vn[ci]
            extrinsic = vn_to_cn[ci]
            # New belief = extrinsic + new CN->VN
            new_belief = extrinsic + new_msg
            new_belief = np.clip(np.round(new_belief), Q_MIN, Q_MAX)
            # Write back to beliefs at permuted positions (undoes the
            # permutation applied when the messages were read).
            beliefs[col][perm] = new_belief
            # Store new CN->VN message for subtraction in the next layer pass.
            msg_memory[(row, col)] = new_msg
    return beliefs
def run_de(profile, lam_s, lam_b, z_pop=100000, max_iter=100,
           cn_mode='offset', alpha=0.75):
    """
    Run full density evolution simulation.

    Args:
        profile: DE profile dict
        lam_s: signal photons per slot
        lam_b: background photons per slot
        z_pop: population size
        max_iter: maximum iterations
        cn_mode: 'offset' or 'normalized'
        alpha: scaling factor for normalized mode

    Returns:
        (converged: bool, avg_error_frac: float)
        Convergence means fraction of wrong-sign beliefs < 1e-4.
    """
    beliefs, msg_memory = de_channel_init(profile, z_pop, lam_s, lam_b)
    # Defined up front so the function is well-behaved even for max_iter <= 0
    # (the original left error_frac unbound in that case).
    error_frac = 1.0
    for it in range(max_iter):
        beliefs = density_evolution_step(beliefs, msg_memory, profile, z_pop,
                                         cn_mode=cn_mode, alpha=alpha)
        # All-zeros codeword: a correct belief is non-negative, so the error
        # fraction is simply the share of negative entries (vectorized over
        # the whole (n_base, z_pop) population).
        error_frac = np.count_nonzero(beliefs < 0) / beliefs.size
        if error_frac < 1e-4:
            return True, error_frac
    return False, error_frac
# =============================================================================
# Threshold Computation via Binary Search
# =============================================================================
def build_de_profile(vn_degrees, m_base=7):
    """
    Build a DE profile from a VN degree list.

    Column 0 is the info column and connects to every row. Columns
    1..n_base-1 form a staircase parity backbone (diagonal plus
    sub-diagonal); any remaining degree is filled with extra edges placed
    preferentially far below the diagonal so the parity part stays
    lower-triangular.
    """
    n_base = len(vn_degrees)
    assert n_base == m_base + 1, f"Expected {m_base+1} columns, got {n_base}"
    assert vn_degrees[0] == m_base, f"Info column must have degree {m_base}"
    # Backbone: info column on every row plus the diagonal; rows below the
    # first also pick up the sub-diagonal.
    rows = [{0, r + 1} for r in range(m_base)]
    for r in range(1, m_base):
        rows[r].add(r)
    # Tally the degrees produced by the backbone alone.
    degree_now = [0] * n_base
    for cols in rows:
        for c in cols:
            degree_now[c] += 1
    # Top up each parity column that still needs edges.
    for c in range(1, n_base):
        deficit = vn_degrees[c] - degree_now[c]
        if deficit <= 0:
            continue
        open_rows = [r for r in range(m_base) if c not in rows[r]]
        # Prefer rows far below this column's diagonal (row c-1), wrapping
        # modulo m_base, to preserve the lower-triangular parity structure.
        open_rows.sort(key=lambda r: -(r - (c - 1)) % m_base)
        for r in open_rows[:deficit]:
            rows[r].add(c)
            degree_now[c] += 1
    connections_list = [sorted(cols) for cols in rows]
    # Recompute degrees from the final connection lists as a sanity check.
    final_degrees = [0] * n_base
    for cols in connections_list:
        for c in cols:
            final_degrees[c] += 1
    return {
        'n_base': n_base,
        'm_base': m_base,
        'connections': connections_list,
        'vn_degrees': final_degrees,
    }
def compute_threshold(profile, lam_b=0.1, z_pop=50000, tol=0.1,
                      cn_mode='offset', alpha=0.75):
    """
    Binary-search the minimum lam_s at which DE converges.

    Brackets the threshold in [0.1, 20.0], sanity-checks both endpoints,
    then bisects until the bracket width drops below tol.

    Returns:
        Estimated threshold lam_s* (the converging end of the bracket).
    """
    low, high = 0.1, 20.0

    def converges(rate):
        ok, _ = run_de(profile, lam_s=rate, lam_b=lam_b, z_pop=z_pop,
                       max_iter=100, cn_mode=cn_mode, alpha=alpha)
        return ok

    if not converges(high):
        return high  # never converges within the bracket
    if converges(low):
        return low  # converges everywhere within the bracket
    while high - low > tol:
        mid = 0.5 * (low + high)
        if converges(mid):
            high = mid
        else:
            low = mid
    return high
def compute_threshold_for_profile(vn_degrees, m_base=7, lam_b=0.1, z_pop=50000, tol=0.1,
                                  cn_mode='offset', alpha=0.75):
    """Compute the DE threshold directly from a VN degree list."""
    return compute_threshold(build_de_profile(vn_degrees, m_base=m_base),
                             lam_b=lam_b, z_pop=z_pop, tol=tol,
                             cn_mode=cn_mode, alpha=alpha)
# =============================================================================
# Degree Distribution Optimizer
# =============================================================================
def enumerate_vn_candidates(m_base=7):
    """
    Enumerate all VN degree distributions for the parity columns.

    Column 0 is always the info column with degree m_base; each of the
    m_base parity columns independently takes a degree in {2, 3, 4}.

    Returns:
        List of degree vectors, each of length m_base + 1.
    """
    from itertools import product
    return [[m_base, *parity] for parity in product([2, 3, 4], repeat=m_base)]
def filter_by_row_degree(candidates, m_base=7, dc_min=3, dc_max=6):
    """
    Filter candidates by row-degree feasibility.

    The info column contributes exactly one edge to every row, so each row
    needs between dc_min - 1 and dc_max - 1 parity edges. A candidate is
    feasible iff its total parity edges fit the aggregate range:
        m_base * (dc_min - 1) <= sum(parity degrees) <= m_base * (dc_max - 1)

    Args:
        candidates: list of degree vectors (info degree first)
        m_base: number of base-matrix rows
        dc_min, dc_max: allowed row-degree range (inclusive)

    Returns:
        Sub-list of candidates whose parity edge total is distributable.
    """
    # Bounds hoisted out of the loop; the original also carried an unused
    # local (n_base) which is dropped here.
    min_parity = m_base * (dc_min - 1)
    max_parity = m_base * (dc_max - 1)
    return [degrees for degrees in candidates
            if min_parity <= sum(degrees[1:]) <= max_parity]
def coarse_screen(candidates, lam_s_test, lam_b, z_pop, max_iter, m_base=7,
                  cn_mode='offset', alpha=0.75):
    """
    Quick convergence test: run DE once per candidate at a single test
    point and keep the candidates that converge.

    Args:
        candidates: list of VN degree vectors
        lam_s_test: signal rate (photons/slot) at which to test convergence
        lam_b: background photon rate
        z_pop: DE population size
        max_iter: DE iteration cap
        m_base: number of base-matrix rows
        cn_mode: 'offset' or 'normalized' (new, threaded to the CN update
            for consistency with compute_threshold; default preserves the
            previous behavior)
        alpha: scaling factor for normalized mode

    Returns:
        List of surviving degree vectors.
    """
    survivors = []
    for degrees in candidates:
        profile = build_de_profile(degrees, m_base=m_base)
        converged, _ = run_de(
            profile, lam_s=lam_s_test, lam_b=lam_b,
            z_pop=z_pop, max_iter=max_iter,
            cn_mode=cn_mode, alpha=alpha
        )
        if converged:
            survivors.append(degrees)
    return survivors
def get_unique_distributions(candidates):
    """
    Collapse candidates that share the same parity degree multiset.

    DE depends only on the degree distribution, not on which column
    carries which degree, so each unique sorted parity sequence is kept
    once. The representative is the info degree followed by the parity
    degrees in descending order.
    """
    unique = []
    seen = set()
    for degrees in candidates:
        canonical = tuple(sorted(degrees[1:], reverse=True))
        if canonical in seen:
            continue
        seen.add(canonical)
        unique.append([degrees[0], *canonical])
    return unique
def optimize_degree_distribution(m_base=7, lam_b=0.1, top_k=10,
                                 z_pop_coarse=10000, z_pop_fine=50000,
                                 tol=0.1):
    """
    Full optimization pipeline: enumerate, filter, coarse screen, fine threshold.

    Since only the degree distribution (not the column ordering) affects DE,
    the enumerated candidates are collapsed to unique distributions before
    any simulation is run.

    Returns:
        List of (vn_degrees, threshold) pairs, sorted by ascending
        threshold and truncated to top_k.
    """
    print("Step 1: Enumerating candidates...")
    candidates = enumerate_vn_candidates(m_base=m_base)
    print(f" {len(candidates)} total candidates")
    print("Step 2: Filtering by row degree constraints...")
    filtered = filter_by_row_degree(candidates, m_base=m_base, dc_min=3, dc_max=6)
    print(f" {len(filtered)} candidates after filtering")
    print("Step 3: Grouping by unique degree distribution...")
    unique = get_unique_distributions(filtered)
    print(f" {len(unique)} unique distributions")
    print("Step 4: Coarse screening at lam_s=2.0...")
    survivors = coarse_screen(
        unique, lam_s_test=2.0, lam_b=lam_b,
        z_pop=z_pop_coarse, max_iter=50, m_base=m_base
    )
    print(f" {len(survivors)} survivors after coarse screen")
    if not survivors:
        # Retry at a friendlier operating point before giving up entirely.
        print(" No survivors at lam_s=2.0, trying lam_s=3.0...")
        survivors = coarse_screen(
            unique, lam_s_test=3.0, lam_b=lam_b,
            z_pop=z_pop_coarse, max_iter=50, m_base=m_base
        )
        print(f" {len(survivors)} survivors at lam_s=3.0")
        if not survivors:
            print(" No survivors found, returning empty list")
            return []
    print(f"Step 5: Fine threshold computation for {len(survivors)} survivors...")
    results = []
    for idx, degrees in enumerate(survivors, start=1):
        profile = build_de_profile(degrees, m_base=m_base)
        results.append((degrees,
                        compute_threshold(profile, lam_b=lam_b,
                                          z_pop=z_pop_fine, tol=tol)))
        if idx % 5 == 0:
            print(f" {idx}/{len(survivors)} done...")
    # Lower threshold is better: sort ascending.
    results.sort(key=lambda pair: pair[1])
    print(f"\nTop-{min(top_k, len(results))} degree distributions:")
    for rank, (degrees, threshold) in enumerate(results[:top_k], start=1):
        print(f" #{rank}: {degrees} -> threshold = {threshold:.2f} photons/slot")
    return results[:top_k]
# =============================================================================
# Alpha Optimization for Normalized Min-Sum
# =============================================================================
def optimize_alpha(vn_degrees, alpha_range=None, m_base=7, lam_b=0.1,
                   z_pop=20000, tol=0.25):
    """
    Sweep normalized-min-sum alpha values and report the best threshold.

    Args:
        vn_degrees: VN degree list
        alpha_range: alpha values to try; defaults to hardware-friendly
            binary fractions
        m_base: number of base matrix rows
        lam_b: background photon rate
        z_pop: DE population size
        tol: threshold tolerance

    Returns:
        (best_alpha, best_threshold)
    """
    if alpha_range is None:
        alpha_range = [0.5, 0.625, 0.6875, 0.75, 0.8125, 0.875, 1.0]
    profile = build_de_profile(vn_degrees, m_base=m_base)
    best = (None, float('inf'))
    for a in alpha_range:
        t = compute_threshold(profile, lam_b=lam_b, z_pop=z_pop, tol=tol,
                              cn_mode='normalized', alpha=a)
        print(f" alpha={a:.4f}: threshold = {t:.2f} photons/slot")
        if t < best[1]:
            best = (a, t)
    return best
# =============================================================================
# PEG Base Matrix Constructor
# =============================================================================
def _build_full_h_from_base(H_base, z=32):
"""Expand a QC base matrix to a full binary parity-check matrix."""
m_base_local = H_base.shape[0]
n_base_local = H_base.shape[1]
m_full = m_base_local * z
n_full = n_base_local * z
H_full = np.zeros((m_full, n_full), dtype=np.int8)
for r in range(m_base_local):
for c in range(n_base_local):
shift = int(H_base[r, c])
if shift < 0:
continue
for zz in range(z):
col_idx = c * z + (zz + shift) % z
H_full[r * z + zz, col_idx] = 1
return H_full
def _generate_random_pattern(vn_degrees, m_base):
"""
Generate a random connection pattern matching target VN degrees.
Keeps the staircase backbone fixed and randomly places extra connections
among available positions.
"""
n_base = len(vn_degrees)
pattern = -np.ones((m_base, n_base), dtype=np.int16)
# Col 0 (info) connects to all rows
for r in range(m_base):
pattern[r, 0] = 0
# Staircase backbone
for r in range(m_base):
pattern[r, r + 1] = 0
for r in range(1, m_base):
pattern[r, r] = 0
# Current degrees from backbone
current_degrees = [0] * n_base
for r in range(m_base):
for c in range(n_base):
if pattern[r, c] >= 0:
current_degrees[c] += 1
# Randomly place extra connections
for c in range(1, n_base):
needed = vn_degrees[c] - current_degrees[c]
if needed <= 0:
continue
available = [r for r in range(m_base) if pattern[r, c] < 0]
if len(available) < needed:
continue
chosen = np.random.choice(available, size=needed, replace=False)
for r in chosen:
pattern[r, c] = 0
return pattern
def construct_base_matrix(vn_degrees, z=32, n_trials=1000):
    """
    Construct a concrete base matrix with circulant shifts optimized
    for maximum girth and guaranteed full rank.
    Algorithm:
    1. Try multiple random connection placements (extras beyond staircase)
    2. For each placement, try random shift assignments
    3. Keep the best (full-rank, highest girth) result
    Args:
        vn_degrees: target VN degree list (info degree first)
        z: lifting factor
        n_trials: overall search budget, split between placement patterns
            and shift draws per pattern
    Returns (H_base, girth).
    """
    from ldpc_analysis import compute_girth
    m_base = len(vn_degrees) - 1
    n_base = len(vn_degrees)
    assert vn_degrees[0] == m_base
    # Full rank means all m_base*z expanded parity rows are independent.
    expected_rank = m_base * z
    best_H = None
    best_girth = 0
    best_has_rank = False  # whether best_H is already full-rank
    # Divide trials between placement variations and shift optimization
    n_patterns = max(10, n_trials // 20)
    n_shifts_per_pattern = max(20, n_trials // n_patterns)
    for _p in range(n_patterns):
        pattern = _generate_random_pattern(vn_degrees, m_base)
        # Positions holding a circulant (>= 0) get fresh random shifts below.
        positions = [(r, c) for r in range(m_base) for c in range(n_base)
                     if pattern[r, c] >= 0]
        for _s in range(n_shifts_per_pattern):
            H_candidate = pattern.copy()
            for r, c in positions:
                H_candidate[r, c] = np.random.randint(0, z)
            # Check rank
            # NOTE(review): rank is computed over the reals, which
            # upper-bounds the GF(2) rank — confirm this is intended.
            H_full = _build_full_h_from_base(H_candidate, z)
            rank = np.linalg.matrix_rank(H_full.astype(float))
            has_full_rank = rank >= expected_rank
            # Selection: any full-rank candidate dominates all rank-deficient
            # ones; among candidates of equal rank status, higher girth wins.
            if has_full_rank and not best_has_rank:
                girth = compute_girth(H_candidate, z)
                best_girth = girth
                best_H = H_candidate.copy()
                best_has_rank = True
            elif has_full_rank and best_has_rank:
                girth = compute_girth(H_candidate, z)
                if girth > best_girth:
                    best_girth = girth
                    best_H = H_candidate.copy()
            elif not best_has_rank:
                girth = compute_girth(H_candidate, z)
                if best_H is None or girth > best_girth:
                    best_girth = girth
                    best_H = H_candidate.copy()
        # Early exit once a full-rank matrix with girth >= 8 is in hand.
        if best_has_rank and best_girth >= 8:
            return best_H, best_girth
    if best_H is None:
        # Fallback: return pattern with zero shifts
        best_H = _generate_random_pattern(vn_degrees, m_base)
        best_girth = compute_girth(best_H, z)
    return best_H, best_girth
def verify_matrix(H_base, z=32):
    """
    Comprehensive validity checks for a base matrix.

    Checks column degrees, rank of the expanded matrix, full-rank status
    of the parity submatrix, girth, and whether a random info word
    encodes to a codeword with zero syndrome.

    Args:
        H_base: QC base matrix of circulant shifts
        z: lifting factor (info length k equals z)

    Returns dict with check results. Note: the 'parity_rank' key holds
    the boolean full-rank flag for the parity submatrix, not the numeric
    rank (key name kept for backward compatibility with consumers).
    """
    from ldpc_analysis import compute_girth, peg_encode
    m_base_local = H_base.shape[0]
    n_base_local = H_base.shape[1]
    k = z  # info length: one circulant column of systematic bits
    # Build full matrix
    H_full = _build_full_h_from_base(H_base, z)
    # Column degrees
    col_degrees = []
    for c in range(n_base_local):
        col_degrees.append(int(np.sum(H_base[:, c] >= 0)))
    # Full rank check
    # NOTE(review): rank computed over the reals, not GF(2); the real rank
    # upper-bounds the binary rank — confirm this is intended.
    expected_rank = m_base_local * z
    actual_rank = np.linalg.matrix_rank(H_full.astype(float))
    full_rank = actual_rank >= expected_rank
    # Parity submatrix rank (columns k onwards)
    H_parity = H_full[:, k:]
    parity_rank = np.linalg.matrix_rank(H_parity.astype(float))
    parity_full_rank = parity_rank >= min(H_parity.shape)
    # Girth
    girth = compute_girth(H_base, z)
    # Encoding test: encode a random info word, then verify zero syndrome.
    encodable = False
    try:
        info = np.random.randint(0, 2, k).astype(np.int8)
        codeword = peg_encode(info, H_base, H_full, z=z)
        syndrome = H_full @ codeword % 2
        encodable = np.all(syndrome == 0)
    except Exception:
        # Any encoder failure simply marks the matrix as non-encodable.
        encodable = False
    return {
        'col_degrees': col_degrees,
        'full_rank': full_rank,
        'actual_rank': actual_rank,
        'expected_rank': expected_rank,
        'parity_rank': parity_full_rank,
        'girth': girth,
        'encodable': encodable,
    }
# =============================================================================
# FER Validation
# =============================================================================
def validate_matrix(H_base, lam_s_points, n_frames=200, lam_b=0.1, max_iter=30,
                    z=32, cn_mode='offset', alpha=0.75):
    """
    Run a full Monte Carlo FER simulation for a base matrix.

    Args:
        H_base: QC base matrix of circulant shifts
        lam_s_points: signal rates (photons/slot) to simulate
        n_frames: frames per rate point
        lam_b: background photon rate
        max_iter: decoder iteration cap
        z: lifting factor (also the info length k = z)
        cn_mode: 'offset' or 'normalized' min-sum
        alpha: scaling factor for normalized mode

    Returns:
        dict mapping lam_s -> {'fer', 'ber', 'avg_iter'}.
        Frames that fail to encode count as frame errors with all k bits
        charged, and contribute 0 iterations to avg_iter.
    """
    from ldpc_analysis import generic_decode, peg_encode
    # Hoisted out of the per-frame loop, where the original re-imported
    # these names on every frame.
    from ldpc_sim import poisson_channel, quantize_llr
    k = z
    H_full = _build_full_h_from_base(H_base, z)
    results = {}
    for lam_s in lam_s_points:
        bit_errors = 0
        frame_errors = 0
        total_bits = 0
        total_iter = 0
        for _ in range(n_frames):
            info = np.random.randint(0, 2, k).astype(np.int8)
            try:
                codeword = peg_encode(info, H_base, H_full, z=z)
            except Exception:
                # Encoding failure is charged as a full frame error.
                frame_errors += 1
                total_bits += k
                continue
            llr_float, _ = poisson_channel(codeword, lam_s, lam_b)
            llr_q = quantize_llr(llr_float)
            decoded, converged, iters, sw = generic_decode(
                llr_q, H_base, z=z, max_iter=max_iter, q_bits=Q_BITS,
                cn_mode=cn_mode, alpha=alpha
            )
            total_iter += iters
            errs = np.sum(decoded != info)
            bit_errors += errs
            total_bits += k
            if errs > 0:
                frame_errors += 1
        ber = bit_errors / total_bits if total_bits > 0 else 0
        fer = frame_errors / n_frames
        avg_iter = total_iter / n_frames
        results[lam_s] = {
            'fer': float(fer),
            'ber': float(ber),
            'avg_iter': float(avg_iter),
        }
    return results
def run_full_pipeline(lam_b=0.1, seed=42, z=32, cn_mode='offset', alpha=0.75):
    """
    Full pipeline: optimize -> construct -> validate.

    Steps:
        1. Compute DE thresholds for the reference matrices.
        2. Optimize the VN degree distribution via DE.
        3. Construct a concrete base matrix for the best distribution.
        4. Compare FER against the references and save everything to
           ../data/de_results.json.

    Args:
        lam_b: background photon rate
        seed: RNG seed for reproducibility
        z: lifting factor used for construction and FER validation
        cn_mode: 'offset' or 'normalized' min-sum for validation decoding
        alpha: scaling factor for normalized mode

    Returns:
        dict of all results (also written to disk), or None if the
        optimizer found no valid distribution.
    """
    np.random.seed(seed)
    print("=" * 70)
    print("DENSITY EVOLUTION FULL PIPELINE")
    print("=" * 70)
    # Step 1: Compute threshold for reference matrices
    print("\n--- Step 1: Reference thresholds ---")
    from ldpc_analysis import build_peg_matrix
    # NOTE(review): the reference PEG matrix is built with z=32 regardless
    # of the z argument — confirm this is intended.
    ref_matrices = {
        'Original staircase [7,2,2,2,2,2,2,1]': H_BASE,
        'PEG ring [7,3,3,3,2,2,2,2]': build_peg_matrix(z=32)[0],
    }
    ref_thresholds = {}
    for name, H_b in ref_matrices.items():
        profile = make_profile(H_b)
        threshold = compute_threshold(profile, lam_b=lam_b, z_pop=20000, tol=0.25)
        ref_thresholds[name] = threshold
        print(f" {name}: threshold = {threshold:.2f} photons/slot")
    # Step 2: Run optimizer
    print("\n--- Step 2: Degree distribution optimization ---")
    opt_results = optimize_degree_distribution(
        m_base=7, lam_b=lam_b, top_k=5,
        z_pop_coarse=10000, z_pop_fine=20000, tol=0.25
    )
    if not opt_results:
        print("ERROR: No valid distributions found")
        return None
    # Results are sorted ascending, so index 0 is the lowest threshold.
    best_degrees, best_threshold = opt_results[0]
    print(f"\nBest distribution: {best_degrees}")
    print(f"Best threshold: {best_threshold:.2f} photons/slot")
    # Step 3: Construct matrix
    print("\n--- Step 3: Matrix construction ---")
    H_opt, girth = construct_base_matrix(best_degrees, z=z, n_trials=2000)
    checks = verify_matrix(H_opt, z=z)
    print(f" Constructed matrix: girth={girth}, rank={checks['actual_rank']}/{checks['expected_rank']}")
    print(f" Encodable: {checks['encodable']}, Full rank: {checks['full_rank']}")
    print(f" Base matrix:\n{H_opt}")
    # Step 4: FER validation
    print("\n--- Step 4: FER validation ---")
    lam_s_points = [2.0, 3.0, 4.0, 5.0, 7.0, 10.0]
    print(f"\n{'lam_s':>8s} {'Optimized':>12s} {'Original':>12s} {'PEG ring':>12s}")
    print("-" * 50)
    fer_opt = validate_matrix(H_opt, lam_s_points, n_frames=200, lam_b=lam_b,
                              z=z, cn_mode=cn_mode, alpha=alpha)
    fer_orig = validate_matrix(H_BASE, lam_s_points, n_frames=200, lam_b=lam_b,
                               z=z, cn_mode=cn_mode, alpha=alpha)
    fer_peg = validate_matrix(ref_matrices['PEG ring [7,3,3,3,2,2,2,2]'],
                              lam_s_points, n_frames=200, lam_b=lam_b,
                              z=z, cn_mode=cn_mode, alpha=alpha)
    for lam_s in lam_s_points:
        f_opt = fer_opt[lam_s]['fer']
        f_orig = fer_orig[lam_s]['fer']
        f_peg = fer_peg[lam_s]['fer']
        print(f"{lam_s:8.1f} {f_opt:12.3f} {f_orig:12.3f} {f_peg:12.3f}")
    # Save results
    output = {
        'reference_thresholds': ref_thresholds,
        'optimizer_results': [(d, t) for d, t in opt_results],
        'best_degrees': best_degrees,
        'best_threshold': float(best_threshold),
        'constructed_matrix': H_opt.tolist(),
        # Convert numpy scalars to native Python types for JSON output.
        'matrix_checks': {k: (v if not isinstance(v, (np.integer, np.floating, np.bool_)) else v.item())
                          for k, v in checks.items()},
        'fer_comparison': {
            'lam_s_points': lam_s_points,
            'optimized': {str(k): v for k, v in fer_opt.items()},
            'original': {str(k): v for k, v in fer_orig.items()},
            'peg_ring': {str(k): v for k, v in fer_peg.items()},
        },
    }
    # Write results next to the package under ../data/de_results.json.
    out_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', 'data')
    os.makedirs(out_dir, exist_ok=True)
    out_path = os.path.join(out_dir, 'de_results.json')
    with open(out_path, 'w') as f:
        json.dump(output, f, indent=2, default=str)
    print(f"\nResults saved to {out_path}")
    return output
# =============================================================================
# CLI
# =============================================================================
def main():
    """CLI entry point: parse arguments and dispatch to the subcommand.

    Adds a --z option to the construct and validate subcommands (the
    underlying functions already accept z; previously the CLI hardcoded
    z=32 for them, matching the new default).
    """
    parser = argparse.ArgumentParser(
        description='Density Evolution Optimizer for QC-LDPC Codes',
    )
    subparsers = parser.add_subparsers(dest='command')
    # threshold
    p_thresh = subparsers.add_parser('threshold', help='Compute threshold for known matrices')
    p_thresh.add_argument('--z-pop', type=int, default=20000)
    p_thresh.add_argument('--tol', type=float, default=0.25)
    p_thresh.add_argument('--seed', type=int, default=42)
    # optimize
    p_opt = subparsers.add_parser('optimize', help='Run optimizer, print top-10')
    p_opt.add_argument('--top-k', type=int, default=10)
    p_opt.add_argument('--seed', type=int, default=42)
    # construct
    p_con = subparsers.add_parser('construct', help='Build matrix for given degrees')
    p_con.add_argument('degrees', type=str, help='Comma-separated degrees, e.g. 7,3,3,3,2,2,2,2')
    p_con.add_argument('--n-trials', type=int, default=2000)
    p_con.add_argument('--z', type=int, default=32, help='Lifting factor')
    p_con.add_argument('--seed', type=int, default=42)
    # validate
    p_val = subparsers.add_parser('validate', help='FER comparison')
    p_val.add_argument('--n-frames', type=int, default=200)
    p_val.add_argument('--z', type=int, default=32, help='Lifting factor')
    p_val.add_argument('--seed', type=int, default=42)
    # alpha-sweep
    p_alpha = subparsers.add_parser('alpha-sweep', help='Optimize alpha for normalized min-sum')
    p_alpha.add_argument('--degrees', type=str, default='7,4,4,4,4,3,3,3',
                         help='Comma-separated VN degrees')
    p_alpha.add_argument('--z-pop', type=int, default=20000)
    p_alpha.add_argument('--tol', type=float, default=0.25)
    p_alpha.add_argument('--seed', type=int, default=42)
    # full
    p_full = subparsers.add_parser('full', help='Full pipeline')
    p_full.add_argument('--seed', type=int, default=42)
    p_full.add_argument('--z', type=int, default=32, help='Lifting factor')
    p_full.add_argument('--cn-mode', type=str, default='offset',
                        choices=['offset', 'normalized'])
    p_full.add_argument('--alpha', type=float, default=0.75)
    args = parser.parse_args()
    if args.command == 'alpha-sweep':
        np.random.seed(args.seed)
        degrees = [int(x) for x in args.degrees.split(',')]
        print(f"Alpha Optimization for degrees {degrees}")
        print("-" * 50)
        best_alpha, best_threshold = optimize_alpha(
            degrees, m_base=len(degrees) - 1, lam_b=0.1,
            z_pop=args.z_pop, tol=args.tol
        )
        print(f"\nBest alpha: {best_alpha:.4f}")
        print(f"Best threshold: {best_threshold:.2f} photons/slot")
        # Compare with offset mode
        profile = build_de_profile(degrees, m_base=len(degrees) - 1)
        thresh_offset = compute_threshold(profile, lam_b=0.1, z_pop=args.z_pop,
                                          tol=args.tol, cn_mode='offset')
        print(f"Offset min-sum threshold: {thresh_offset:.2f} photons/slot")
        if best_threshold < thresh_offset:
            # Gain expressed as the dB ratio of required signal power.
            gain_db = 10 * np.log10(thresh_offset / best_threshold)
            print(f"Normalized gain: {gain_db:.2f} dB")
    elif args.command == 'threshold':
        np.random.seed(args.seed)
        from ldpc_analysis import build_peg_matrix
        matrices = {
            'Original staircase [7,2,2,2,2,2,2,1]': H_BASE,
            'PEG ring [7,3,3,3,2,2,2,2]': build_peg_matrix(z=32)[0],
        }
        print("DE Threshold Computation")
        print("-" * 50)
        for name, H_b in matrices.items():
            profile = make_profile(H_b)
            threshold = compute_threshold(profile, lam_b=0.1, z_pop=args.z_pop, tol=args.tol)
            print(f" {name}: {threshold:.2f} photons/slot")
    elif args.command == 'optimize':
        np.random.seed(args.seed)
        optimize_degree_distribution(m_base=7, lam_b=0.1, top_k=args.top_k)
    elif args.command == 'construct':
        np.random.seed(args.seed)
        degrees = [int(x) for x in args.degrees.split(',')]
        H_base, girth = construct_base_matrix(degrees, z=args.z, n_trials=args.n_trials)
        checks = verify_matrix(H_base, z=args.z)
        print(f"Constructed matrix for degrees {degrees}:")
        print(f" Girth: {girth}")
        print(f" Rank: {checks['actual_rank']}/{checks['expected_rank']}")
        print(f" Encodable: {checks['encodable']}")
        print(f"Base matrix:")
        print(H_base)
    elif args.command == 'validate':
        np.random.seed(args.seed)
        lam_s_points = [2.0, 3.0, 4.0, 5.0, 7.0, 10.0]
        print("FER Validation: Original staircase")
        results = validate_matrix(H_BASE, lam_s_points, n_frames=args.n_frames,
                                  z=args.z)
        for lam_s in lam_s_points:
            r = results[lam_s]
            print(f" lam_s={lam_s:.1f}: FER={r['fer']:.3f}, BER={r['ber']:.6f}")
    elif args.command == 'full':
        run_full_pipeline(seed=args.seed, z=args.z, cn_mode=args.cn_mode, alpha=args.alpha)
    else:
        parser.print_help()
# Script entry point: run the CLI only when executed directly.
if __name__ == '__main__':
    main()