From b8bff512a4f7b827fd3297f2d03eaf21e3b8ed9e Mon Sep 17 00:00:00 2001 From: cah Date: Tue, 24 Feb 2026 04:58:56 -0700 Subject: [PATCH] Add implementation plan for frame sync and code analysis Co-Authored-By: Claude Opus 4.6 --- ...02-23-frame-sync-and-code-analysis-impl.md | 1680 +++++++++++++++++ 1 file changed, 1680 insertions(+) create mode 100644 docs/plans/2026-02-23-frame-sync-and-code-analysis-impl.md diff --git a/docs/plans/2026-02-23-frame-sync-and-code-analysis-impl.md b/docs/plans/2026-02-23-frame-sync-and-code-analysis-impl.md new file mode 100644 index 0000000..19afee5 --- /dev/null +++ b/docs/plans/2026-02-23-frame-sync-and-code-analysis-impl.md @@ -0,0 +1,1680 @@ +# Frame Sync & Code Analysis Implementation Plan + +> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task. + +**Goal:** Build a frame synchronization prototype and four quantitative code analyses to validate design decisions for the LDPC optical decoder (target: 1-2 photons/slot). + +**Architecture:** Two new scripts (`frame_sync.py`, `ldpc_analysis.py`) import encoder/decoder/channel from existing `ldpc_sim.py`. Frame sync simulates a continuous stream with unknown offset. Analyses compare rates, matrices, quantization, and Shannon limits. All output summary tables to stdout and save results to `data/`. + +**Tech Stack:** Python 3, numpy, scipy (for Poisson PMF and optimization in Shannon gap). pytest for validation tests. + +**Key reference files:** +- `model/ldpc_sim.py` — encoder, decoder, channel, syndrome check (DO NOT MODIFY) +- `docs/plans/2026-02-23-frame-sync-and-code-analysis-design.md` — design document + +--- + +### Task 1: Test Harness — Validate Existing Model + +**Files:** +- Create: `model/test_ldpc.py` + +**Step 1: Write validation tests for existing ldpc_sim.py** + +These tests establish a known-good baseline before we build on top. 
+ +```python +"""Tests for ldpc_sim.py — validates encoder, decoder, channel, syndrome.""" +import numpy as np +import pytest +from ldpc_sim import ( + build_full_h_matrix, ldpc_encode, poisson_channel, quantize_llr, + decode_layered_min_sum, compute_syndrome_weight, cyclic_shift, + sat_add_q, sat_sub_q, min_sum_cn_update, + N, K, M, Z, N_BASE, M_BASE, H_BASE, Q_MAX, Q_MIN, +) + + +class TestCodeParameters: + def test_dimensions(self): + assert N == 256 + assert K == 32 + assert M == 224 + assert Z == 32 + assert N_BASE == 8 + assert M_BASE == 7 + + def test_h_base_shape(self): + assert H_BASE.shape == (M_BASE, N_BASE) + + def test_h_base_info_column_fully_connected(self): + """Column 0 (info) should connect to all 7 rows.""" + for r in range(M_BASE): + assert H_BASE[r, 0] >= 0 + + def test_h_base_staircase_structure(self): + """Parity columns should form lower-triangular staircase.""" + for r in range(M_BASE): + # Diagonal parity column (r+1) should be connected + assert H_BASE[r, r + 1] >= 0 + # Above diagonal should be -1 (not connected) + for c in range(r + 2, N_BASE): + assert H_BASE[r, c] == -1 + + +class TestHMatrix: + def test_full_h_dimensions(self): + H = build_full_h_matrix() + assert H.shape == (M, N) + + def test_full_h_binary(self): + H = build_full_h_matrix() + assert set(np.unique(H)).issubset({0, 1}) + + def test_full_h_row_weights(self): + """Each row in a Z-block should have the same weight (QC property).""" + H = build_full_h_matrix() + for r in range(M_BASE): + weights = [H[r * Z + z].sum() for z in range(Z)] + assert len(set(weights)) == 1, f"Row block {r} has inconsistent weights" + + def test_full_h_rank(self): + """H should have rank M (full rank) for the code to work.""" + H = build_full_h_matrix() + rank = np.linalg.matrix_rank(H.astype(float)) + assert rank == M + + +class TestEncoder: + def test_encode_all_zeros(self): + H = build_full_h_matrix() + info = np.zeros(K, dtype=np.int8) + codeword = ldpc_encode(info, H) + assert 
len(codeword) == N + assert np.all(codeword == 0) # all-zero is always a codeword + + def test_encode_satisfies_parity(self): + H = build_full_h_matrix() + np.random.seed(123) + info = np.random.randint(0, 2, K).astype(np.int8) + codeword = ldpc_encode(info, H) + syndrome = H @ codeword % 2 + assert np.all(syndrome == 0) + + def test_encode_preserves_info_bits(self): + H = build_full_h_matrix() + np.random.seed(456) + info = np.random.randint(0, 2, K).astype(np.int8) + codeword = ldpc_encode(info, H) + assert np.array_equal(codeword[:K], info) + + def test_encode_multiple_random(self): + """Encode 20 random messages, all must satisfy parity.""" + H = build_full_h_matrix() + for seed in range(20): + np.random.seed(seed) + info = np.random.randint(0, 2, K).astype(np.int8) + codeword = ldpc_encode(info, H) + syndrome = H @ codeword % 2 + assert np.all(syndrome == 0), f"Failed for seed {seed}" + + +class TestSaturatingArithmetic: + def test_sat_add_normal(self): + assert sat_add_q(10, 5) == 15 + + def test_sat_add_positive_overflow(self): + assert sat_add_q(Q_MAX, 1) == Q_MAX + + def test_sat_add_negative_overflow(self): + assert sat_add_q(Q_MIN, -1) == Q_MIN + + def test_sat_sub(self): + assert sat_sub_q(10, 3) == 7 + assert sat_sub_q(Q_MIN, 1) == Q_MIN + + +class TestMinSumCN: + def test_all_positive(self): + """All positive inputs: output signs should all be positive.""" + msgs = [5, 10, 15] + out = min_sum_cn_update(msgs, offset=1) + assert all(o >= 0 for o in out) + + def test_one_negative(self): + """One negative: that output positive, others negative.""" + msgs = [-5, 10, 15] + out = min_sum_cn_update(msgs, offset=1) + assert out[0] >= 0 # extrinsic sign excludes own sign + assert out[1] <= 0 + assert out[2] <= 0 + + def test_magnitude_is_min_of_others(self): + """Output magnitude = min of OTHER magnitudes - offset.""" + msgs = [3, 10, 20] + out = min_sum_cn_update(msgs, offset=1) + # out[0]: min of {10,20} - 1 = 9 + assert abs(out[0]) == 9 + # out[1]: min of 
{3,20} - 1 = 2 + assert abs(out[1]) == 2 + # out[2]: min of {3,10} - 1 = 2 + assert abs(out[2]) == 2 + + +class TestDecoder: + def test_decode_noiseless(self): + """Noiseless channel (huge lambda_s) should decode perfectly.""" + H = build_full_h_matrix() + np.random.seed(789) + info = np.random.randint(0, 2, K).astype(np.int8) + codeword = ldpc_encode(info, H) + # Noiseless: LLR is just +/- large value + llr = np.where(codeword == 0, 20.0, -20.0) + llr_q = quantize_llr(llr) + decoded, converged, iters, syn_wt = decode_layered_min_sum(llr_q, max_iter=5) + assert converged + assert np.array_equal(decoded, info) + assert iters == 1 # should converge in 1 iteration with perfect input + + def test_decode_high_snr(self): + """High SNR (lambda_s=10) should decode reliably.""" + H = build_full_h_matrix() + np.random.seed(101) + info = np.random.randint(0, 2, K).astype(np.int8) + codeword = ldpc_encode(info, H) + llr, _ = poisson_channel(codeword, lam_s=10.0, lam_b=0.1) + llr_q = quantize_llr(llr) + decoded, converged, _, _ = decode_layered_min_sum(llr_q) + assert converged + assert np.array_equal(decoded, info) + + +class TestSyndrome: + def test_valid_codeword_syndrome_zero(self): + H = build_full_h_matrix() + np.random.seed(202) + info = np.random.randint(0, 2, K).astype(np.int8) + codeword = ldpc_encode(info, H) + assert compute_syndrome_weight(codeword.tolist()) == 0 + + def test_flipped_bit_nonzero_syndrome(self): + H = build_full_h_matrix() + info = np.zeros(K, dtype=np.int8) + codeword = ldpc_encode(info, H) + corrupted = codeword.copy() + corrupted[0] ^= 1 # flip one bit + assert compute_syndrome_weight(corrupted.tolist()) > 0 +``` + +**Step 2: Run tests to verify they pass** + +Run: `cd /home/cah/r2d2/code/fpga/claude_project/ldpc_optical && python3 -m pytest model/test_ldpc.py -v` +Expected: All tests PASS (these test existing working code) + +**Step 3: Commit** + +```bash +git add model/test_ldpc.py +git commit -m "test: add validation tests for existing LDPC 
model" +``` + +--- + +### Task 2: Frame Sync — Streaming Infrastructure + +**Files:** +- Create: `model/frame_sync.py` +- Create: `model/test_frame_sync.py` + +**Step 1: Write the test for stream generation and syndrome screening** + +```python +"""Tests for frame synchronization.""" +import numpy as np +import pytest +from ldpc_sim import ( + build_full_h_matrix, ldpc_encode, poisson_channel, quantize_llr, + compute_syndrome_weight, N, K, Z, +) +from frame_sync import generate_stream, syndrome_screen + + +class TestStreamGeneration: + def test_stream_length(self): + """Stream should be n_frames * N + offset samples long.""" + H = build_full_h_matrix() + np.random.seed(42) + stream_llr, true_offset, info_list = generate_stream( + H, n_frames=5, lam_s=5.0, lam_b=0.1, offset=10 + ) + assert len(stream_llr) == 5 * N + 10 + assert true_offset == 10 + assert len(info_list) == 5 + + def test_stream_zero_offset(self): + H = build_full_h_matrix() + np.random.seed(42) + stream_llr, true_offset, _ = generate_stream( + H, n_frames=3, lam_s=5.0, lam_b=0.1, offset=0 + ) + assert true_offset == 0 + assert len(stream_llr) == 3 * N + + def test_stream_random_offset(self): + H = build_full_h_matrix() + np.random.seed(42) + stream_llr, true_offset, _ = generate_stream( + H, n_frames=5, lam_s=5.0, lam_b=0.1, offset=None + ) + assert 0 <= true_offset < N + assert len(stream_llr) == 5 * N + true_offset + + +class TestSyndromeScreen: + def test_correct_offset_low_syndrome(self): + """At high SNR, correct offset should have very low syndrome weight.""" + H = build_full_h_matrix() + np.random.seed(42) + stream_llr, true_offset, _ = generate_stream( + H, n_frames=5, lam_s=10.0, lam_b=0.1, offset=37 + ) + scores = syndrome_screen(stream_llr, n_offsets=N) + # Correct offset should have the lowest (or near-lowest) syndrome + best_offset = min(scores, key=scores.get) + assert best_offset == 37 + + def test_wrong_offsets_high_syndrome(self): + """Wrong offsets should have syndrome weight near 
M/2 ~ 112.""" + H = build_full_h_matrix() + np.random.seed(42) + stream_llr, true_offset, _ = generate_stream( + H, n_frames=5, lam_s=10.0, lam_b=0.1, offset=0 + ) + scores = syndrome_screen(stream_llr, n_offsets=N) + wrong_scores = [v for k, v in scores.items() if k != 0] + avg_wrong = np.mean(wrong_scores) + assert avg_wrong > 80 # should be near 112 for random +``` + +**Step 2: Run tests to verify they fail** + +Run: `cd /home/cah/r2d2/code/fpga/claude_project/ldpc_optical && python3 -m pytest model/test_frame_sync.py -v` +Expected: FAIL — `frame_sync` module doesn't exist yet + +**Step 3: Implement generate_stream and syndrome_screen** + +```python +#!/usr/bin/env python3 +""" +Frame Synchronization Prototype for LDPC Optical Decoder. + +Simulates a continuous photon-counting stream with unknown frame alignment. +Tests acquisition (finding codeword boundaries) and re-sync after loss. + +Usage: + python3 frame_sync.py # Quick demo at a few lambda_s + python3 frame_sync.py --sweep # Acquisition success vs lambda_s + python3 frame_sync.py --resync-test # Re-sync after offset slip +""" +import numpy as np +import argparse +import json +import os +from ldpc_sim import ( + build_full_h_matrix, ldpc_encode, poisson_channel, quantize_llr, + decode_layered_min_sum, compute_syndrome_weight, + N, K, M, Z, N_BASE, M_BASE, H_BASE, +) + +SCREENING_THRESHOLD = 50 # syndrome weight below this triggers full decode + + +def generate_stream(H, n_frames=10, lam_s=2.0, lam_b=0.1, offset=None): + """ + Generate a continuous stream of channel LLRs with unknown frame offset. 
+ + Args: + H: full parity check matrix + n_frames: number of codewords to concatenate + lam_s: signal photons/slot + lam_b: background photons/slot + offset: bit offset to insert (None = random 0..N-1) + + Returns: + stream_llr: continuous LLR stream (float, not quantized) + true_offset: the actual offset used + info_list: list of info bit arrays for each frame (ground truth) + """ + if offset is None: + offset = np.random.randint(0, N) + + # Generate random prefix LLRs for the offset (noise-only, no codeword) + # These represent receiving channel output before the first codeword starts + prefix_llr = np.random.normal(0, 1, offset) # just noise + + # Generate codewords and their channel outputs + info_list = [] + codeword_llrs = [] + for _ in range(n_frames): + info = np.random.randint(0, 2, K).astype(np.int8) + codeword = ldpc_encode(info, H) + llr, _ = poisson_channel(codeword, lam_s, lam_b) + info_list.append(info) + codeword_llrs.append(llr) + + stream_llr = np.concatenate([prefix_llr] + codeword_llrs) + return stream_llr, offset, info_list + + +def syndrome_screen(stream_llr, n_offsets=None): + """ + Screen all candidate offsets using hard-decision syndrome weight. + + Args: + stream_llr: continuous LLR stream + n_offsets: number of offsets to try (default N=256) + + Returns: + scores: dict mapping offset -> syndrome weight + """ + if n_offsets is None: + n_offsets = N + + scores = {} + for off in range(n_offsets): + if off + N > len(stream_llr): + break + window = stream_llr[off:off + N] + hard = [0 if v > 0 else 1 for v in window] + syn_wt = compute_syndrome_weight(hard) + scores[off] = syn_wt + return scores + + +def acquire_sync(stream_llr, max_confirm=2, max_iter=30, verbose=False): + """ + Acquire frame synchronization from a continuous stream. + + Algorithm: + 1. Syndrome-screen all 256 offsets + 2. For offsets below threshold, run full iterative decode + 3. 
Confirm by decoding next max_confirm consecutive frames + + Args: + stream_llr: continuous LLR stream + max_confirm: number of consecutive frames to confirm sync + max_iter: decoder max iterations + verbose: print screening details + + Returns: + dict with keys: + locked: bool + offset: detected offset (or -1) + offsets_screened: number of offsets tried + full_decodes: number of full iterative decodes run + screening_cost: equivalent decode cycles for syndrome screening + """ + scores = syndrome_screen(stream_llr) + # Sort by syndrome weight (best candidates first) + ranked = sorted(scores.items(), key=lambda x: x[1]) + + full_decodes = 0 + for off, syn_wt in ranked: + if syn_wt >= SCREENING_THRESHOLD: + break # all remaining offsets are worse + + # Full iterative decode at this offset + window = stream_llr[off:off + N] + llr_q = quantize_llr(window) + decoded, converged, iters, _ = decode_layered_min_sum(llr_q, max_iter) + full_decodes += 1 + + if not converged: + continue + + # Confirm with consecutive frames + confirmed = True + for f in range(1, max_confirm + 1): + start = off + f * N + if start + N > len(stream_llr): + confirmed = False + break + win_f = stream_llr[start:start + N] + llr_q_f = quantize_llr(win_f) + _, conv_f, _, _ = decode_layered_min_sum(llr_q_f, max_iter) + full_decodes += 1 + if not conv_f: + confirmed = False + break + + if confirmed: + # Cost model: syndrome screening ~ 1/30 of a full decode each + screening_cost = len(scores) / 30.0 + return { + 'locked': True, + 'offset': off, + 'offsets_screened': len(scores), + 'full_decodes': full_decodes, + 'screening_cost': screening_cost, + 'total_equiv_decodes': screening_cost + full_decodes, + } + + return { + 'locked': False, + 'offset': -1, + 'offsets_screened': len(scores), + 'full_decodes': full_decodes, + 'screening_cost': len(scores) / 30.0, + 'total_equiv_decodes': len(scores) / 30.0 + full_decodes, + } + + +def resync_test(stream_llr, true_offset, slip_amount, search_radius=16, + 
max_iter=30): + """ + Test re-synchronization after an offset slip. + + Simulates the receiver losing sync by `slip_amount` bits, + then searching nearby offsets to re-acquire. + + Returns: + dict with locked, detected offset, cost + """ + slipped_offset = (true_offset + slip_amount) % N + + # First try nearby offsets (fast re-sync) + for delta in range(-search_radius, search_radius + 1): + off = (slipped_offset + delta) % N + if off + N > len(stream_llr): + continue + window = stream_llr[off:off + N] + hard = [0 if v > 0 else 1 for v in window] + syn_wt = compute_syndrome_weight(hard) + if syn_wt < SCREENING_THRESHOLD: + llr_q = quantize_llr(window) + _, converged, _, _ = decode_layered_min_sum(llr_q, max_iter) + if converged: + return { + 'locked': True, + 'offset': off, + 'slip': slip_amount, + 'search_radius_used': search_radius, + 'needed_full_search': False, + } + + # Fall back to full search + result = acquire_sync(stream_llr) + result['slip'] = slip_amount + result['search_radius_used'] = search_radius + result['needed_full_search'] = True + return result + + +def run_acquisition_sweep(lam_s_values, lam_b=0.1, n_trials=50, n_frames=10): + """Sweep lambda_s and measure acquisition success rate.""" + H = build_full_h_matrix() + print(f"{'lam_s':>8s} {'success%':>10s} {'avg_decodes':>12s} " + f"{'avg_cost':>10s} {'false_lock':>10s}") + print("-" * 55) + + results = [] + for lam_s in lam_s_values: + successes = 0 + false_locks = 0 + total_decodes = 0 + total_cost = 0 + + for trial in range(n_trials): + np.random.seed(trial * 1000 + int(lam_s * 100)) + stream, true_off, _ = generate_stream( + H, n_frames=n_frames, lam_s=lam_s, lam_b=lam_b, offset=None + ) + result = acquire_sync(stream) + + if result['locked']: + if result['offset'] == true_off: + successes += 1 + else: + false_locks += 1 + total_decodes += result['full_decodes'] + total_cost += result['total_equiv_decodes'] + + pct = 100 * successes / n_trials + avg_dec = total_decodes / n_trials + avg_cost = 
total_cost / n_trials + fl_pct = 100 * false_locks / n_trials + + print(f"{lam_s:8.1f} {pct:9.1f}% {avg_dec:12.1f} " + f"{avg_cost:10.1f} {fl_pct:9.1f}%") + results.append({ + 'lam_s': lam_s, 'success_pct': pct, + 'avg_full_decodes': avg_dec, 'avg_equiv_cost': avg_cost, + 'false_lock_pct': fl_pct, + }) + return results + + +def run_resync_sweep(lam_s=5.0, lam_b=0.1, n_trials=20, n_frames=10): + """Test re-sync at various slip amounts.""" + H = build_full_h_matrix() + print(f"\nRe-sync test at lam_s={lam_s}") + print(f"{'slip':>6s} {'success%':>10s} {'full_search%':>12s}") + print("-" * 32) + + for slip in [1, 2, 4, 8, 16, 32, 64, 128]: + successes = 0 + full_searches = 0 + for trial in range(n_trials): + np.random.seed(trial * 100 + slip) + stream, true_off, _ = generate_stream( + H, n_frames=n_frames, lam_s=lam_s, lam_b=lam_b, offset=None + ) + result = resync_test(stream, true_off, slip) + if result['locked']: + successes += 1 + if result.get('needed_full_search'): + full_searches += 1 + + print(f"{slip:6d} {100*successes/n_trials:9.1f}% " + f"{100*full_searches/n_trials:11.1f}%") + + +def main(): + parser = argparse.ArgumentParser(description='LDPC Frame Sync Prototype') + parser.add_argument('--sweep', action='store_true', + help='Acquisition success rate vs lambda_s') + parser.add_argument('--resync-test', action='store_true', + help='Re-sync after offset slip') + parser.add_argument('--lam-s', type=float, default=5.0, + help='Signal photons/slot (default: 5.0)') + parser.add_argument('--lam-b', type=float, default=0.1, + help='Background photons/slot (default: 0.1)') + parser.add_argument('--n-trials', type=int, default=50, + help='Trials per data point (default: 50)') + parser.add_argument('--seed', type=int, default=42) + args = parser.parse_args() + + if args.sweep: + print("=== Frame Sync Acquisition Sweep ===") + lam_s_values = [0.5, 1.0, 1.5, 2.0, 3.0, 4.0, 5.0, 7.0, 10.0] + results = run_acquisition_sweep(lam_s_values, args.lam_b, args.n_trials) + 
out_path = os.path.join(os.path.dirname(__file__), '..', 'data', + 'sync_acquisition_results.json') + os.makedirs(os.path.dirname(out_path), exist_ok=True) + with open(out_path, 'w') as f: + json.dump(results, f, indent=2) + print(f"\nResults saved to {out_path}") + + elif args.resync_test: + run_resync_sweep(args.lam_s, args.lam_b, args.n_trials) + + else: + # Quick demo + print("=== Frame Sync Demo ===") + H = build_full_h_matrix() + np.random.seed(args.seed) + stream, true_off, infos = generate_stream( + H, n_frames=10, lam_s=args.lam_s, lam_b=args.lam_b, offset=None + ) + print(f"Stream: {len(stream)} samples, true offset: {true_off}") + print(f"Signal: lam_s={args.lam_s}, lam_b={args.lam_b}") + + result = acquire_sync(stream, verbose=True) + if result['locked']: + match = "CORRECT" if result['offset'] == true_off else "WRONG" + print(f"Locked at offset {result['offset']} ({match})") + print(f" Screened {result['offsets_screened']} offsets") + print(f" Full decodes: {result['full_decodes']}") + print(f" Total cost: {result['total_equiv_decodes']:.1f} " + f"equiv decodes") + else: + print("FAILED to acquire sync") + + +if __name__ == '__main__': + main() +``` + +**Step 4: Run tests to verify they pass** + +Run: `cd /home/cah/r2d2/code/fpga/claude_project/ldpc_optical && python3 -m pytest model/test_frame_sync.py -v` +Expected: All PASS + +**Step 5: Run the demo to verify it works end-to-end** + +Run: `cd /home/cah/r2d2/code/fpga/claude_project/ldpc_optical && python3 model/frame_sync.py --lam-s 5.0` +Expected: Should lock at the correct offset + +**Step 6: Commit** + +```bash +git add model/frame_sync.py model/test_frame_sync.py +git commit -m "feat: add frame synchronization prototype with tests" +``` + +--- + +### Task 3: Code Analysis — Rate Comparison + +**Files:** +- Create: `model/ldpc_analysis.py` +- Create: `model/test_ldpc_analysis.py` + +**Step 1: Write the test for rate sweep** + +```python +"""Tests for ldpc_analysis.py.""" +import numpy as np 
+import pytest +from ldpc_analysis import build_ira_staircase, rate_sweep_single + + +class TestIRAStaircase: + def test_rate_1_2(self): + """Rate 1/2: 1x2 base matrix, n=64, k=32.""" + H_base, H_full = build_ira_staircase(m_base=1, z=32) + assert H_base.shape == (1, 2) + assert H_full.shape == (32, 64) + assert np.linalg.matrix_rank(H_full.astype(float)) == 32 + + def test_rate_1_4(self): + """Rate 1/4: 3x4 base matrix, n=128, k=32.""" + H_base, H_full = build_ira_staircase(m_base=3, z=32) + assert H_base.shape == (3, 4) + assert H_full.shape == (96, 128) + assert np.linalg.matrix_rank(H_full.astype(float)) == 96 + + def test_rate_1_8_matches_existing(self): + """Rate 1/8 staircase should produce a valid code.""" + H_base, H_full = build_ira_staircase(m_base=7, z=32) + assert H_base.shape == (7, 8) + assert H_full.shape == (224, 256) + + def test_encode_decode_roundtrip(self): + """Any generated staircase code should encode/decode at high SNR.""" + for m_base in [1, 2, 3, 5]: + H_base, H_full = build_ira_staircase(m_base=m_base, z=32) + n_base = m_base + 1 + k = 32 + n = n_base * 32 + np.random.seed(42) + info = np.random.randint(0, 2, k).astype(np.int8) + # Use generic IRA encoder + from ldpc_analysis import ira_encode + codeword = ira_encode(info, H_base, H_full, z=32) + syndrome = H_full @ codeword % 2 + assert np.all(syndrome == 0), f"Failed for m_base={m_base}" + + +class TestRateSweep: + def test_high_snr_all_rates_decode(self): + """At very high SNR, all rates should achieve FER=0.""" + for m_base in [1, 3, 7]: + result = rate_sweep_single(m_base=m_base, z=32, lam_s=20.0, + lam_b=0.1, n_frames=10) + assert result['fer'] == 0.0, f"m_base={m_base} failed at lam_s=20" +``` + +**Step 2: Run tests to verify they fail** + +Run: `cd /home/cah/r2d2/code/fpga/claude_project/ldpc_optical && python3 -m pytest model/test_ldpc_analysis.py::TestIRAStaircase -v` +Expected: FAIL — module doesn't exist + +**Step 3: Implement build_ira_staircase, ira_encode, and 
rate_sweep_single** + +The key functions: +- `build_ira_staircase(m_base, z)`: Generate an IRA staircase base matrix and expand to full H. Info column connected to all rows, lower-triangular parity with spread shift values. +- `ira_encode(info, H_base, H_full, z)`: Generic IRA encoder that works for any staircase code (sequential parity solve, same algorithm as ldpc_sim.py). +- `rate_sweep_single(m_base, z, lam_s, lam_b, n_frames)`: Run n_frames through encode/channel/decode, return BER/FER. +- `run_rate_sweep(...)`: Full sweep over rates and lambda_s values, print summary table. + +```python +#!/usr/bin/env python3 +""" +LDPC Code Analysis Tool — Quantitative design validation. + +Four analyses: + --rate-sweep: Compare code rates (1/2 through 1/8) vs lambda_s + --matrix-compare: Compare base matrix designs at rate 1/8 + --quant-sweep: Quantization bits (4-32) impact on FER + --shannon-gap: Distance from Poisson channel capacity + +Usage: + python3 ldpc_analysis.py --rate-sweep + python3 ldpc_analysis.py --matrix-compare + python3 ldpc_analysis.py --quant-sweep + python3 ldpc_analysis.py --shannon-gap + python3 ldpc_analysis.py --all +""" +import numpy as np +import argparse +import json +import os +from ldpc_sim import ( + poisson_channel, quantize_llr, decode_layered_min_sum, + compute_syndrome_weight, sat_add_q, sat_sub_q, min_sum_cn_update, + Q_BITS, Q_MAX, Q_MIN, OFFSET, +) + + +# ============================================================================ +# Generic IRA staircase code construction +# ============================================================================ + +def build_ira_staircase(m_base, z=32): + """ + Build an IRA (Irregular Repeat-Accumulate) staircase QC-LDPC code. 
+ + Structure: + - Column 0: info bits, connected to ALL m_base rows (high protection) + - Columns 1..m_base: parity bits, lower-triangular staircase + - Each row has 2-3 nonzero entries (sparse) + - Shift values spread across [0, z-1] to maximize girth + + Args: + m_base: number of base matrix rows (rate = 1 / (m_base + 1)) + z: lifting factor + + Returns: + H_base: (m_base, m_base+1) array of shift values (-1 = not connected) + H_full: (m_base*z, (m_base+1)*z) full binary parity check matrix + """ + n_base = m_base + 1 + H_base = np.full((m_base, n_base), -1, dtype=np.int16) + + # Info column (col 0): connect to all rows with spread shifts + for r in range(m_base): + H_base[r, 0] = (r * z // m_base) % z + + # Staircase parity: row r connects to col r+1 (diagonal) + # Row 0: col 1 only (besides info) + # Row r>0: col r and col r+1 + H_base[0, 1] = (5 * z // m_base) % z if m_base > 1 else 0 + + for r in range(1, m_base): + # Sub-diagonal: connect to previous parity column + H_base[r, r] = (r * 7) % z + # Diagonal: connect to own parity column + H_base[r, r + 1] = 0 + + # Expand to full binary matrix + n = n_base * z + m = m_base * z + H_full = np.zeros((m, n), dtype=np.int8) + for r in range(m_base): + for c in range(n_base): + shift = int(H_base[r, c]) + if shift < 0: + continue + for zz in range(z): + col_idx = c * z + (zz + shift) % z + H_full[r * z + zz, col_idx] = 1 + + return H_base, H_full + + +def ira_encode(info, H_base, H_full, z=32): + """ + Encode using IRA staircase structure (generic, any rate). + + Same algorithm as ldpc_sim.ldpc_encode but works for any staircase. 
+ """ + m_base = H_base.shape[0] + n_base = H_base.shape[1] + k = z + + assert len(info) == k + + def apply_p(x, s): + return np.roll(x, -int(s)) + + def inv_p(y, s): + return np.roll(y, int(s)) + + p = [np.zeros(z, dtype=np.int8) for _ in range(n_base)] + + # Row 0: solve for p[1] + accum = apply_p(info, H_base[0, 0]) + p[1] = inv_p(accum, H_base[0, 1]) + + # Rows 1..m_base-1: solve for p[2]..p[m_base] + for r in range(1, m_base): + accum = apply_p(info, H_base[r, 0]) + accum = (accum + apply_p(p[r], H_base[r, r])) % 2 + p[r + 1] = inv_p(accum, H_base[r, r + 1]) + + codeword = np.concatenate([info] + [p[c] for c in range(1, n_base)]) + + # Verify + check = H_full @ codeword % 2 + assert np.all(check == 0), f"Encoding failed: syndrome weight = {check.sum()}" + + return codeword + + +def generic_decode(llr_q, H_base, z=32, max_iter=30, early_term=True, + q_bits=Q_BITS): + """ + Generic layered min-sum decoder for any IRA staircase code. + + Same algorithm as ldpc_sim.decode_layered_min_sum but parameterized. 
+ """ + m_base = H_base.shape[0] + n_base = H_base.shape[1] + n = n_base * z + k = z + q_max = 2**(q_bits-1) - 1 + q_min = -(2**(q_bits-1)) + + def sat_add(a, b): + s = int(a) + int(b) + return max(q_min, min(q_max, s)) + + def sat_sub(a, b): + return sat_add(a, -b) + + beliefs = [int(x) for x in llr_q] + msg = [[[0]*z for _ in range(n_base)] for _ in range(m_base)] + + for iteration in range(max_iter): + for row in range(m_base): + connected = [c for c in range(n_base) if H_base[row, c] >= 0] + dc = len(connected) + + vn_to_cn = [[0]*z for _ in range(dc)] + for ci, col in enumerate(connected): + shift = int(H_base[row, col]) + for zz in range(z): + sz = (zz + shift) % z + bi = col * z + sz + vn_to_cn[ci][zz] = sat_sub(beliefs[bi], msg[row][col][zz]) + + cn_to_vn = [[0]*z for _ in range(dc)] + for zz in range(z): + cn_in = [vn_to_cn[ci][zz] for ci in range(dc)] + cn_out = min_sum_cn_update(cn_in) + for ci in range(dc): + cn_to_vn[ci][zz] = cn_out[ci] + + for ci, col in enumerate(connected): + shift = int(H_base[row, col]) + for zz in range(z): + sz = (zz + shift) % z + bi = col * z + sz + beliefs[bi] = sat_add(vn_to_cn[ci][zz], cn_to_vn[ci][zz]) + msg[row][col][zz] = cn_to_vn[ci][zz] + + # Syndrome check + hard = [1 if b < 0 else 0 for b in beliefs] + syn_wt = 0 + for r in range(m_base): + for zz in range(z): + parity = 0 + for c in range(n_base): + shift = int(H_base[r, c]) + if shift < 0: + continue + sz = (zz + shift) % z + bi = c * z + sz + parity ^= hard[bi] + if parity: + syn_wt += 1 + + if early_term and syn_wt == 0: + return np.array(hard[:k]), True, iteration + 1, 0 + + hard = [1 if b < 0 else 0 for b in beliefs] + return np.array(hard[:k]), syn_wt == 0, max_iter, syn_wt + + +def rate_sweep_single(m_base, z=32, lam_s=2.0, lam_b=0.1, n_frames=100, + max_iter=30, q_bits=Q_BITS): + """ + Run FER simulation for a single code rate and lambda_s. + + Returns dict with ber, fer, avg_iter. 
+ """ + H_base, H_full = build_ira_staircase(m_base, z) + k = z + bit_errors = 0 + frame_errors = 0 + total_bits = 0 + total_iter = 0 + + for _ in range(n_frames): + info = np.random.randint(0, 2, k).astype(np.int8) + codeword = ira_encode(info, H_base, H_full, z) + llr, _ = poisson_channel(codeword, lam_s, lam_b) + + # Quantize with configurable bits + q_max = 2**(q_bits-1) - 1 + q_min = -(2**(q_bits-1)) + scale = q_max / 5.0 + llr_q = np.clip(np.round(llr * scale), q_min, q_max).astype(int) + + decoded, converged, iters, _ = generic_decode( + llr_q, H_base, z, max_iter, q_bits=q_bits + ) + total_iter += iters + errs = np.sum(decoded != info) + bit_errors += errs + total_bits += k + if errs > 0: + frame_errors += 1 + + return { + 'rate': f"1/{m_base+1}", + 'rate_val': 1.0 / (m_base + 1), + 'm_base': m_base, + 'lam_s': lam_s, + 'ber': bit_errors / total_bits if total_bits > 0 else 0, + 'fer': frame_errors / n_frames, + 'avg_iter': total_iter / n_frames, + } + + +# ============================================================================ +# Analysis 1: Rate Comparison +# ============================================================================ + +def run_rate_sweep(lam_b=0.1, n_frames=500, max_iter=30): + """Compare code rates across lambda_s range.""" + rates = [ + (1, "1/2"), + (2, "1/3"), + (3, "1/4"), + (5, "1/6"), + (7, "1/8"), + ] + lam_s_values = [0.5, 1.0, 1.5, 2.0, 2.5, 3.0, 4.0, 5.0, 7.0, 10.0] + + print("=== Rate Comparison: FER vs lambda_s ===") + print(f"Frames per point: {n_frames}, lam_b={lam_b}, max_iter={max_iter}") + print() + + # Header + header = f"{'lam_s':>8s}" + for _, name in rates: + header += f" {name:>8s}" + print(header) + print("-" * (8 + 9 * len(rates))) + + all_results = [] + for lam_s in lam_s_values: + row = f"{lam_s:8.1f}" + for m_base, name in rates: + np.random.seed(42) + r = rate_sweep_single(m_base, lam_s=lam_s, lam_b=lam_b, + n_frames=n_frames, max_iter=max_iter) + row += f" {r['fer']:8.3f}" + all_results.append(r) + 
print(row) + + # Summary: threshold lambda_s for each rate (FER < 10%) + print("\n--- Threshold lambda_s (FER < 10%) ---") + for m_base, name in rates: + rate_results = [r for r in all_results if r['m_base'] == m_base] + threshold = None + for r in sorted(rate_results, key=lambda x: x['lam_s']): + if r['fer'] < 0.10: + threshold = r['lam_s'] + break + t_str = f"{threshold:.1f}" if threshold else ">10" + print(f" Rate {name}: threshold lam_s = {t_str} photons/slot") + + return all_results + + +# ============================================================================ +# Analysis 2: Base Matrix Comparison +# ============================================================================ + +def compute_girth(H_base, z): + """ + Compute girth (shortest cycle) of the Tanner graph. + + Uses BFS from each VN to find shortest cycle. + """ + m_base = H_base.shape[0] + n_base = H_base.shape[1] + n = n_base * z + m = m_base * z + + # Build adjacency: VN -> list of CN neighbors, CN -> list of VN neighbors + vn_to_cn = [[] for _ in range(n)] + cn_to_vn = [[] for _ in range(m)] + for r in range(m_base): + for c in range(n_base): + shift = int(H_base[r, c]) + if shift < 0: + continue + for zz in range(z): + vn = c * z + (zz + shift) % z + cn = r * z + zz + vn_to_cn[vn].append(cn) + cn_to_vn[cn].append(vn) + + min_cycle = float('inf') + + # BFS from a subset of VNs (all VNs for small codes) + sample_vns = range(min(n, 64)) # sample for speed + for start_vn in sample_vns: + # BFS: track (node_type, node_id) with distances + # node_type: 0=VN, 1=CN + dist = {} + parent = {} + queue = [(0, start_vn, -1, -1)] # (distance, node, parent_type, parent_id) + dist[(0, start_vn)] = 0 + parent[(0, start_vn)] = None + + qi = 0 + while qi < len(queue): + d, node, pt, pid = queue[qi] + qi += 1 + node_type = 0 if (0, node) in dist and dist[(0, node)] == d else 1 + + # Determine actual type + if qi == 1: + node_type = 0 # start is VN + + # Get neighbors + if (0, node) in dist and dist[(0, node)] 
def build_improved_staircase(z=32):
    """
    Improved IRA staircase with a better degree distribution.

    Keeps the lower-triangular parity structure (so encoding stays easy)
    but adds extra connections to the weakest columns.  Key improvement:
    column 7 gains a second check (dv=1 -> dv=2), and column 1 is boosted
    from dv=2 to dv=3.

    Returns:
        (H_base, H_full): the (7, 8) shift matrix and its (7z, 8z)
        binary expansion.
    """
    m_base, n_base = 7, 8

    # Each entry is (row, col, circulant shift).
    edges = [
        # Info column (col 0): connected to every row, as in the original.
        (0, 0, 0), (1, 0, 11), (2, 0, 17), (3, 0, 23),
        (4, 0, 29), (5, 0, 3), (6, 0, 9),
        # Staircase parity columns (same diagonal structure).
        (0, 1, 5),
        (1, 1, 3), (1, 2, 0),
        (2, 2, 7), (2, 3, 0),
        (3, 3, 13), (3, 4, 0),
        (4, 4, 19), (4, 5, 0),
        (5, 5, 25), (5, 6, 0),
        (6, 6, 31), (6, 7, 0),
        # Improvements: boost weak columns.
        (0, 7, 15),   # col 7: dv=1 -> dv=2
        (3, 1, 21),   # col 1: dv=2 -> dv=3
    ]

    H_base = np.full((m_base, n_base), -1, dtype=np.int16)
    for r, c, s in edges:
        H_base[r, c] = s

    # Expand every non-negative entry into a z x z circulant permutation.
    H_full = np.zeros((m_base * z, n_base * z), dtype=np.int8)
    for r in range(m_base):
        for c in range(n_base):
            s = int(H_base[r, c])
            if s < 0:
                continue
            for zz in range(z):
                H_full[r * z + zz, c * z + (zz + s) % z] = 1

    return H_base, H_full
def build_peg_matrix(z=32):
    """
    PEG-constructed base matrix for rate 1/8.

    Progressive Edge Growth spirit: spread edges to maximize local girth,
    with a more uniform VN degree profile than the staircase
    (degrees [7, 2, 2, 2, 2, 2, 2, 2]).

    The parity columns form a ring rather than a strict lower triangle,
    so the trivial staircase encoder no longer applies; encoding must
    solve H*c = 0 (see peg_encode), which requires the parity submatrix
    to be full rank.

    Returns:
        (H_base, H_full): the (7, 8) shift matrix and its (7z, 8z)
        binary expansion.
    """
    m_base, n_base = 7, 8

    # Each entry is (row, col, circulant shift).
    edges = [
        # Col 0 (info): connected to all 7 rows (dv=7, high protection).
        (0, 0, 0), (1, 0, 11), (2, 0, 17), (3, 0, 23),
        (4, 0, 29), (5, 0, 3), (6, 0, 9),
        # Parity columns: exactly two checks each (dv=2), shifts spread
        # for girth; the wrap-around (0,7) closes the ring.
        (0, 1, 5), (1, 1, 3),
        (1, 2, 0), (2, 2, 7),
        (2, 3, 0), (3, 3, 13),
        (3, 4, 0), (4, 4, 19),
        (4, 5, 0), (5, 5, 25),
        (5, 6, 0), (6, 6, 31),
        (6, 7, 0), (0, 7, 15),
    ]

    H_base = np.full((m_base, n_base), -1, dtype=np.int16)
    for r, c, s in edges:
        H_base[r, c] = s

    # Expand every non-negative entry into a z x z circulant permutation.
    H_full = np.zeros((m_base * z, n_base * z), dtype=np.int8)
    for r in range(m_base):
        for c in range(n_base):
            s = int(H_base[r, c])
            if s < 0:
                continue
            for zz in range(z):
                H_full[r * z + zz, c * z + (zz + s) % z] = 1

    return H_base, H_full
+ """ + m_base = H_base.shape[0] + n_base = H_base.shape[1] + k = z + m = m_base * z + n = n_base * z + + # H = [H_info | H_parity], solve H_parity * p = H_info * info mod 2 + H_info = H_full[:, :k] + H_parity = H_full[:, k:] + + rhs = H_info @ info % 2 + + # Solve via GF(2) Gaussian elimination + # Augmented matrix [H_parity | rhs] + aug = np.hstack([H_parity.copy(), rhs.reshape(-1, 1)]).astype(np.int8) + rows, cols_p = H_parity.shape + + pivot_row = 0 + pivot_cols = [] + for col in range(cols_p): + # Find pivot + found = -1 + for r in range(pivot_row, rows): + if aug[r, col] == 1: + found = r + break + if found == -1: + continue + # Swap + aug[[pivot_row, found]] = aug[[found, pivot_row]] + pivot_cols.append(col) + # Eliminate + for r in range(rows): + if r != pivot_row and aug[r, col] == 1: + aug[r] = (aug[r] + aug[pivot_row]) % 2 + pivot_row += 1 + + # Back-substitute + parity = np.zeros(cols_p, dtype=np.int8) + for i, col in enumerate(pivot_cols): + parity[col] = aug[i, -1] + + codeword = np.concatenate([info, parity]) + check = H_full @ codeword % 2 + assert np.all(check == 0), f"PEG encoding failed: syn={check.sum()}" + return codeword + + +def run_matrix_comparison(lam_b=0.1, n_frames=500, max_iter=30): + """Compare base matrix designs at rate 1/8.""" + from ldpc_sim import H_BASE as ORIG_H_BASE, build_full_h_matrix, ldpc_encode + + matrices = { + 'Original staircase': (ORIG_H_BASE, build_full_h_matrix(), ldpc_encode), + } + + # Improved staircase + imp_base, imp_full = build_improved_staircase() + # Need a custom encoder since it's not pure staircase anymore + # Actually it IS still lower-triangular in parity, but with extra + # connections — we can use the PEG encoder (GF2 solve) for safety + matrices['Improved staircase'] = (imp_base, imp_full, None) + + # PEG + peg_base, peg_full = build_peg_matrix() + matrices['PEG ring'] = (peg_base, peg_full, None) + + z = 32 + lam_s_values = [0.5, 1.0, 1.5, 2.0, 3.0, 4.0, 5.0, 7.0, 10.0] + + print("=== Base Matrix 
def run_matrix_comparison(lam_b=0.1, n_frames=500, max_iter=30):
    """Compare base matrix designs at rate 1/8.

    For each candidate matrix, prints the VN/CN degree profile, girth,
    and rank, then sweeps lam_s and reports FER side by side.

    Args:
        lam_b: background photons/slot.
        n_frames: frames simulated per (matrix, lam_s) point.
        max_iter: decoder iteration cap.

    Returns:
        List of {'matrix', 'lam_s', 'fer'} dicts, one per table cell.
    """
    from ldpc_sim import H_BASE as ORIG_H_BASE, build_full_h_matrix, ldpc_encode

    # name -> (base matrix, expanded H, encoder fn or None).
    # None encoder means "use peg_encode" (generic GF(2) solve).
    matrices = {
        'Original staircase': (ORIG_H_BASE, build_full_h_matrix(), ldpc_encode),
    }

    # Improved staircase
    imp_base, imp_full = build_improved_staircase()
    # Need a custom encoder since it's not pure staircase anymore
    # Actually it IS still lower-triangular in parity, but with extra
    # connections — we can use the PEG encoder (GF2 solve) for safety
    matrices['Improved staircase'] = (imp_base, imp_full, None)

    # PEG
    peg_base, peg_full = build_peg_matrix()
    matrices['PEG ring'] = (peg_base, peg_full, None)

    z = 32
    lam_s_values = [0.5, 1.0, 1.5, 2.0, 3.0, 4.0, 5.0, 7.0, 10.0]

    print("=== Base Matrix Comparison (Rate 1/8) ===")
    print(f"Frames per point: {n_frames}, lam_b={lam_b}")
    print()

    # Degree distributions and girth (structural properties, printed first)
    for name, (H_base, H_full, _) in matrices.items():
        m_base_m = H_base.shape[0]
        n_base_m = H_base.shape[1]
        vn_degrees = []
        for c in range(n_base_m):
            # A column is connected to a row iff the shift entry is >= 0.
            dv = sum(1 for r in range(m_base_m) if H_base[r, c] >= 0)
            vn_degrees.append(dv)
        cn_degrees = []
        for r in range(m_base_m):
            dc = sum(1 for c in range(n_base_m) if H_base[r, c] >= 0)
            cn_degrees.append(dc)
        girth = compute_girth(H_base, z)
        print(f"{name}:")
        print(f" VN degrees: {vn_degrees}")
        print(f" CN degrees: {cn_degrees}")
        print(f" Girth: {girth}")
        # Rank deficit would mean fewer effective checks than rows.
        rank = np.linalg.matrix_rank(H_full.astype(float))
        print(f" H rank: {rank} / {H_base.shape[0]*z}")
        print()

    # FER sweep
    header = f"{'lam_s':>8s}"
    for name in matrices:
        short = name[:12]
        header += f" {short:>14s}"
    print(header)
    print("-" * (8 + 15 * len(matrices)))

    all_results = []
    for lam_s in lam_s_values:
        row = f"{lam_s:8.1f}"
        for name, (H_base, H_full, enc_fn) in matrices.items():
            # Reset the seed per matrix so every design sees the same
            # info words and channel noise — a paired comparison.
            np.random.seed(42)
            k = z
            n = H_base.shape[1] * z
            frame_errors = 0

            for _ in range(n_frames):
                info = np.random.randint(0, 2, k).astype(np.int8)
                if enc_fn is not None:
                    codeword = enc_fn(info, H_full)
                else:
                    codeword = peg_encode(info, H_base, H_full, z)
                llr, _ = poisson_channel(codeword, lam_s, lam_b)
                llr_q = quantize_llr(llr)
                decoded, converged, _, _ = generic_decode(
                    llr_q, H_base, z, max_iter
                )
                if np.any(decoded != info):
                    frame_errors += 1

            fer = frame_errors / n_frames
            row += f" {fer:14.3f}"
            all_results.append({
                'matrix': name, 'lam_s': lam_s, 'fer': fer
            })
        print(row)

    return all_results
