# Frame Sync & Code Analysis Implementation Plan > **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task. **Goal:** Build a frame synchronization prototype and four quantitative code analyses to validate design decisions for the LDPC optical decoder (target: 1-2 photons/slot). **Architecture:** Two new scripts (`frame_sync.py`, `ldpc_analysis.py`) import encoder/decoder/channel from existing `ldpc_sim.py`. Frame sync simulates a continuous stream with unknown offset. Analyses compare rates, matrices, quantization, and Shannon limits. All output summary tables to stdout and save results to `data/`. **Tech Stack:** Python 3, numpy, scipy (for Poisson PMF and optimization in Shannon gap). pytest for validation tests. **Key reference files:** - `model/ldpc_sim.py` — encoder, decoder, channel, syndrome check (DO NOT MODIFY) - `docs/plans/2026-02-23-frame-sync-and-code-analysis-design.md` — design document --- ### Task 1: Test Harness — Validate Existing Model **Files:** - Create: `model/test_ldpc.py` **Step 1: Write validation tests for existing ldpc_sim.py** These tests establish a known-good baseline before we build on top. ```python """Tests for ldpc_sim.py — validates encoder, decoder, channel, syndrome.""" import numpy as np import pytest from ldpc_sim import ( build_full_h_matrix, ldpc_encode, poisson_channel, quantize_llr, decode_layered_min_sum, compute_syndrome_weight, cyclic_shift, sat_add_q, sat_sub_q, min_sum_cn_update, N, K, M, Z, N_BASE, M_BASE, H_BASE, Q_MAX, Q_MIN, ) class TestCodeParameters: def test_dimensions(self): assert N == 256 assert K == 32 assert M == 224 assert Z == 32 assert N_BASE == 8 assert M_BASE == 7 def test_h_base_shape(self): assert H_BASE.shape == (M_BASE, N_BASE) def test_h_base_info_column_fully_connected(self): """Column 0 (info) should connect to all 7 rows.""" for r in range(M_BASE): assert H_BASE[r, 0] >= 0 def test_h_base_staircase_structure(self): """Parity columns should form lower-triangular staircase.""" for r in range(M_BASE): # Diagonal parity column (r+1) should be connected assert H_BASE[r, r + 1] >= 0 # Above diagonal should be -1 (not connected) for c in range(r + 2, N_BASE): assert H_BASE[r, c] == -1 class TestHMatrix: def test_full_h_dimensions(self): H = build_full_h_matrix() assert H.shape == (M, N) def test_full_h_binary(self): H = build_full_h_matrix() assert set(np.unique(H)).issubset({0, 1}) def test_full_h_row_weights(self): """Each row in a Z-block should have the same weight (QC property).""" H = build_full_h_matrix() for r in range(M_BASE): weights = [H[r * Z + z].sum() for z in range(Z)] assert len(set(weights)) == 1, f"Row block {r} has inconsistent weights" def test_full_h_rank(self): """H should have rank M (full rank) for the code to work.""" H = build_full_h_matrix() rank = np.linalg.matrix_rank(H.astype(float)) assert rank == M class TestEncoder: def test_encode_all_zeros(self): H = build_full_h_matrix() info = np.zeros(K, dtype=np.int8) codeword = ldpc_encode(info, H) assert len(codeword) == N assert np.all(codeword == 0) # all-zero is always a codeword def test_encode_satisfies_parity(self): H = build_full_h_matrix() np.random.seed(123) info = np.random.randint(0, 2, K).astype(np.int8) codeword = ldpc_encode(info, H) syndrome = H @ codeword % 2 assert np.all(syndrome == 0) def test_encode_preserves_info_bits(self): H = build_full_h_matrix() np.random.seed(456) info = np.random.randint(0, 2, K).astype(np.int8) codeword = ldpc_encode(info, H) assert np.array_equal(codeword[:K], info) def test_encode_multiple_random(self): """Encode 20 random messages, all must satisfy parity.""" H = build_full_h_matrix() for seed in range(20): np.random.seed(seed) info = np.random.randint(0, 2, K).astype(np.int8) codeword = ldpc_encode(info, H) syndrome = H @ codeword % 2 assert np.all(syndrome == 0), f"Failed for seed {seed}" class TestSaturatingArithmetic: def test_sat_add_normal(self): assert sat_add_q(10, 5) == 15 def test_sat_add_positive_overflow(self): assert sat_add_q(Q_MAX, 1) == Q_MAX def test_sat_add_negative_overflow(self): assert sat_add_q(Q_MIN, -1) == Q_MIN def test_sat_sub(self): assert sat_sub_q(10, 3) == 7 assert sat_sub_q(Q_MIN, 1) == Q_MIN class TestMinSumCN: def test_all_positive(self): """All positive inputs: output signs should all be positive.""" msgs = [5, 10, 15] out = min_sum_cn_update(msgs, offset=1) assert all(o >= 0 for o in out) def test_one_negative(self): """One negative: that output positive, others negative.""" msgs = [-5, 10, 15] out = min_sum_cn_update(msgs, offset=1) assert out[0] >= 0 # extrinsic sign excludes own sign assert out[1] <= 0 assert out[2] <= 0 def test_magnitude_is_min_of_others(self): """Output magnitude = min of OTHER magnitudes - offset.""" msgs = [3, 10, 20] out = min_sum_cn_update(msgs, offset=1) # out[0]: min of {10,20} - 1 = 9 assert abs(out[0]) == 9 # out[1]: min of {3,20} - 1 = 2 assert abs(out[1]) == 2 # out[2]: min of {3,10} - 1 = 2 assert abs(out[2]) == 2 class TestDecoder: def test_decode_noiseless(self): """Noiseless channel (huge lambda_s) should decode perfectly.""" H = build_full_h_matrix() np.random.seed(789) info = np.random.randint(0, 2, K).astype(np.int8) codeword = ldpc_encode(info, H) # Noiseless: LLR is just +/- large value llr = np.where(codeword == 0, 20.0, -20.0) llr_q = quantize_llr(llr) decoded, converged, iters, syn_wt = decode_layered_min_sum(llr_q, max_iter=5) assert converged assert np.array_equal(decoded, info) assert iters == 1 # should converge in 1 iteration with perfect input def test_decode_high_snr(self): """High SNR (lambda_s=10) should decode reliably.""" H = build_full_h_matrix() np.random.seed(101) info = np.random.randint(0, 2, K).astype(np.int8) codeword = ldpc_encode(info, H) llr, _ = poisson_channel(codeword, lam_s=10.0, lam_b=0.1) llr_q = quantize_llr(llr) decoded, converged, _, _ = decode_layered_min_sum(llr_q) assert converged assert np.array_equal(decoded, info) class TestSyndrome: def test_valid_codeword_syndrome_zero(self): H = build_full_h_matrix() np.random.seed(202) info = np.random.randint(0, 2, K).astype(np.int8) codeword = ldpc_encode(info, H) assert compute_syndrome_weight(codeword.tolist()) == 0 def test_flipped_bit_nonzero_syndrome(self): H = build_full_h_matrix() info = np.zeros(K, dtype=np.int8) codeword = ldpc_encode(info, H) corrupted = codeword.copy() corrupted[0] ^= 1 # flip one bit assert compute_syndrome_weight(corrupted.tolist()) > 0 ``` **Step 2: Run tests to verify they pass** Run: `cd /home/cah/r2d2/code/fpga/claude_project/ldpc_optical && python3 -m pytest model/test_ldpc.py -v` Expected: All tests PASS (these test existing working code) **Step 3: Commit** ```bash git add model/test_ldpc.py git commit -m "test: add validation tests for existing LDPC model" ``` --- ### Task 2: Frame Sync — Streaming Infrastructure **Files:** - Create: `model/frame_sync.py` - Create: `model/test_frame_sync.py` **Step 1: Write the test for stream generation and syndrome screening** ```python """Tests for frame synchronization.""" import numpy as np import pytest from ldpc_sim import ( build_full_h_matrix, ldpc_encode, poisson_channel, quantize_llr, compute_syndrome_weight, N, K, Z, ) from frame_sync import generate_stream, syndrome_screen class TestStreamGeneration: def test_stream_length(self): """Stream should be n_frames * N + offset samples long.""" H = build_full_h_matrix() np.random.seed(42) stream_llr, true_offset, info_list = generate_stream( H, n_frames=5, lam_s=5.0, lam_b=0.1, offset=10 ) assert len(stream_llr) == 5 * N + 10 assert true_offset == 10 assert len(info_list) == 5 def test_stream_zero_offset(self): H = build_full_h_matrix() np.random.seed(42) stream_llr, true_offset, _ = generate_stream( H, n_frames=3, lam_s=5.0, lam_b=0.1, offset=0 ) assert true_offset == 0 assert len(stream_llr) == 3 * N def test_stream_random_offset(self): H = build_full_h_matrix() np.random.seed(42) stream_llr, true_offset, _ = generate_stream( H, n_frames=5, lam_s=5.0, lam_b=0.1, offset=None ) assert 0 <= true_offset < N assert len(stream_llr) == 5 * N + true_offset class TestSyndromeScreen: def test_correct_offset_low_syndrome(self): """At high SNR, correct offset should have very low syndrome weight.""" H = build_full_h_matrix() np.random.seed(42) stream_llr, true_offset, _ = generate_stream( H, n_frames=5, lam_s=10.0, lam_b=0.1, offset=37 ) scores = syndrome_screen(stream_llr, n_offsets=N) # Correct offset should have the lowest (or near-lowest) syndrome best_offset = min(scores, key=scores.get) assert best_offset == 37 def test_wrong_offsets_high_syndrome(self): """Wrong offsets should have syndrome weight near M/2 ~ 112.""" H = build_full_h_matrix() np.random.seed(42) stream_llr, true_offset, _ = generate_stream( H, n_frames=5, lam_s=10.0, lam_b=0.1, offset=0 ) scores = syndrome_screen(stream_llr, n_offsets=N) wrong_scores = [v for k, v in scores.items() if k != 0] avg_wrong = np.mean(wrong_scores) assert avg_wrong > 80 # should be near 112 for random ``` **Step 2: Run tests to verify they fail** Run: `cd /home/cah/r2d2/code/fpga/claude_project/ldpc_optical && python3 -m pytest model/test_frame_sync.py -v` Expected: FAIL — `frame_sync` module doesn't exist yet **Step 3: Implement generate_stream and syndrome_screen** ```python #!/usr/bin/env python3 """ Frame Synchronization Prototype for LDPC Optical Decoder. Simulates a continuous photon-counting stream with unknown frame alignment. Tests acquisition (finding codeword boundaries) and re-sync after loss. Usage: python3 frame_sync.py # Quick demo at a few lambda_s python3 frame_sync.py --sweep # Acquisition success vs lambda_s python3 frame_sync.py --resync-test # Re-sync after offset slip """ import numpy as np import argparse import json import os from ldpc_sim import ( build_full_h_matrix, ldpc_encode, poisson_channel, quantize_llr, decode_layered_min_sum, compute_syndrome_weight, N, K, M, Z, N_BASE, M_BASE, H_BASE, ) SCREENING_THRESHOLD = 50 # syndrome weight below this triggers full decode def generate_stream(H, n_frames=10, lam_s=2.0, lam_b=0.1, offset=None): """ Generate a continuous stream of channel LLRs with unknown frame offset. Args: H: full parity check matrix n_frames: number of codewords to concatenate lam_s: signal photons/slot lam_b: background photons/slot offset: bit offset to insert (None = random 0..N-1) Returns: stream_llr: continuous LLR stream (float, not quantized) true_offset: the actual offset used info_list: list of info bit arrays for each frame (ground truth) """ if offset is None: offset = np.random.randint(0, N) # Generate random prefix LLRs for the offset (noise-only, no codeword) # These represent receiving channel output before the first codeword starts prefix_llr = np.random.normal(0, 1, offset) # just noise # Generate codewords and their channel outputs info_list = [] codeword_llrs = [] for _ in range(n_frames): info = np.random.randint(0, 2, K).astype(np.int8) codeword = ldpc_encode(info, H) llr, _ = poisson_channel(codeword, lam_s, lam_b) info_list.append(info) codeword_llrs.append(llr) stream_llr = np.concatenate([prefix_llr] + codeword_llrs) return stream_llr, offset, info_list def syndrome_screen(stream_llr, n_offsets=None): """ Screen all candidate offsets using hard-decision syndrome weight. Args: stream_llr: continuous LLR stream n_offsets: number of offsets to try (default N=256) Returns: scores: dict mapping offset -> syndrome weight """ if n_offsets is None: n_offsets = N scores = {} for off in range(n_offsets): if off + N > len(stream_llr): break window = stream_llr[off:off + N] hard = [0 if v > 0 else 1 for v in window] syn_wt = compute_syndrome_weight(hard) scores[off] = syn_wt return scores def acquire_sync(stream_llr, max_confirm=2, max_iter=30, verbose=False): """ Acquire frame synchronization from a continuous stream. Algorithm: 1. Syndrome-screen all 256 offsets 2. For offsets below threshold, run full iterative decode 3. Confirm by decoding next max_confirm consecutive frames Args: stream_llr: continuous LLR stream max_confirm: number of consecutive frames to confirm sync max_iter: decoder max iterations verbose: print screening details Returns: dict with keys: locked: bool offset: detected offset (or -1) offsets_screened: number of offsets tried full_decodes: number of full iterative decodes run screening_cost: equivalent decode cycles for syndrome screening """ scores = syndrome_screen(stream_llr) # Sort by syndrome weight (best candidates first) ranked = sorted(scores.items(), key=lambda x: x[1]) full_decodes = 0 for off, syn_wt in ranked: if syn_wt >= SCREENING_THRESHOLD: break # all remaining offsets are worse # Full iterative decode at this offset window = stream_llr[off:off + N] llr_q = quantize_llr(window) decoded, converged, iters, _ = decode_layered_min_sum(llr_q, max_iter) full_decodes += 1 if not converged: continue # Confirm with consecutive frames confirmed = True for f in range(1, max_confirm + 1): start = off + f * N if start + N > len(stream_llr): confirmed = False break win_f = stream_llr[start:start + N] llr_q_f = quantize_llr(win_f) _, conv_f, _, _ = decode_layered_min_sum(llr_q_f, max_iter) full_decodes += 1 if not conv_f: confirmed = False break if confirmed: # Cost model: syndrome screening ~ 1/30 of a full decode each screening_cost = len(scores) / 30.0 return { 'locked': True, 'offset': off, 'offsets_screened': len(scores), 'full_decodes': full_decodes, 'screening_cost': screening_cost, 'total_equiv_decodes': screening_cost + full_decodes, } return { 'locked': False, 'offset': -1, 'offsets_screened': len(scores), 'full_decodes': full_decodes, 'screening_cost': len(scores) / 30.0, 'total_equiv_decodes': len(scores) / 30.0 + full_decodes, } def resync_test(stream_llr, true_offset, slip_amount, search_radius=16, max_iter=30): """ Test re-synchronization after an offset slip. Simulates the receiver losing sync by `slip_amount` bits, then searching nearby offsets to re-acquire. Returns: dict with locked, detected offset, cost """ slipped_offset = (true_offset + slip_amount) % N # First try nearby offsets (fast re-sync) for delta in range(-search_radius, search_radius + 1): off = (slipped_offset + delta) % N if off + N > len(stream_llr): continue window = stream_llr[off:off + N] hard = [0 if v > 0 else 1 for v in window] syn_wt = compute_syndrome_weight(hard) if syn_wt < SCREENING_THRESHOLD: llr_q = quantize_llr(window) _, converged, _, _ = decode_layered_min_sum(llr_q, max_iter) if converged: return { 'locked': True, 'offset': off, 'slip': slip_amount, 'search_radius_used': search_radius, 'needed_full_search': False, } # Fall back to full search result = acquire_sync(stream_llr) result['slip'] = slip_amount result['search_radius_used'] = search_radius result['needed_full_search'] = True return result def run_acquisition_sweep(lam_s_values, lam_b=0.1, n_trials=50, n_frames=10): """Sweep lambda_s and measure acquisition success rate.""" H = build_full_h_matrix() print(f"{'lam_s':>8s} {'success%':>10s} {'avg_decodes':>12s} " f"{'avg_cost':>10s} {'false_lock':>10s}") print("-" * 55) results = [] for lam_s in lam_s_values: successes = 0 false_locks = 0 total_decodes = 0 total_cost = 0 for trial in range(n_trials): np.random.seed(trial * 1000 + int(lam_s * 100)) stream, true_off, _ = generate_stream( H, n_frames=n_frames, lam_s=lam_s, lam_b=lam_b, offset=None ) result = acquire_sync(stream) if result['locked']: if result['offset'] == true_off: successes += 1 else: false_locks += 1 total_decodes += result['full_decodes'] total_cost += result['total_equiv_decodes'] pct = 100 * successes / n_trials avg_dec = total_decodes / n_trials avg_cost = total_cost / n_trials fl_pct = 100 * false_locks / n_trials print(f"{lam_s:8.1f} {pct:9.1f}% {avg_dec:12.1f} " f"{avg_cost:10.1f} {fl_pct:9.1f}%") results.append({ 'lam_s': lam_s, 'success_pct': pct, 'avg_full_decodes': avg_dec, 'avg_equiv_cost': avg_cost, 'false_lock_pct': fl_pct, }) return results def run_resync_sweep(lam_s=5.0, lam_b=0.1, n_trials=20, n_frames=10): """Test re-sync at various slip amounts.""" H = build_full_h_matrix() print(f"\nRe-sync test at lam_s={lam_s}") print(f"{'slip':>6s} {'success%':>10s} {'full_search%':>12s}") print("-" * 32) for slip in [1, 2, 4, 8, 16, 32, 64, 128]: successes = 0 full_searches = 0 for trial in range(n_trials): np.random.seed(trial * 100 + slip) stream, true_off, _ = generate_stream( H, n_frames=n_frames, lam_s=lam_s, lam_b=lam_b, offset=None ) result = resync_test(stream, true_off, slip) if result['locked']: successes += 1 if result.get('needed_full_search'): full_searches += 1 print(f"{slip:6d} {100*successes/n_trials:9.1f}% " f"{100*full_searches/n_trials:11.1f}%") def main(): parser = argparse.ArgumentParser(description='LDPC Frame Sync Prototype') parser.add_argument('--sweep', action='store_true', help='Acquisition success rate vs lambda_s') parser.add_argument('--resync-test', action='store_true', help='Re-sync after offset slip') parser.add_argument('--lam-s', type=float, default=5.0, help='Signal photons/slot (default: 5.0)') parser.add_argument('--lam-b', type=float, default=0.1, help='Background photons/slot (default: 0.1)') parser.add_argument('--n-trials', type=int, default=50, help='Trials per data point (default: 50)') parser.add_argument('--seed', type=int, default=42) args = parser.parse_args() if args.sweep: print("=== Frame Sync Acquisition Sweep ===") lam_s_values = [0.5, 1.0, 1.5, 2.0, 3.0, 4.0, 5.0, 7.0, 10.0] results = run_acquisition_sweep(lam_s_values, args.lam_b, args.n_trials) out_path = os.path.join(os.path.dirname(__file__), '..', 'data', 'sync_acquisition_results.json') os.makedirs(os.path.dirname(out_path), exist_ok=True) with open(out_path, 'w') as f: json.dump(results, f, indent=2) print(f"\nResults saved to {out_path}") elif args.resync_test: run_resync_sweep(args.lam_s, args.lam_b, args.n_trials) else: # Quick demo print("=== Frame Sync Demo ===") H = build_full_h_matrix() np.random.seed(args.seed) stream, true_off, infos = generate_stream( H, n_frames=10, lam_s=args.lam_s, lam_b=args.lam_b, offset=None ) print(f"Stream: {len(stream)} samples, true offset: {true_off}") print(f"Signal: lam_s={args.lam_s}, lam_b={args.lam_b}") result = acquire_sync(stream, verbose=True) if result['locked']: match = "CORRECT" if result['offset'] == true_off else "WRONG" print(f"Locked at offset {result['offset']} ({match})") print(f" Screened {result['offsets_screened']} offsets") print(f" Full decodes: {result['full_decodes']}") print(f" Total cost: {result['total_equiv_decodes']:.1f} " f"equiv decodes") else: print("FAILED to acquire sync") if __name__ == '__main__': main() ``` **Step 4: Run tests to verify they pass** Run: `cd /home/cah/r2d2/code/fpga/claude_project/ldpc_optical && python3 -m pytest model/test_frame_sync.py -v` Expected: All PASS **Step 5: Run the demo to verify it works end-to-end** Run: `cd /home/cah/r2d2/code/fpga/claude_project/ldpc_optical && python3 model/frame_sync.py --lam-s 5.0` Expected: Should lock at the correct offset **Step 6: Commit** ```bash git add model/frame_sync.py model/test_frame_sync.py git commit -m "feat: add frame synchronization prototype with tests" ``` --- ### Task 3: Code Analysis — Rate Comparison **Files:** - Create: `model/ldpc_analysis.py` - Create: `model/test_ldpc_analysis.py` **Step 1: Write the test for rate sweep** ```python """Tests for ldpc_analysis.py.""" import numpy as np import pytest from ldpc_analysis import build_ira_staircase, rate_sweep_single class TestIRAStaircase: def test_rate_1_2(self): """Rate 1/2: 1x2 base matrix, n=64, k=32.""" H_base, H_full = build_ira_staircase(m_base=1, z=32) assert H_base.shape == (1, 2) assert H_full.shape == (32, 64) assert np.linalg.matrix_rank(H_full.astype(float)) == 32 def test_rate_1_4(self): """Rate 1/4: 3x4 base matrix, n=128, k=32.""" H_base, H_full = build_ira_staircase(m_base=3, z=32) assert H_base.shape == (3, 4) assert H_full.shape == (96, 128) assert np.linalg.matrix_rank(H_full.astype(float)) == 96 def test_rate_1_8_matches_existing(self): """Rate 1/8 staircase should produce a valid code.""" H_base, H_full = build_ira_staircase(m_base=7, z=32) assert H_base.shape == (7, 8) assert H_full.shape == (224, 256) def test_encode_decode_roundtrip(self): """Any generated staircase code should encode/decode at high SNR.""" for m_base in [1, 2, 3, 5]: H_base, H_full = build_ira_staircase(m_base=m_base, z=32) n_base = m_base + 1 k = 32 n = n_base * 32 np.random.seed(42) info = np.random.randint(0, 2, k).astype(np.int8) # Use generic IRA encoder from ldpc_analysis import ira_encode codeword = ira_encode(info, H_base, H_full, z=32) syndrome = H_full @ codeword % 2 assert np.all(syndrome == 0), f"Failed for m_base={m_base}" class TestRateSweep: def test_high_snr_all_rates_decode(self): """At very high SNR, all rates should achieve FER=0.""" for m_base in [1, 3, 7]: result = rate_sweep_single(m_base=m_base, z=32, lam_s=20.0, lam_b=0.1, n_frames=10) assert result['fer'] == 0.0, f"m_base={m_base} failed at lam_s=20" ``` **Step 2: Run tests to verify they fail** Run: `cd /home/cah/r2d2/code/fpga/claude_project/ldpc_optical && python3 -m pytest model/test_ldpc_analysis.py::TestIRAStaircase -v` Expected: FAIL — module doesn't exist **Step 3: Implement build_ira_staircase, ira_encode, and rate_sweep_single** The key functions: - `build_ira_staircase(m_base, z)`: Generate an IRA staircase base matrix and expand to full H. Info column connected to all rows, lower-triangular parity with spread shift values. - `ira_encode(info, H_base, H_full, z)`: Generic IRA encoder that works for any staircase code (sequential parity solve, same algorithm as ldpc_sim.py). - `rate_sweep_single(m_base, z, lam_s, lam_b, n_frames)`: Run n_frames through encode/channel/decode, return BER/FER. - `run_rate_sweep(...)`: Full sweep over rates and lambda_s values, print summary table. ```python #!/usr/bin/env python3 """ LDPC Code Analysis Tool — Quantitative design validation. Four analyses: --rate-sweep: Compare code rates (1/2 through 1/8) vs lambda_s --matrix-compare: Compare base matrix designs at rate 1/8 --quant-sweep: Quantization bits (4-32) impact on FER --shannon-gap: Distance from Poisson channel capacity Usage: python3 ldpc_analysis.py --rate-sweep python3 ldpc_analysis.py --matrix-compare python3 ldpc_analysis.py --quant-sweep python3 ldpc_analysis.py --shannon-gap python3 ldpc_analysis.py --all """ import numpy as np import argparse import json import os from ldpc_sim import ( poisson_channel, quantize_llr, decode_layered_min_sum, compute_syndrome_weight, sat_add_q, sat_sub_q, min_sum_cn_update, Q_BITS, Q_MAX, Q_MIN, OFFSET, ) # ============================================================================ # Generic IRA staircase code construction # ============================================================================ def build_ira_staircase(m_base, z=32): """ Build an IRA (Irregular Repeat-Accumulate) staircase QC-LDPC code. Structure: - Column 0: info bits, connected to ALL m_base rows (high protection) - Columns 1..m_base: parity bits, lower-triangular staircase - Each row has 2-3 nonzero entries (sparse) - Shift values spread across [0, z-1] to maximize girth Args: m_base: number of base matrix rows (rate = 1 / (m_base + 1)) z: lifting factor Returns: H_base: (m_base, m_base+1) array of shift values (-1 = not connected) H_full: (m_base*z, (m_base+1)*z) full binary parity check matrix """ n_base = m_base + 1 H_base = np.full((m_base, n_base), -1, dtype=np.int16) # Info column (col 0): connect to all rows with spread shifts for r in range(m_base): H_base[r, 0] = (r * z // m_base) % z # Staircase parity: row r connects to col r+1 (diagonal) # Row 0: col 1 only (besides info) # Row r>0: col r and col r+1 H_base[0, 1] = (5 * z // m_base) % z if m_base > 1 else 0 for r in range(1, m_base): # Sub-diagonal: connect to previous parity column H_base[r, r] = (r * 7) % z # Diagonal: connect to own parity column H_base[r, r + 1] = 0 # Expand to full binary matrix n = n_base * z m = m_base * z H_full = np.zeros((m, n), dtype=np.int8) for r in range(m_base): for c in range(n_base): shift = int(H_base[r, c]) if shift < 0: continue for zz in range(z): col_idx = c * z + (zz + shift) % z H_full[r * z + zz, col_idx] = 1 return H_base, H_full def ira_encode(info, H_base, H_full, z=32): """ Encode using IRA staircase structure (generic, any rate). Same algorithm as ldpc_sim.ldpc_encode but works for any staircase. """ m_base = H_base.shape[0] n_base = H_base.shape[1] k = z assert len(info) == k def apply_p(x, s): return np.roll(x, -int(s)) def inv_p(y, s): return np.roll(y, int(s)) p = [np.zeros(z, dtype=np.int8) for _ in range(n_base)] # Row 0: solve for p[1] accum = apply_p(info, H_base[0, 0]) p[1] = inv_p(accum, H_base[0, 1]) # Rows 1..m_base-1: solve for p[2]..p[m_base] for r in range(1, m_base): accum = apply_p(info, H_base[r, 0]) accum = (accum + apply_p(p[r], H_base[r, r])) % 2 p[r + 1] = inv_p(accum, H_base[r, r + 1]) codeword = np.concatenate([info] + [p[c] for c in range(1, n_base)]) # Verify check = H_full @ codeword % 2 assert np.all(check == 0), f"Encoding failed: syndrome weight = {check.sum()}" return codeword def generic_decode(llr_q, H_base, z=32, max_iter=30, early_term=True, q_bits=Q_BITS): """ Generic layered min-sum decoder for any IRA staircase code. Same algorithm as ldpc_sim.decode_layered_min_sum but parameterized. """ m_base = H_base.shape[0] n_base = H_base.shape[1] n = n_base * z k = z q_max = 2**(q_bits-1) - 1 q_min = -(2**(q_bits-1)) def sat_add(a, b): s = int(a) + int(b) return max(q_min, min(q_max, s)) def sat_sub(a, b): return sat_add(a, -b) beliefs = [int(x) for x in llr_q] msg = [[[0]*z for _ in range(n_base)] for _ in range(m_base)] for iteration in range(max_iter): for row in range(m_base): connected = [c for c in range(n_base) if H_base[row, c] >= 0] dc = len(connected) vn_to_cn = [[0]*z for _ in range(dc)] for ci, col in enumerate(connected): shift = int(H_base[row, col]) for zz in range(z): sz = (zz + shift) % z bi = col * z + sz vn_to_cn[ci][zz] = sat_sub(beliefs[bi], msg[row][col][zz]) cn_to_vn = [[0]*z for _ in range(dc)] for zz in range(z): cn_in = [vn_to_cn[ci][zz] for ci in range(dc)] cn_out = min_sum_cn_update(cn_in) for ci in range(dc): cn_to_vn[ci][zz] = cn_out[ci] for ci, col in enumerate(connected): shift = int(H_base[row, col]) for zz in range(z): sz = (zz + shift) % z bi = col * z + sz beliefs[bi] = sat_add(vn_to_cn[ci][zz], cn_to_vn[ci][zz]) msg[row][col][zz] = cn_to_vn[ci][zz] # Syndrome check hard = [1 if b < 0 else 0 for b in beliefs] syn_wt = 0 for r in range(m_base): for zz in range(z): parity = 0 for c in range(n_base): shift = int(H_base[r, c]) if shift < 0: continue sz = (zz + shift) % z bi = c * z + sz parity ^= hard[bi] if parity: syn_wt += 1 if early_term and syn_wt == 0: return np.array(hard[:k]), True, iteration + 1, 0 hard = [1 if b < 0 else 0 for b in beliefs] return np.array(hard[:k]), syn_wt == 0, max_iter, syn_wt def rate_sweep_single(m_base, z=32, lam_s=2.0, lam_b=0.1, n_frames=100, max_iter=30, q_bits=Q_BITS): """ Run FER simulation for a single code rate and lambda_s. Returns dict with ber, fer, avg_iter. """ H_base, H_full = build_ira_staircase(m_base, z) k = z bit_errors = 0 frame_errors = 0 total_bits = 0 total_iter = 0 for _ in range(n_frames): info = np.random.randint(0, 2, k).astype(np.int8) codeword = ira_encode(info, H_base, H_full, z) llr, _ = poisson_channel(codeword, lam_s, lam_b) # Quantize with configurable bits q_max = 2**(q_bits-1) - 1 q_min = -(2**(q_bits-1)) scale = q_max / 5.0 llr_q = np.clip(np.round(llr * scale), q_min, q_max).astype(int) decoded, converged, iters, _ = generic_decode( llr_q, H_base, z, max_iter, q_bits=q_bits ) total_iter += iters errs = np.sum(decoded != info) bit_errors += errs total_bits += k if errs > 0: frame_errors += 1 return { 'rate': f"1/{m_base+1}", 'rate_val': 1.0 / (m_base + 1), 'm_base': m_base, 'lam_s': lam_s, 'ber': bit_errors / total_bits if total_bits > 0 else 0, 'fer': frame_errors / n_frames, 'avg_iter': total_iter / n_frames, } # ============================================================================ # Analysis 1: Rate Comparison # ============================================================================ def run_rate_sweep(lam_b=0.1, n_frames=500, max_iter=30): """Compare code rates across lambda_s range.""" rates = [ (1, "1/2"), (2, "1/3"), (3, "1/4"), (5, "1/6"), (7, "1/8"), ] lam_s_values = [0.5, 1.0, 1.5, 2.0, 2.5, 3.0, 4.0, 5.0, 7.0, 10.0] print("=== Rate Comparison: FER vs lambda_s ===") print(f"Frames per point: {n_frames}, lam_b={lam_b}, max_iter={max_iter}") print() # Header header = f"{'lam_s':>8s}" for _, name in rates: header += f" {name:>8s}" print(header) print("-" * (8 + 9 * len(rates))) all_results = [] for lam_s in lam_s_values: row = f"{lam_s:8.1f}" for m_base, name in rates: np.random.seed(42) r = rate_sweep_single(m_base, lam_s=lam_s, lam_b=lam_b, n_frames=n_frames, max_iter=max_iter) row += f" {r['fer']:8.3f}" all_results.append(r) print(row) # Summary: threshold lambda_s for each rate (FER < 10%) print("\n--- Threshold lambda_s (FER < 10%) ---") for m_base, name in rates: rate_results = [r for r in all_results if r['m_base'] == m_base] threshold = None for r in sorted(rate_results, key=lambda x: x['lam_s']): if r['fer'] < 0.10: threshold = r['lam_s'] break t_str = f"{threshold:.1f}" if threshold else ">10" print(f" Rate {name}: threshold lam_s = {t_str} photons/slot") return all_results # ============================================================================ # Analysis 2: Base Matrix Comparison # ============================================================================ def compute_girth(H_base, z): """ Compute girth (shortest cycle) of the Tanner graph. Uses BFS from each VN to find shortest cycle. """ m_base = H_base.shape[0] n_base = H_base.shape[1] n = n_base * z m = m_base * z # Build adjacency: VN -> list of CN neighbors, CN -> list of VN neighbors vn_to_cn = [[] for _ in range(n)] cn_to_vn = [[] for _ in range(m)] for r in range(m_base): for c in range(n_base): shift = int(H_base[r, c]) if shift < 0: continue for zz in range(z): vn = c * z + (zz + shift) % z cn = r * z + zz vn_to_cn[vn].append(cn) cn_to_vn[cn].append(vn) min_cycle = float('inf') # BFS from a subset of VNs (all VNs for small codes) sample_vns = range(min(n, 64)) # sample for speed for start_vn in sample_vns: # BFS: track (node_type, node_id) with distances # node_type: 0=VN, 1=CN dist = {} parent = {} queue = [(0, start_vn, -1, -1)] # (distance, node, parent_type, parent_id) dist[(0, start_vn)] = 0 parent[(0, start_vn)] = None qi = 0 while qi < len(queue): d, node, pt, pid = queue[qi] qi += 1 node_type = 0 if (0, node) in dist and dist[(0, node)] == d else 1 # Determine actual type if qi == 1: node_type = 0 # start is VN # Get neighbors if (0, node) in dist and dist[(0, node)] == d: # This is a VN, go to CN neighbors for cn in vn_to_cn[node]: if (1, cn) == (pt, pid): continue # don't go back if (1, cn) in dist: cycle_len = d + 1 + dist[(1, cn)] min_cycle = min(min_cycle, cycle_len) else: dist[(1, cn)] = d + 1 queue.append((d + 1, cn, 0, node)) elif (1, node) in dist and dist[(1, node)] == d: # This is a CN, go to VN neighbors for vn in cn_to_vn[node]: if (0, vn) == (pt, pid): continue if (0, vn) in dist: cycle_len = d + 1 + dist[(0, vn)] min_cycle = min(min_cycle, cycle_len) else: dist[(0, vn)] = d + 1 queue.append((d + 1, vn, 1, node)) if d > min_cycle: break return min_cycle if min_cycle < float('inf') else -1 def build_improved_staircase(z=32): """ Improved IRA staircase with better degree distribution. Adds extra connections to low-degree columns while keeping lower-triangular parity structure for easy encoding. Key improvement: col 7 gets extra connection (dv=1 -> dv=2). """ m_base = 7 n_base = 8 H_base = np.full((m_base, n_base), -1, dtype=np.int16) # Info column (col 0): connect to all rows (same as original) H_base[0, 0] = 0 H_base[1, 0] = 11 H_base[2, 0] = 17 H_base[3, 0] = 23 H_base[4, 0] = 29 H_base[5, 0] = 3 H_base[6, 0] = 9 # Staircase parity (same diagonal structure) H_base[0, 1] = 5 H_base[1, 1] = 3; H_base[1, 2] = 0 H_base[2, 2] = 7; H_base[2, 3] = 0 H_base[3, 3] = 13; H_base[3, 4] = 0 H_base[4, 4] = 19; H_base[4, 5] = 0 H_base[5, 5] = 25; H_base[5, 6] = 0 H_base[6, 6] = 31; H_base[6, 7] = 0 # Improvements: add connections to boost weak columns # Col 7 (dv=1 -> dv=2): connect to row 0 H_base[0, 7] = 15 # Col 1 currently dv=2, boost to dv=3: connect to row 3 H_base[3, 1] = 21 # Expand n = n_base * z m = m_base * z H_full = np.zeros((m, n), dtype=np.int8) for r in range(m_base): for c in range(n_base): shift = int(H_base[r, c]) if shift < 0: continue for zz in range(z): col_idx = c * z + (zz + shift) % z H_full[r * z + zz, col_idx] = 1 return H_base, H_full def build_peg_matrix(z=32): """ PEG-constructed base matrix for rate 1/8. Progressive Edge Growth: greedily add edges to maximize local girth. Target degree distribution: VN degrees [3,2,2,2,2,2,2,2] average ~2.1. More uniform than staircase. Note: this loses the trivial staircase encoding property. Encoding requires solving H*c=0 via back-substitution on the parity submatrix, which must still be full-rank. """ m_base = 7 n_base = 8 H_base = np.full((m_base, n_base), -1, dtype=np.int16) # PEG-like construction: spread edges to maximize girth # Col 0 (info): connect to all 7 rows (dv=7, high protection) H_base[0, 0] = 0 H_base[1, 0] = 11 H_base[2, 0] = 17 H_base[3, 0] = 23 H_base[4, 0] = 29 H_base[5, 0] = 3 H_base[6, 0] = 9 # Parity columns: each connected to exactly 2 rows (dv=2) # with shifts chosen for maximum girth spread # Ensure parity sub-matrix is invertible for encoding H_base[0, 1] = 5; H_base[1, 1] = 3 H_base[1, 2] = 0; H_base[2, 2] = 7 H_base[2, 3] = 0; H_base[3, 3] = 13 H_base[3, 4] = 0; H_base[4, 4] = 19 H_base[4, 5] = 0; H_base[5, 5] = 25 H_base[5, 6] = 0; H_base[6, 6] = 31 H_base[6, 7] = 0; H_base[0, 7] = 15 # This is actually a ring structure (not strictly lower-triangular) # Encoding: need to solve via the ring structure # Expand n = n_base * z m = m_base * z H_full = np.zeros((m, n), dtype=np.int8) for r in range(m_base): for c in range(n_base): shift = int(H_base[r, c]) if shift < 0: continue for zz in range(z): col_idx = c * z + (zz + shift) % z H_full[r * z + zz, col_idx] = 1 return H_base, H_full def peg_encode(info, H_base, H_full, z=32): """ Encode for PEG matrix using Gaussian elimination on parity submatrix. For non-staircase matrices, we solve H_parity * p = H_info * info (mod 2) using the full H matrix. """ m_base = H_base.shape[0] n_base = H_base.shape[1] k = z m = m_base * z n = n_base * z # H = [H_info | H_parity], solve H_parity * p = H_info * info mod 2 H_info = H_full[:, :k] H_parity = H_full[:, k:] rhs = H_info @ info % 2 # Solve via GF(2) Gaussian elimination # Augmented matrix [H_parity | rhs] aug = np.hstack([H_parity.copy(), rhs.reshape(-1, 1)]).astype(np.int8) rows, cols_p = H_parity.shape pivot_row = 0 pivot_cols = [] for col in range(cols_p): # Find pivot found = -1 for r in range(pivot_row, rows): if aug[r, col] == 1: found = r break if found == -1: continue # Swap aug[[pivot_row, found]] = aug[[found, pivot_row]] pivot_cols.append(col) # Eliminate for r in range(rows): if r != pivot_row and aug[r, col] == 1: aug[r] = (aug[r] + aug[pivot_row]) % 2 pivot_row += 1 # Back-substitute parity = np.zeros(cols_p, dtype=np.int8) for i, col in enumerate(pivot_cols): parity[col] = aug[i, -1] codeword = np.concatenate([info, parity]) check = H_full @ codeword % 2 assert np.all(check == 0), f"PEG encoding failed: syn={check.sum()}" return codeword def run_matrix_comparison(lam_b=0.1, n_frames=500, max_iter=30): """Compare base matrix designs at rate 1/8.""" from ldpc_sim import H_BASE as ORIG_H_BASE, build_full_h_matrix, ldpc_encode matrices = { 'Original staircase': (ORIG_H_BASE, build_full_h_matrix(), ldpc_encode), } # Improved staircase imp_base, imp_full = build_improved_staircase() # Need a custom encoder since it's not pure staircase anymore # Actually it IS still lower-triangular in parity, but with extra # connections — we can use the PEG encoder (GF2 solve) for safety matrices['Improved staircase'] = (imp_base, imp_full, None) # PEG peg_base, peg_full = build_peg_matrix() matrices['PEG ring'] = (peg_base, peg_full, None) z = 32 lam_s_values = [0.5, 1.0, 1.5, 2.0, 3.0, 4.0, 5.0, 7.0, 10.0] print("=== Base Matrix Comparison (Rate 1/8) ===") print(f"Frames per point: {n_frames}, lam_b={lam_b}") print() # Degree distributions and girth for name, (H_base, H_full, _) in matrices.items(): m_base_m = H_base.shape[0] n_base_m = H_base.shape[1] vn_degrees = [] for c in range(n_base_m): dv = sum(1 for r in range(m_base_m) if H_base[r, c] >= 0) vn_degrees.append(dv) cn_degrees = [] for r in range(m_base_m): dc = sum(1 for c in range(n_base_m) if H_base[r, c] >= 0) cn_degrees.append(dc) girth = compute_girth(H_base, z) print(f"{name}:") print(f" VN degrees: {vn_degrees}") print(f" CN degrees: {cn_degrees}") print(f" Girth: {girth}") rank = np.linalg.matrix_rank(H_full.astype(float)) print(f" H rank: {rank} / {H_base.shape[0]*z}") print() # FER sweep header = f"{'lam_s':>8s}" for name in matrices: short = name[:12] header += f" {short:>14s}" print(header) print("-" * (8 + 15 * len(matrices))) all_results = [] for lam_s in lam_s_values: row = f"{lam_s:8.1f}" for name, (H_base, H_full, enc_fn) in matrices.items(): np.random.seed(42) k = z n = H_base.shape[1] * z frame_errors = 0 for _ in range(n_frames): info = np.random.randint(0, 2, k).astype(np.int8) if enc_fn is not None: codeword = enc_fn(info, H_full) else: codeword = peg_encode(info, H_base, H_full, z) llr, _ = poisson_channel(codeword, lam_s, lam_b) llr_q = quantize_llr(llr) decoded, converged, _, _ = generic_decode( llr_q, H_base, z, max_iter ) if np.any(decoded != info): frame_errors += 1 fer = frame_errors / n_frames row += f" {fer:14.3f}" all_results.append({ 'matrix': name, 'lam_s': lam_s, 'fer': fer }) print(row) return all_results # ============================================================================ # Analysis 3: Quantization Sweep # ============================================================================ def run_quant_sweep(lam_s=None, lam_b=0.1, n_frames=500, max_iter=30): """Sweep quantization bits and measure FER impact.""" from ldpc_sim import H_BASE, build_full_h_matrix, ldpc_encode H_base = H_BASE H_full = build_full_h_matrix() z = 32 quant_bits = [4, 5, 6, 8, 10, 16] # If no lambda_s given, test at a few points if lam_s is None: lam_s_values = [2.0, 3.0, 5.0] else: lam_s_values = [lam_s] print("=== Quantization Sweep ===") print(f"Frames per point: {n_frames}, lam_b={lam_b}") print() header = f"{'lam_s':>8s}" for q in quant_bits: header += f" {q:>6d}-bit" header += f" {'float':>10s}" print(header) print("-" * (8 + 10 * (len(quant_bits) + 1))) all_results = [] for ls in lam_s_values: row = f"{ls:8.1f}" # Fixed quantization levels for q in quant_bits: np.random.seed(42) frame_errors = 0 for _ in range(n_frames): info = np.random.randint(0, 2, z).astype(np.int8) codeword = ldpc_encode(info, H_full) llr, _ = poisson_channel(codeword, ls, lam_b) q_max = 2**(q-1) - 1 q_min = -(2**(q-1)) scale = q_max / 5.0 llr_q = np.clip(np.round(llr * scale), q_min, q_max).astype(int) decoded, _, _, _ = generic_decode( llr_q, H_base, z, max_iter, q_bits=q ) if np.any(decoded != info): frame_errors += 1 fer = frame_errors / n_frames row += f" {fer:9.3f}" all_results.append({ 'lam_s': ls, 'q_bits': q, 'fer': fer }) # Float (no quantization — use very wide integers to approximate) np.random.seed(42) frame_errors = 0 for _ in range(n_frames): info = np.random.randint(0, 2, z).astype(np.int8) codeword = ldpc_encode(info, H_full) llr, _ = poisson_channel(codeword, ls, lam_b) # Use 16-bit with high scale for "float-like" precision scale = 1000.0 llr_q = np.clip(np.round(llr * scale), -32768, 32767).astype(int) decoded, _, _, _ = generic_decode( llr_q, H_base, z, max_iter, q_bits=16 ) if np.any(decoded != info): frame_errors += 1 fer = frame_errors / n_frames row += f" {fer:9.3f}" all_results.append({ 'lam_s': ls, 'q_bits': 'float', 'fer': fer }) print(row) return all_results # ============================================================================ # Analysis 4: Shannon Gap # ============================================================================ def poisson_channel_capacity(lam_s, lam_b, max_y=50): """ Compute binary-input Poisson channel capacity. C = max_p I(X;Y) where X in {0,1}, Y ~ Poisson(x*lam_s + lam_b) For OOK with equal priors (p=0.5): I(X;Y) = H(Y) - 0.5*H(Y|X=0) - 0.5*H(Y|X=1) """ from scipy.stats import poisson def entropy_poisson(lam, max_y): """Entropy of Poisson(lam) truncated to [0, max_y].""" if lam < 1e-10: return 0.0 pmf = [poisson.pmf(y, lam) for y in range(max_y + 1)] total = sum(pmf) h = 0.0 for p in pmf: if p > 0: h -= p * np.log2(p / total) return h # H(Y|X=0) and H(Y|X=1) h_y_x0 = entropy_poisson(lam_b, max_y) h_y_x1 = entropy_poisson(lam_s + lam_b, max_y) # Optimize over input probability p best_c = 0.0 for p1_pct in range(0, 101): p1 = p1_pct / 100.0 p0 = 1.0 - p1 # H(Y) where Y has mixture distribution pmf_mix = [] for y in range(max_y + 1): pmf_mix.append(p0 * poisson.pmf(y, lam_b) + p1 * poisson.pmf(y, lam_s + lam_b)) h_y = 0.0 for p in pmf_mix: if p > 0: h_y -= p * np.log2(p) c = h_y - p0 * h_y_x0 - p1 * h_y_x1 best_c = max(best_c, c) return best_c def run_shannon_gap(lam_b=0.1): """Compute Shannon limits and compare to decoder thresholds.""" print("=== Shannon Gap Analysis ===") print(f"Binary-input Poisson channel, lam_b={lam_b}") print() rates = [ (1, "1/2", 0.5), (2, "1/3", 1/3), (3, "1/4", 0.25), (5, "1/6", 1/6), (7, "1/8", 0.125), ] print(f"{'Rate':>8s} {'Shannon':>10s} {'Capacity':>10s}") print(f"{'':>8s} {'lam_s':>10s} {'at limit':>10s}") print("-" * 32) results = [] for m_base, name, rate_val in rates: # Binary search for minimum lam_s where C >= rate lo, hi = 0.01, 50.0 for _ in range(50): # bisection mid = (lo + hi) / 2.0 c = poisson_channel_capacity(mid, lam_b) if c >= rate_val: hi = mid else: lo = mid shannon_lam_s = (lo + hi) / 2.0 cap_at_limit = poisson_channel_capacity(shannon_lam_s, lam_b) print(f"{name:>8s} {shannon_lam_s:10.3f} {cap_at_limit:10.4f}") results.append({ 'rate': name, 'rate_val': rate_val, 'shannon_lam_s': shannon_lam_s, 'capacity_at_limit': cap_at_limit, }) print() print("Shannon limit = minimum lam_s (photons/slot) where") print("channel capacity >= code rate. Decoder must operate above this.") print() print("Compare decoder thresholds (from --rate-sweep) to these limits") print("to compute the gap in dB: gap = 10*log10(decoder_threshold / shannon_limit)") return results # ============================================================================ # Main # ============================================================================ def main(): parser = argparse.ArgumentParser(description='LDPC Code Analysis') parser.add_argument('--rate-sweep', action='store_true', help='Compare code rates vs lambda_s') parser.add_argument('--matrix-compare', action='store_true', help='Compare base matrix designs') parser.add_argument('--quant-sweep', action='store_true', help='Quantization bits impact') parser.add_argument('--shannon-gap', action='store_true', help='Distance from channel capacity') parser.add_argument('--all', action='store_true', help='Run all analyses') parser.add_argument('--n-frames', type=int, default=500, help='Frames per data point (default: 500)') parser.add_argument('--lam-b', type=float, default=0.1, help='Background photons/slot (default: 0.1)') parser.add_argument('--seed', type=int, default=42) args = parser.parse_args() np.random.seed(args.seed) results = {} if args.rate_sweep or args.all: results['rate_sweep'] = run_rate_sweep(args.lam_b, args.n_frames) print() if args.matrix_compare or args.all: results['matrix_compare'] = run_matrix_comparison( args.lam_b, args.n_frames ) print() if args.quant_sweep or args.all: results['quant_sweep'] = run_quant_sweep( lam_b=args.lam_b, n_frames=args.n_frames ) print() if args.shannon_gap or args.all: results['shannon_gap'] = run_shannon_gap(args.lam_b) print() if results: out_path = os.path.join(os.path.dirname(__file__) or '.', '..', 'data', 'analysis_results.json') os.makedirs(os.path.dirname(out_path), exist_ok=True) with open(out_path, 'w') as f: json.dump(results, f, indent=2, default=str) print(f"Results saved to {out_path}") else: parser.print_help() if __name__ == '__main__': main() ``` **Step 4: Run tests** Run: `cd /home/cah/r2d2/code/fpga/claude_project/ldpc_optical && python3 -m pytest model/test_ldpc_analysis.py -v` Expected: All PASS **Step 5: Commit** ```bash git add model/ldpc_analysis.py model/test_ldpc_analysis.py git commit -m "feat: add code analysis tool with rate sweep and matrix comparison" ``` --- ### Task 4: Run All Analyses and Capture Results **Step 1: Run rate sweep** Run: `cd /home/cah/r2d2/code/fpga/claude_project/ldpc_optical && python3 model/ldpc_analysis.py --rate-sweep --n-frames 200` Expected: Table showing FER vs lambda_s for each rate. Rate 1/8 should have the lowest threshold. **Step 2: Run matrix comparison** Run: `cd /home/cah/r2d2/code/fpga/claude_project/ldpc_optical && python3 model/ldpc_analysis.py --matrix-compare --n-frames 200` Expected: Table comparing original vs improved vs PEG matrices. **Step 3: Run quantization sweep** Run: `cd /home/cah/r2d2/code/fpga/claude_project/ldpc_optical && python3 model/ldpc_analysis.py --quant-sweep --n-frames 200` Expected: FER vs quantization bits table. **Step 4: Run Shannon gap** Run: `cd /home/cah/r2d2/code/fpga/claude_project/ldpc_optical && python3 model/ldpc_analysis.py --shannon-gap` Expected: Shannon limit lambda_s for each rate. **Step 5: Run frame sync sweep** Run: `cd /home/cah/r2d2/code/fpga/claude_project/ldpc_optical && python3 model/frame_sync.py --sweep --n-trials 20` Expected: Acquisition success rate vs lambda_s. **Step 6: Run frame sync re-sync test** Run: `cd /home/cah/r2d2/code/fpga/claude_project/ldpc_optical && python3 model/frame_sync.py --resync-test --lam-s 5.0 --n-trials 20` Expected: Re-sync success at various slip amounts. **Step 7: Review results, fix any bugs, commit data** ```bash git add data/ git commit -m "data: add analysis and frame sync simulation results" ``` --- ### Task 5: Run Full Test Suite **Step 1: Run all tests** Run: `cd /home/cah/r2d2/code/fpga/claude_project/ldpc_optical && python3 -m pytest model/ -v` Expected: All tests PASS **Step 2: Final commit** ```bash git add -A git commit -m "test: all model tests passing for frame sync and code analysis" ```