Files
ldpc_optical/model/ldpc_sim.py
cah b7b76da46e Fix encoder and decoder - working LDPC simulation
- Fixed cyclic shift convention (QC-LDPC P_s is left shift, not right)
- Fixed encoder to solve rows sequentially (row 0 first for p1, then 1-6)
- Fixed decoder to only gather connected columns per CN (staircase has dc=2-3)
- Fixed LLR sign convention: positive = bit 0 more likely
- Decoder validates at lam_s >= 4 photons/slot (~90% frame success)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-23 21:56:15 -07:00

494 lines
17 KiB
Python

#!/usr/bin/env python3
"""
LDPC Decoder Behavioral Model - Bit-Exact Reference for RTL Verification
Implements offset min-sum decoding with layered scheduling for a
rate-1/8 QC-LDPC code (n=256, k=32, Z=32, base matrix 7x8).
Channel model: Poisson photon-counting (optical communication)
Usage:
python3 ldpc_sim.py # Run BER simulation
python3 ldpc_sim.py --gen-vectors # Generate RTL test vectors
python3 ldpc_sim.py --sweep-snr # SNR sweep for BER curve
"""
import numpy as np
import argparse
import json
import os
# =============================================================================
# Code parameters
# =============================================================================
N_BASE = 8 # base matrix columns
M_BASE = 7 # base matrix rows
Z = 32 # lifting factor
N = N_BASE * Z # 256 codeword bits
K = Z # 32 info bits (rate 1/8)
M = M_BASE * Z # 224 parity checks
Q_BITS = 6 # quantization bits (signed)
Q_MAX = 2**(Q_BITS-1) - 1 # +31
Q_MIN = -(2**(Q_BITS-1)) # -32
OFFSET = 1 # min-sum offset (integer)
# Base matrix: H_BASE[row][col] = cyclic shift, -1 = no connection
# This must match the RTL exactly!
#
# Structure: IRA (Irregular Repeat-Accumulate) with staircase parity
# Column 0: information bits, connected to ALL 7 rows (dv=7, high protection)
# Columns 1-7: parity bits with lower-triangular staircase (dv=2, except col7=1)
#
# Encoding: purely sequential (lower triangular parity submatrix)
# Row 0 → solve p1; Row 1 → solve p2 (using p1); ... Row 6 → solve p7
#
# VN degree distribution: col0=7, cols1-6=2, col7=1
# Average VN degree = (7*32 + 2*192 + 1*32) / 256 = 2.5
H_BASE = np.array([
[ 0, 5, -1, -1, -1, -1, -1, -1], # row 0: info(0) + p1(5)
[11, 3, 0, -1, -1, -1, -1, -1], # row 1: info(11) + p1(3) + p2(0)
[17, -1, 7, 0, -1, -1, -1, -1], # row 2: info(17) + p2(7) + p3(0)
[23, -1, -1, 13, 0, -1, -1, -1], # row 3: info(23) + p3(13) + p4(0)
[29, -1, -1, -1, 19, 0, -1, -1], # row 4: info(29) + p4(19) + p5(0)
[ 3, -1, -1, -1, -1, 25, 0, -1], # row 5: info(3) + p5(25) + p6(0)
[ 9, -1, -1, -1, -1, -1, 31, 0], # row 6: info(9) + p6(31) + p7(0)
], dtype=np.int16)
def build_full_h_matrix():
"""Expand QC base matrix to full binary parity-check matrix H (M x N)."""
H = np.zeros((M, N), dtype=np.int8)
for r in range(M_BASE):
for c in range(N_BASE):
shift = int(H_BASE[r, c])
if shift < 0:
continue # null sub-matrix
# Cyclic permutation matrix of size Z with shift
for z in range(Z):
col_idx = int(c * Z + (z + shift) % Z)
H[r * Z + z, col_idx] = 1
return H
def cyclic_shift(vec, shift):
"""Cyclic right-shift a Z-length vector by 'shift' positions."""
s = int(shift) % len(vec)
if s == 0:
return vec.copy()
return np.concatenate([vec[-s:], vec[:-s]])
def ldpc_encode(info_bits, H):
"""
Sequential encoding exploiting the IRA staircase structure.
QC-LDPC convention: H_BASE[r][c] = s means sub-block is circulant P_s
where P_s applied to vector x gives: result[i] = x[(i+s) % Z] (left shift by s).
Lower-triangular parity submatrix allows top-to-bottom sequential solve:
Row 0: P_{h00}*info + P_{h01}*p1 = 0 -> p1 = P_{h01}^-1 * P_{h00} * info
Row r: P_{hr0}*info + P_{hrr}*p[r] + P_{hr,r+1}*p[r+1] = 0
-> p[r+1] = P_{hr,r+1}^-1 * (P_{hr0}*info + P_{hrr}*p[r])
P_s * x = np.roll(x, -s) (left circular shift)
P_s^-1*y = np.roll(y, +s) (right circular shift)
"""
assert len(info_bits) == K
def apply_p(x, s):
"""Apply circulant P_s: result[i] = x[(i+s)%Z]."""
return np.roll(x, -int(s))
def inv_p(y, s):
"""Apply P_s inverse: P_{-s}."""
return np.roll(y, int(s))
# Parity blocks: p[c] is a Z-length binary vector for base column c (1..7)
p = [np.zeros(Z, dtype=np.int8) for _ in range(N_BASE)]
# Step 1: Solve row 0 for p[1]
# P_{H[0][0]} * info + P_{H[0][1]} * p1 = 0
# p1 = P_{H[0][1]}^-1 * P_{H[0][0]} * info
accum = apply_p(info_bits, H_BASE[0, 0])
p[1] = inv_p(accum, H_BASE[0, 1])
# Step 2: Solve rows 1-6 sequentially for p[2]..p[7]
for r in range(1, M_BASE):
# P_{H[r][0]}*info + P_{H[r][r]}*p[r] + P_{H[r][r+1]}*p[r+1] = 0
accum = apply_p(info_bits, H_BASE[r, 0])
accum = (accum + apply_p(p[r], H_BASE[r, r])) % 2
p[r + 1] = inv_p(accum, H_BASE[r, r + 1])
# Assemble codeword: [info | p1 | p2 | ... | p7]
codeword = np.concatenate([info_bits] + [p[c] for c in range(1, N_BASE)])
# Verify
check = H @ codeword % 2
assert np.all(check == 0), f"Encoding failed: syndrome weight = {check.sum()}"
return codeword
def poisson_channel(codeword, lam_s, lam_b):
"""
Simulate photon-counting optical channel.
For each bit:
bit=1: transmit pulse -> expected photons = lam_s + lam_b
bit=0: no pulse -> expected photons = lam_b (background only)
Receiver counts photons (Poisson distributed).
Output: LLR = log(P(y|1) / P(y|0)) for each received symbol.
For binary (click/no-click) detector:
P(click|1) = 1 - exp(-(lam_s + lam_b))
P(click|0) = 1 - exp(-lam_b)
"""
n = len(codeword)
# Expected photon counts
lam = np.where(codeword == 1, lam_s + lam_b, lam_b)
# Poisson photon counts
photon_counts = np.random.poisson(lam)
# Compute exact LLR for each observation
# P(y|1) = (lam_s+lam_b)^y * exp(-(lam_s+lam_b)) / y!
# P(y|0) = lam_b^y * exp(-lam_b) / y!
#
# Convention: LLR = log(P(y|0) / P(y|1)) (positive = bit 0 more likely)
# This matches decoder: positive belief -> hard decision 0, negative -> 1
#
# LLR = lam_s - y * log((lam_s + lam_b) / lam_b)
llr = np.zeros(n, dtype=np.float64)
for i in range(n):
y = photon_counts[i]
if lam_b > 0:
llr[i] = lam_s - y * np.log((lam_s + lam_b) / lam_b)
else:
# No background: click = definitely bit 1, no click = definitely bit 0
if y > 0:
llr[i] = -100.0 # strong negative (bit=1 likely)
else:
llr[i] = lam_s # no photons, likely bit=0
return llr, photon_counts
def quantize_llr(llr_float, q_bits=Q_BITS):
"""Quantize floating-point LLR to signed integer."""
q_max = 2**(q_bits-1) - 1
q_min = -(2**(q_bits-1))
# Scale: map typical LLR range to integer range
# For photon channel, LLRs are typically in [-5, +5] range
scale = q_max / 5.0
llr_scaled = np.round(llr_float * scale).astype(np.int32)
return np.clip(llr_scaled, q_min, q_max).astype(np.int8)
def sat_add_q(a, b):
"""Saturating add in Q-bit signed arithmetic."""
s = int(a) + int(b)
return max(Q_MIN, min(Q_MAX, s))
def sat_sub_q(a, b):
"""Saturating subtract in Q-bit signed arithmetic."""
return sat_add_q(a, -b)
def min_sum_cn_update(msgs_in, offset=OFFSET):
"""
Offset min-sum check node update.
For each output j:
sign = XOR of all other input signs
magnitude = min of all other magnitudes - offset (clamp to 0)
Args:
msgs_in: list of DC signed integers (Q-bit)
offset: offset correction value
Returns:
msgs_out: list of DC signed integers (Q-bit)
"""
dc = len(msgs_in)
signs = [1 if m < 0 else 0 for m in msgs_in]
mags = [abs(m) for m in msgs_in]
sign_xor = sum(signs) % 2
# Find min1, min2, and index of min1
min1 = Q_MAX
min2 = Q_MAX
min1_idx = 0
for i in range(dc):
if mags[i] < min1:
min2 = min1
min1 = mags[i]
min1_idx = i
elif mags[i] < min2:
min2 = mags[i]
msgs_out = []
for j in range(dc):
mag = min2 if j == min1_idx else min1
mag = max(0, mag - offset) # offset correction
sgn = sign_xor ^ signs[j] # extrinsic sign
val = -mag if sgn else mag
msgs_out.append(val)
return msgs_out
def decode_layered_min_sum(llr_q, max_iter=30, early_term=True):
"""
Layered offset min-sum LDPC decoder (bit-exact reference for RTL).
Args:
llr_q: quantized channel LLRs (N-length array of signed Q-bit integers)
max_iter: maximum iterations
early_term: stop when syndrome is zero
Returns:
decoded_bits: hard decisions (N-length binary array)
converged: True if syndrome == 0
iterations: number of iterations performed
syndrome_weight: final syndrome weight
"""
# Initialize beliefs from channel LLRs
beliefs = [int(x) for x in llr_q]
# Initialize CN->VN messages to zero
# msg[row][col][z] = message from CN (row*Z+z) to VN at shifted position
msg = [[[0 for _ in range(Z)] for _ in range(N_BASE)] for _ in range(M_BASE)]
for iteration in range(max_iter):
# Process each base matrix row (layer)
for row in range(M_BASE):
# Find which columns are connected in this row
connected_cols = [c for c in range(N_BASE) if H_BASE[row, c] >= 0]
dc = len(connected_cols)
# Step 1: Compute VN->CN messages by subtracting old CN->VN
# vn_to_cn[col_pos][z] where col_pos indexes into connected_cols
vn_to_cn = [[0]*Z for _ in range(dc)]
for ci, col in enumerate(connected_cols):
shift = int(H_BASE[row, col])
for z in range(Z):
shifted_z = (z + shift) % Z
bit_idx = col * Z + shifted_z
old_msg = msg[row][col][z]
vn_to_cn[ci][z] = sat_sub_q(beliefs[bit_idx], old_msg)
# Step 2: CN min-sum update (only over connected columns)
cn_to_vn = [[0]*Z for _ in range(dc)]
for z in range(Z):
cn_inputs = [vn_to_cn[ci][z] for ci in range(dc)]
cn_outputs = min_sum_cn_update(cn_inputs)
for ci in range(dc):
cn_to_vn[ci][z] = cn_outputs[ci]
# Step 3: Update beliefs and store new messages
for ci, col in enumerate(connected_cols):
shift = int(H_BASE[row, col])
for z in range(Z):
shifted_z = (z + shift) % Z
bit_idx = col * Z + shifted_z
new_msg = cn_to_vn[ci][z]
extrinsic = vn_to_cn[ci][z]
beliefs[bit_idx] = sat_add_q(extrinsic, new_msg)
msg[row][col][z] = new_msg
# Syndrome check
hard = [1 if b < 0 else 0 for b in beliefs]
syndrome_weight = compute_syndrome_weight(hard)
if early_term and syndrome_weight == 0:
return np.array(hard[:K]), True, iteration + 1, 0
hard = [1 if b < 0 else 0 for b in beliefs]
syndrome_weight = compute_syndrome_weight(hard)
return np.array(hard[:K]), syndrome_weight == 0, max_iter, syndrome_weight
def compute_syndrome_weight(hard_bits):
"""Compute syndrome weight = number of unsatisfied parity checks."""
weight = 0
for r in range(M_BASE):
for z in range(Z):
parity = 0
for c in range(N_BASE):
shift = int(H_BASE[r, c])
if shift < 0:
continue
shifted_z = (z + shift) % Z
bit_idx = c * Z + shifted_z
parity ^= hard_bits[bit_idx]
if parity:
weight += 1
return weight
def run_ber_simulation(lam_s_db_range, lam_b=0.1, n_frames=1000, max_iter=30):
"""
Run BER simulation over a range of signal photon counts.
Args:
lam_s_db_range: signal photons/slot in dB (10*log10(lam_s))
lam_b: background photon rate
n_frames: number of codewords per SNR point
max_iter: decoder iterations
"""
H = build_full_h_matrix()
print(f"H matrix: {H.shape}, rank = {np.linalg.matrix_rank(H.astype(float))}")
print(f"Code: ({N},{K}) rate {K/N:.3f}, Z={Z}")
print(f"Background photons: {lam_b}")
print(f"{'lam_s_dB':>10s} {'lam_s':>8s} {'BER':>10s} {'FER':>10s} {'avg_iter':>10s}")
print("-" * 55)
results = []
for lam_s_db in lam_s_db_range:
lam_s = 10**(lam_s_db / 10)
bit_errors = 0
frame_errors = 0
total_bits = 0
total_iter = 0
for frame in range(n_frames):
# Random info bits
info = np.random.randint(0, 2, K)
# Encode
codeword = ldpc_encode(info, H)
# Channel
llr_float, _ = poisson_channel(codeword, lam_s, lam_b)
llr_q = quantize_llr(llr_float)
# Decode
decoded, converged, iters, _ = decode_layered_min_sum(llr_q, max_iter)
total_iter += iters
# Count errors
errs = np.sum(decoded != info)
bit_errors += errs
total_bits += K
if errs > 0:
frame_errors += 1
ber = bit_errors / total_bits if total_bits > 0 else 0
fer = frame_errors / n_frames
avg_iter = total_iter / n_frames
print(f"{lam_s_db:10.1f} {lam_s:8.3f} {ber:10.6f} {fer:10.4f} {avg_iter:10.1f}")
results.append({
'lam_s_db': lam_s_db, 'lam_s': lam_s,
'ber': ber, 'fer': fer, 'avg_iter': avg_iter
})
return results
def generate_test_vectors(n_vectors=10, lam_s=2.0, lam_b=0.1, max_iter=30):
"""Generate test vectors for RTL verification."""
H = build_full_h_matrix()
vectors = []
for i in range(n_vectors):
info = np.random.randint(0, 2, K)
codeword = ldpc_encode(info, H)
llr_float, photons = poisson_channel(codeword, lam_s, lam_b)
llr_q = quantize_llr(llr_float)
decoded, converged, iters, syn_wt = decode_layered_min_sum(llr_q, max_iter)
vec = {
'index': i,
'info_bits': info.tolist(),
'codeword': codeword.tolist(),
'photon_counts': photons.tolist(),
'llr_float': llr_float.tolist(),
'llr_quantized': llr_q.tolist(),
'decoded_bits': decoded.tolist(),
'converged': bool(converged),
'iterations': iters,
'syndrome_weight': syn_wt,
'bit_errors': int(np.sum(decoded != info)),
}
vectors.append(vec)
status = "PASS" if np.array_equal(decoded, info) else f"FAIL ({vec['bit_errors']} errs)"
print(f" Vector {i}: {status} (iter={iters}, converged={converged})")
return vectors
def main():
parser = argparse.ArgumentParser(description='LDPC Decoder Behavioral Model')
parser.add_argument('--gen-vectors', action='store_true',
help='Generate RTL test vectors')
parser.add_argument('--sweep-snr', action='store_true',
help='Run BER vs SNR sweep')
parser.add_argument('--n-frames', type=int, default=1000,
help='Frames per SNR point (default: 1000)')
parser.add_argument('--max-iter', type=int, default=30,
help='Max decoder iterations (default: 30)')
parser.add_argument('--lam-s', type=float, default=2.0,
help='Signal photons/slot for test vectors (default: 2.0)')
parser.add_argument('--lam-b', type=float, default=0.1,
help='Background photons/slot (default: 0.1)')
parser.add_argument('--seed', type=int, default=42,
help='Random seed (default: 42)')
args = parser.parse_args()
np.random.seed(args.seed)
if args.gen_vectors:
print(f"Generating test vectors (lam_s={args.lam_s}, lam_b={args.lam_b})...")
vectors = generate_test_vectors(
n_vectors=20, lam_s=args.lam_s, lam_b=args.lam_b,
max_iter=args.max_iter
)
out_path = os.path.join(os.path.dirname(__file__), '..', 'data', 'test_vectors.json')
with open(out_path, 'w') as f:
json.dump(vectors, f, indent=2)
print(f"\nWrote {len(vectors)} vectors to {out_path}")
elif args.sweep_snr:
print("BER Sweep: Poisson photon-counting channel, rate-1/8 QC-LDPC")
lam_s_db_range = np.arange(-6, 10, 1.0) # -6 to +9 dB
results = run_ber_simulation(
lam_s_db_range, lam_b=args.lam_b,
n_frames=args.n_frames, max_iter=args.max_iter
)
out_path = os.path.join(os.path.dirname(__file__), '..', 'data', 'ber_results.json')
with open(out_path, 'w') as f:
json.dump(results, f, indent=2)
print(f"\nWrote results to {out_path}")
else:
# Quick demo
print("=== LDPC Rate-1/8 Decoder Demo ===")
print(f"Code: ({N},{K}), rate {K/N:.3f}, Z={Z}")
H = build_full_h_matrix()
print(f"H matrix: {H.shape}, density: {H.sum()/(H.shape[0]*H.shape[1]):.4f}")
info = np.random.randint(0, 2, K)
print(f"\nInfo bits ({K}): {info}")
codeword = ldpc_encode(info, H)
print(f"Codeword ({N} bits), weight: {codeword.sum()}")
# Simulate at a few photon levels
for lam_s in [0.5, 1.0, 2.0, 5.0]:
np.random.seed(args.seed)
llr_float, photons = poisson_channel(codeword, lam_s, args.lam_b)
llr_q = quantize_llr(llr_float)
decoded, converged, iters, syn_wt = decode_layered_min_sum(llr_q)
errors = np.sum(decoded != info)
print(f" lam_s={lam_s:.1f}: decoded in {iters} iter, "
f"converged={converged}, errors={errors}")
if __name__ == '__main__':
main()