"""Sweep quantization bits and measure FER impact.""" + from ldpc_sim import H_BASE, build_full_h_matrix, ldpc_encode + + H_base = H_BASE + H_full = build_full_h_matrix() + z = 32 + + quant_bits = [4, 5, 6, 8, 10, 16] + + # If no lambda_s given, test at a few points + if lam_s is None: + lam_s_values = [2.0, 3.0, 5.0] + else: + lam_s_values = [lam_s] + + print("=== Quantization Sweep ===") + print(f"Frames per point: {n_frames}, lam_b={lam_b}") + print() + + header = f"{'lam_s':>8s}" + for q in quant_bits: + header += f" {q:>6d}-bit" + header += f" {'float':>10s}" + print(header) + print("-" * (8 + 10 * (len(quant_bits) + 1))) + + all_results = [] + for ls in lam_s_values: + row = f"{ls:8.1f}" + + # Fixed quantization levels + for q in quant_bits: + np.random.seed(42) + frame_errors = 0 + for _ in range(n_frames): + info = np.random.randint(0, 2, z).astype(np.int8) + codeword = ldpc_encode(info, H_full) + llr, _ = poisson_channel(codeword, ls, lam_b) + + q_max = 2**(q-1) - 1 + q_min = -(2**(q-1)) + scale = q_max / 5.0 + llr_q = np.clip(np.round(llr * scale), q_min, q_max).astype(int) + + decoded, _, _, _ = generic_decode( + llr_q, H_base, z, max_iter, q_bits=q + ) + if np.any(decoded != info): + frame_errors += 1 + + fer = frame_errors / n_frames + row += f" {fer:9.3f}" + all_results.append({ + 'lam_s': ls, 'q_bits': q, 'fer': fer + }) + + # Float (no quantization — use very wide integers to approximate) + np.random.seed(42) + frame_errors = 0 + for _ in range(n_frames): + info = np.random.randint(0, 2, z).astype(np.int8) + codeword = ldpc_encode(info, H_full) + llr, _ = poisson_channel(codeword, ls, lam_b) + + # Use 16-bit with high scale for "float-like" precision + scale = 1000.0 + llr_q = np.clip(np.round(llr * scale), -32768, 32767).astype(int) + decoded, _, _, _ = generic_decode( + llr_q, H_base, z, max_iter, q_bits=16 + ) + if np.any(decoded != info): + frame_errors += 1 + + fer = frame_errors / n_frames + row += f" {fer:9.3f}" + all_results.append({ + 
def poisson_channel_capacity(lam_s, lam_b, max_y=50):
    """
    Compute binary-input Poisson channel capacity (bits per slot).

    C = max_p I(X;Y) where X in {0,1}, Y ~ Poisson(x*lam_s + lam_b);
    the input prior p1 = P(X=1) is optimized by a 1% grid search.

    Args:
        lam_s: signal photons/slot.
        lam_b: background photons/slot.
        max_y: minimum truncation point for the output alphabet.  The
            truncation is automatically extended to roughly
            mean + 10*sigma of the brighter hypothesis: the previous
            fixed cutoff of 50 lost about half the probability mass once
            lam_s + lam_b approached 50 (which the --shannon-gap
            bisection reaches), badly underestimating capacity.

    Returns:
        Channel capacity in bits/slot (0 <= C <= 1).
    """
    from scipy.stats import poisson

    # Extend the output alphabet so Poisson(lam_s + lam_b) is captured.
    lam_hi = lam_s + lam_b
    max_y = max(max_y, int(np.ceil(lam_hi + 10.0 * np.sqrt(lam_hi) + 10.0)))
    y = np.arange(max_y + 1)

    # Conditional output PMFs (scipy handles mu=0 as a delta at y=0).
    pmf0 = poisson.pmf(y, lam_b)
    pmf1 = poisson.pmf(y, lam_hi)

    def entropy_bits(pmf):
        """Entropy (bits) of a truncated PMF, renormalized to sum to 1.

        Renormalizing here keeps H(Y) and H(Y|X) consistent — the old
        code renormalized only the conditionals.
        """
        total = pmf.sum()
        if total <= 0.0:
            return 0.0
        p = pmf / total
        nz = p > 0
        return float(-(p[nz] * np.log2(p[nz])).sum())

    h_y_x0 = entropy_bits(pmf0)
    h_y_x1 = entropy_bits(pmf1)

    # Grid search over the input prior: I = H(Y) - p0*H(Y|0) - p1*H(Y|1).
    best_c = 0.0
    for p1_pct in range(0, 101):
        p1 = p1_pct / 100.0
        p0 = 1.0 - p1
        h_y = entropy_bits(p0 * pmf0 + p1 * pmf1)
        best_c = max(best_c, h_y - p0 * h_y_x0 - p1 * h_y_x1)

    return best_c
