#!/usr/bin/env python3 """ LDPC Decoder Behavioral Model - Bit-Exact Reference for RTL Verification Implements offset min-sum decoding with layered scheduling for a rate-1/8 QC-LDPC code (n=256, k=32, Z=32, base matrix 7x8). Channel model: Poisson photon-counting (optical communication) Usage: python3 ldpc_sim.py # Run BER simulation python3 ldpc_sim.py --gen-vectors # Generate RTL test vectors python3 ldpc_sim.py --sweep-snr # SNR sweep for BER curve """ import numpy as np import argparse import json import os # ============================================================================= # Code parameters # ============================================================================= N_BASE = 8 # base matrix columns M_BASE = 7 # base matrix rows Z = 32 # lifting factor N = N_BASE * Z # 256 codeword bits K = Z # 32 info bits (rate 1/8) M = M_BASE * Z # 224 parity checks Q_BITS = 6 # quantization bits (signed) Q_MAX = 2**(Q_BITS-1) - 1 # +31 Q_MIN = -(2**(Q_BITS-1)) # -32 OFFSET = 1 # min-sum offset (integer) # Base matrix: H_BASE[row][col] = cyclic shift, -1 = no connection # This must match the RTL exactly! H_BASE = np.array([ [ 0, 5, 11, 17, 23, 29, 3, 9], [15, 0, 21, 7, 13, 19, 25, 31], [10, 20, 0, 30, 8, 16, 24, 2], [27, 14, 1, 0, 18, 6, 12, 22], [ 4, 28, 16, 12, 0, 26, 8, 20], [19, 9, 31, 25, 15, 0, 21, 11], [22, 26, 6, 14, 30, 10, 0, 18], ], dtype=np.int8) def build_full_h_matrix(): """Expand QC base matrix to full binary parity-check matrix H (M x N).""" H = np.zeros((M, N), dtype=np.int8) for r in range(M_BASE): for c in range(N_BASE): shift = H_BASE[r, c] if shift < 0: continue # null sub-matrix # Cyclic permutation matrix of size Z with shift for z in range(Z): H[r * Z + z, c * Z + (z + shift) % Z] = 1 return H def ldpc_encode(info_bits, H): """ Systematic encoding: info bits are the first K bits of codeword. Solve H * c^T = 0 for parity bits given info bits. For a systematic code, H = [H_p | H_i] where H_p is invertible. c = [info | parity], H_p * parity^T = H_i * info^T (mod 2) This uses dense GF(2) Gaussian elimination. Fine for small codes. """ # info_bits goes in columns 0..K-1 (first base column = info) # Parity bits in columns K..N-1 # We need to solve: H[:,K:] * p = H[:,:K] * info (mod 2) H_p = H[:, K:].copy() # M x (N-K) = 224 x 224 H_i = H[:, :K].copy() # M x K = 224 x 32 syndrome = H_i @ info_bits % 2 # M-vector # Gaussian elimination on H_p to solve for parity n_parity = N - K # 224 assert H_p.shape == (M, n_parity) # Augmented matrix [H_p | syndrome] aug = np.hstack([H_p, syndrome.reshape(-1, 1)]).astype(np.int8) # Forward elimination pivot_row = 0 for col in range(n_parity): # Find pivot found = False for row in range(pivot_row, M): if aug[row, col] == 1: aug[[pivot_row, row]] = aug[[row, pivot_row]] found = True break if not found: continue # skip this column (rank deficient) # Eliminate for row in range(M): if row != pivot_row and aug[row, col] == 1: aug[row] = (aug[row] + aug[pivot_row]) % 2 pivot_row += 1 parity = aug[:n_parity, -1] # solution codeword = np.concatenate([info_bits, parity]) # Verify check = H @ codeword % 2 assert np.all(check == 0), f"Encoding failed: syndrome weight = {check.sum()}" return codeword def poisson_channel(codeword, lam_s, lam_b): """ Simulate photon-counting optical channel. For each bit: bit=1: transmit pulse -> expected photons = lam_s + lam_b bit=0: no pulse -> expected photons = lam_b (background only) Receiver counts photons (Poisson distributed). Output: LLR = log(P(y|1) / P(y|0)) for each received symbol. For binary (click/no-click) detector: P(click|1) = 1 - exp(-(lam_s + lam_b)) P(click|0) = 1 - exp(-lam_b) """ n = len(codeword) # Expected photon counts lam = np.where(codeword == 1, lam_s + lam_b, lam_b) # Poisson photon counts photon_counts = np.random.poisson(lam) # Compute exact LLR for each observation # P(y|1) = (lam_s+lam_b)^y * exp(-(lam_s+lam_b)) / y! # P(y|0) = lam_b^y * exp(-lam_b) / y! # LLR = y * log((lam_s+lam_b)/lam_b) - lam_s llr = np.zeros(n, dtype=np.float64) for i in range(n): y = photon_counts[i] if lam_b > 0: llr[i] = y * np.log((lam_s + lam_b) / lam_b) - lam_s else: # No background: click = definitely bit 1, no click = definitely bit 0 if y > 0: llr[i] = 100.0 # strong positive (bit=1) else: llr[i] = -lam_s # no photons, likely bit=0 return llr, photon_counts def quantize_llr(llr_float, q_bits=Q_BITS): """Quantize floating-point LLR to signed integer.""" q_max = 2**(q_bits-1) - 1 q_min = -(2**(q_bits-1)) # Scale: map typical LLR range to integer range # For photon channel, LLRs are typically in [-5, +5] range scale = q_max / 5.0 llr_scaled = np.round(llr_float * scale).astype(np.int32) return np.clip(llr_scaled, q_min, q_max).astype(np.int8) def sat_add_q(a, b): """Saturating add in Q-bit signed arithmetic.""" s = int(a) + int(b) return max(Q_MIN, min(Q_MAX, s)) def sat_sub_q(a, b): """Saturating subtract in Q-bit signed arithmetic.""" return sat_add_q(a, -b) def min_sum_cn_update(msgs_in, offset=OFFSET): """ Offset min-sum check node update. For each output j: sign = XOR of all other input signs magnitude = min of all other magnitudes - offset (clamp to 0) Args: msgs_in: list of DC signed integers (Q-bit) offset: offset correction value Returns: msgs_out: list of DC signed integers (Q-bit) """ dc = len(msgs_in) signs = [1 if m < 0 else 0 for m in msgs_in] mags = [abs(m) for m in msgs_in] sign_xor = sum(signs) % 2 # Find min1, min2, and index of min1 min1 = Q_MAX min2 = Q_MAX min1_idx = 0 for i in range(dc): if mags[i] < min1: min2 = min1 min1 = mags[i] min1_idx = i elif mags[i] < min2: min2 = mags[i] msgs_out = [] for j in range(dc): mag = min2 if j == min1_idx else min1 mag = max(0, mag - offset) # offset correction sgn = sign_xor ^ signs[j] # extrinsic sign val = -mag if sgn else mag msgs_out.append(val) return msgs_out def decode_layered_min_sum(llr_q, max_iter=30, early_term=True): """ Layered offset min-sum LDPC decoder (bit-exact reference for RTL). Args: llr_q: quantized channel LLRs (N-length array of signed Q-bit integers) max_iter: maximum iterations early_term: stop when syndrome is zero Returns: decoded_bits: hard decisions (N-length binary array) converged: True if syndrome == 0 iterations: number of iterations performed syndrome_weight: final syndrome weight """ # Initialize beliefs from channel LLRs beliefs = [int(x) for x in llr_q] # Initialize CN->VN messages to zero # msg[row][col][z] = message from CN (row*Z+z) to VN at shifted position msg = [[[0 for _ in range(Z)] for _ in range(N_BASE)] for _ in range(M_BASE)] for iteration in range(max_iter): # Process each base matrix row (layer) for row in range(M_BASE): # Step 1: Compute VN->CN messages by subtracting old CN->VN vn_to_cn = [[0]*Z for _ in range(N_BASE)] for col in range(N_BASE): shift = int(H_BASE[row, col]) if shift < 0: continue for z in range(Z): shifted_z = (z + shift) % Z bit_idx = col * Z + shifted_z old_msg = msg[row][col][z] vn_to_cn[col][z] = sat_sub_q(beliefs[bit_idx], old_msg) # Step 2: CN min-sum update cn_to_vn = [[0]*Z for _ in range(N_BASE)] for z in range(Z): # Gather messages from all columns for this check node cn_inputs = [vn_to_cn[col][z] for col in range(N_BASE)] cn_outputs = min_sum_cn_update(cn_inputs) for col in range(N_BASE): cn_to_vn[col][z] = cn_outputs[col] # Step 3: Update beliefs and store new messages for col in range(N_BASE): shift = int(H_BASE[row, col]) if shift < 0: continue for z in range(Z): shifted_z = (z + shift) % Z bit_idx = col * Z + shifted_z new_msg = cn_to_vn[col][z] extrinsic = vn_to_cn[col][z] beliefs[bit_idx] = sat_add_q(extrinsic, new_msg) msg[row][col][z] = new_msg # Syndrome check hard = [1 if b < 0 else 0 for b in beliefs] syndrome_weight = compute_syndrome_weight(hard) if early_term and syndrome_weight == 0: return np.array(hard[:K]), True, iteration + 1, 0 hard = [1 if b < 0 else 0 for b in beliefs] syndrome_weight = compute_syndrome_weight(hard) return np.array(hard[:K]), syndrome_weight == 0, max_iter, syndrome_weight def compute_syndrome_weight(hard_bits): """Compute syndrome weight = number of unsatisfied parity checks.""" weight = 0 for r in range(M_BASE): for z in range(Z): parity = 0 for c in range(N_BASE): shift = int(H_BASE[r, c]) if shift < 0: continue shifted_z = (z + shift) % Z bit_idx = c * Z + shifted_z parity ^= hard_bits[bit_idx] if parity: weight += 1 return weight def run_ber_simulation(lam_s_db_range, lam_b=0.1, n_frames=1000, max_iter=30): """ Run BER simulation over a range of signal photon counts. Args: lam_s_db_range: signal photons/slot in dB (10*log10(lam_s)) lam_b: background photon rate n_frames: number of codewords per SNR point max_iter: decoder iterations """ H = build_full_h_matrix() print(f"H matrix: {H.shape}, rank = {np.linalg.matrix_rank(H.astype(float))}") print(f"Code: ({N},{K}) rate {K/N:.3f}, Z={Z}") print(f"Background photons: {lam_b}") print(f"{'lam_s_dB':>10s} {'lam_s':>8s} {'BER':>10s} {'FER':>10s} {'avg_iter':>10s}") print("-" * 55) results = [] for lam_s_db in lam_s_db_range: lam_s = 10**(lam_s_db / 10) bit_errors = 0 frame_errors = 0 total_bits = 0 total_iter = 0 for frame in range(n_frames): # Random info bits info = np.random.randint(0, 2, K) # Encode codeword = ldpc_encode(info, H) # Channel llr_float, _ = poisson_channel(codeword, lam_s, lam_b) llr_q = quantize_llr(llr_float) # Decode decoded, converged, iters, _ = decode_layered_min_sum(llr_q, max_iter) total_iter += iters # Count errors errs = np.sum(decoded != info) bit_errors += errs total_bits += K if errs > 0: frame_errors += 1 ber = bit_errors / total_bits if total_bits > 0 else 0 fer = frame_errors / n_frames avg_iter = total_iter / n_frames print(f"{lam_s_db:10.1f} {lam_s:8.3f} {ber:10.6f} {fer:10.4f} {avg_iter:10.1f}") results.append({ 'lam_s_db': lam_s_db, 'lam_s': lam_s, 'ber': ber, 'fer': fer, 'avg_iter': avg_iter }) return results def generate_test_vectors(n_vectors=10, lam_s=2.0, lam_b=0.1, max_iter=30): """Generate test vectors for RTL verification.""" H = build_full_h_matrix() vectors = [] for i in range(n_vectors): info = np.random.randint(0, 2, K) codeword = ldpc_encode(info, H) llr_float, photons = poisson_channel(codeword, lam_s, lam_b) llr_q = quantize_llr(llr_float) decoded, converged, iters, syn_wt = decode_layered_min_sum(llr_q, max_iter) vec = { 'index': i, 'info_bits': info.tolist(), 'codeword': codeword.tolist(), 'photon_counts': photons.tolist(), 'llr_float': llr_float.tolist(), 'llr_quantized': llr_q.tolist(), 'decoded_bits': decoded.tolist(), 'converged': bool(converged), 'iterations': iters, 'syndrome_weight': syn_wt, 'bit_errors': int(np.sum(decoded != info)), } vectors.append(vec) status = "PASS" if np.array_equal(decoded, info) else f"FAIL ({vec['bit_errors']} errs)" print(f" Vector {i}: {status} (iter={iters}, converged={converged})") return vectors def main(): parser = argparse.ArgumentParser(description='LDPC Decoder Behavioral Model') parser.add_argument('--gen-vectors', action='store_true', help='Generate RTL test vectors') parser.add_argument('--sweep-snr', action='store_true', help='Run BER vs SNR sweep') parser.add_argument('--n-frames', type=int, default=1000, help='Frames per SNR point (default: 1000)') parser.add_argument('--max-iter', type=int, default=30, help='Max decoder iterations (default: 30)') parser.add_argument('--lam-s', type=float, default=2.0, help='Signal photons/slot for test vectors (default: 2.0)') parser.add_argument('--lam-b', type=float, default=0.1, help='Background photons/slot (default: 0.1)') parser.add_argument('--seed', type=int, default=42, help='Random seed (default: 42)') args = parser.parse_args() np.random.seed(args.seed) if args.gen_vectors: print(f"Generating test vectors (lam_s={args.lam_s}, lam_b={args.lam_b})...") vectors = generate_test_vectors( n_vectors=20, lam_s=args.lam_s, lam_b=args.lam_b, max_iter=args.max_iter ) out_path = os.path.join(os.path.dirname(__file__), '..', 'data', 'test_vectors.json') with open(out_path, 'w') as f: json.dump(vectors, f, indent=2) print(f"\nWrote {len(vectors)} vectors to {out_path}") elif args.sweep_snr: print("BER Sweep: Poisson photon-counting channel, rate-1/8 QC-LDPC") lam_s_db_range = np.arange(-6, 10, 1.0) # -6 to +9 dB results = run_ber_simulation( lam_s_db_range, lam_b=args.lam_b, n_frames=args.n_frames, max_iter=args.max_iter ) out_path = os.path.join(os.path.dirname(__file__), '..', 'data', 'ber_results.json') with open(out_path, 'w') as f: json.dump(results, f, indent=2) print(f"\nWrote results to {out_path}") else: # Quick demo print("=== LDPC Rate-1/8 Decoder Demo ===") print(f"Code: ({N},{K}), rate {K/N:.3f}, Z={Z}") H = build_full_h_matrix() print(f"H matrix: {H.shape}, density: {H.sum()/(H.shape[0]*H.shape[1]):.4f}") info = np.random.randint(0, 2, K) print(f"\nInfo bits ({K}): {info}") codeword = ldpc_encode(info, H) print(f"Codeword ({N} bits), weight: {codeword.sum()}") # Simulate at a few photon levels for lam_s in [0.5, 1.0, 2.0, 5.0]: np.random.seed(args.seed) llr_float, photons = poisson_channel(codeword, lam_s, args.lam_b) llr_q = quantize_llr(llr_float) decoded, converged, iters, syn_wt = decode_layered_min_sum(llr_q) errors = np.sum(decoded != info) print(f" lam_s={lam_s:.1f}: decoded in {iters} iter, " f"converged={converged}, errors={errors}") if __name__ == '__main__': main()