def run_shannon_gap(lam_b=0.1):
    """Compute Shannon limits and compare to decoder thresholds."""
    print("=== Shannon Gap Analysis ===")
    print(f"Binary-input Poisson channel, lam_b={lam_b}")
    print()

    # (m_base rows, display name, numeric rate)
    rates = [
        (1, "1/2", 0.5),
        (2, "1/3", 1/3),
        (3, "1/4", 0.25),
        (5, "1/6", 1/6),
        (7, "1/8", 0.125),
    ]

    def min_lam_for_rate(target_rate):
        # Capacity is monotone in lam_s, so bisect for the smallest
        # lam_s with C >= target_rate; 50 halvings of [0.01, 50].
        lower, upper = 0.01, 50.0
        for _ in range(50):
            probe = (lower + upper) / 2.0
            if poisson_channel_capacity(probe, lam_b) >= target_rate:
                upper = probe
            else:
                lower = probe
        return (lower + upper) / 2.0

    print(f"{'Rate':>8s} {'Shannon':>10s} {'Capacity':>10s}")
    print(f"{'':>8s} {'lam_s':>10s} {'at limit':>10s}")
    print("-" * 32)

    results = []
    for _m_base, name, rate_val in rates:
        shannon_lam_s = min_lam_for_rate(rate_val)
        cap_at_limit = poisson_channel_capacity(shannon_lam_s, lam_b)

        print(f"{name:>8s} {shannon_lam_s:10.3f} {cap_at_limit:10.4f}")
        results.append({
            'rate': name, 'rate_val': rate_val,
            'shannon_lam_s': shannon_lam_s,
            'capacity_at_limit': cap_at_limit,
        })

    print()
    print("Shannon limit = minimum lam_s (photons/slot) where")
    print("channel capacity >= code rate. Decoder must operate above this.")
    print()
    print("Compare decoder thresholds (from --rate-sweep) to these limits")
    print("to compute the gap in dB: gap = 10*log10(decoder_threshold / shannon_limit)")

    return results
def main():
    """CLI entry point: parse flags, run selected analyses, save JSON."""
    parser = argparse.ArgumentParser(description='LDPC Code Analysis')
    parser.add_argument('--rate-sweep', action='store_true',
                        help='Compare code rates vs lambda_s')
    parser.add_argument('--matrix-compare', action='store_true',
                        help='Compare base matrix designs')
    parser.add_argument('--quant-sweep', action='store_true',
                        help='Quantization bits impact')
    parser.add_argument('--shannon-gap', action='store_true',
                        help='Distance from channel capacity')
    parser.add_argument('--all', action='store_true',
                        help='Run all analyses')
    parser.add_argument('--n-frames', type=int, default=500,
                        help='Frames per data point (default: 500)')
    parser.add_argument('--lam-b', type=float, default=0.1,
                        help='Background photons/slot (default: 0.1)')
    parser.add_argument('--seed', type=int, default=42)
    args = parser.parse_args()

    np.random.seed(args.seed)

    # (selected?, results key, runner) — executed in declaration order.
    analyses = [
        (args.rate_sweep, 'rate_sweep',
         lambda: run_rate_sweep(args.lam_b, args.n_frames)),
        (args.matrix_compare, 'matrix_compare',
         lambda: run_matrix_comparison(args.lam_b, args.n_frames)),
        (args.quant_sweep, 'quant_sweep',
         lambda: run_quant_sweep(lam_b=args.lam_b, n_frames=args.n_frames)),
        (args.shannon_gap, 'shannon_gap',
         lambda: run_shannon_gap(args.lam_b)),
    ]

    results = {}
    for selected, key, runner in analyses:
        if selected or args.all:
            results[key] = runner()
            print()

    if results:
        out_path = os.path.join(os.path.dirname(__file__) or '.', '..',
                                'data', 'analysis_results.json')
        os.makedirs(os.path.dirname(out_path), exist_ok=True)
        with open(out_path, 'w') as f:
            # default=str so numpy scalars and other non-JSON types
            # serialize rather than raising.
            json.dump(results, f, indent=2, default=str)
        print(f"Results saved to {out_path}")
    else:
        parser.print_help()
+ +**Step 4: Run Shannon gap** + +Run: `cd /home/cah/r2d2/code/fpga/claude_project/ldpc_optical && python3 model/ldpc_analysis.py --shannon-gap` +Expected: Shannon limit lambda_s for each rate. + +**Step 5: Run frame sync sweep** + +Run: `cd /home/cah/r2d2/code/fpga/claude_project/ldpc_optical && python3 model/frame_sync.py --sweep --n-trials 20` +Expected: Acquisition success rate vs lambda_s. + +**Step 6: Run frame sync re-sync test** + +Run: `cd /home/cah/r2d2/code/fpga/claude_project/ldpc_optical && python3 model/frame_sync.py --resync-test --lam-s 5.0 --n-trials 20` +Expected: Re-sync success at various slip amounts. + +**Step 7: Review results, fix any bugs, commit data** + +```bash +git add data/ +git commit -m "data: add analysis and frame sync simulation results" +``` + +--- + +### Task 5: Run Full Test Suite + +**Step 1: Run all tests** + +Run: `cd /home/cah/r2d2/code/fpga/claude_project/ldpc_optical && python3 -m pytest model/ -v` +Expected: All tests PASS + +**Step 2: Final commit** + +```bash +git add -A +git commit -m "test: all model tests passing for frame sync and code analysis" +```