56 KiB
Frame Sync & Code Analysis Implementation Plan
For Claude: REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.
Goal: Build a frame synchronization prototype and four quantitative code analyses to validate design decisions for the LDPC optical decoder (target: 1-2 photons/slot).
Architecture: Two new scripts (frame_sync.py, ldpc_analysis.py) import encoder/decoder/channel from existing ldpc_sim.py. Frame sync simulates a continuous stream with unknown offset. Analyses compare rates, matrices, quantization, and Shannon limits. All output summary tables to stdout and save results to data/.
Tech Stack: Python 3, numpy, scipy (for Poisson PMF and optimization in Shannon gap). pytest for validation tests.
Key reference files:
model/ldpc_sim.py— encoder, decoder, channel, syndrome check (DO NOT MODIFY)docs/plans/2026-02-23-frame-sync-and-code-analysis-design.md— design document
Task 1: Test Harness — Validate Existing Model
Files:
- Create:
model/test_ldpc.py
Step 1: Write validation tests for existing ldpc_sim.py
These tests establish a known-good baseline before we build on top.
"""Tests for ldpc_sim.py — validates encoder, decoder, channel, syndrome."""
import numpy as np
import pytest
from ldpc_sim import (
build_full_h_matrix, ldpc_encode, poisson_channel, quantize_llr,
decode_layered_min_sum, compute_syndrome_weight, cyclic_shift,
sat_add_q, sat_sub_q, min_sum_cn_update,
N, K, M, Z, N_BASE, M_BASE, H_BASE, Q_MAX, Q_MIN,
)
class TestCodeParameters:
def test_dimensions(self):
assert N == 256
assert K == 32
assert M == 224
assert Z == 32
assert N_BASE == 8
assert M_BASE == 7
def test_h_base_shape(self):
assert H_BASE.shape == (M_BASE, N_BASE)
def test_h_base_info_column_fully_connected(self):
"""Column 0 (info) should connect to all 7 rows."""
for r in range(M_BASE):
assert H_BASE[r, 0] >= 0
def test_h_base_staircase_structure(self):
"""Parity columns should form lower-triangular staircase."""
for r in range(M_BASE):
# Diagonal parity column (r+1) should be connected
assert H_BASE[r, r + 1] >= 0
# Above diagonal should be -1 (not connected)
for c in range(r + 2, N_BASE):
assert H_BASE[r, c] == -1
class TestHMatrix:
def test_full_h_dimensions(self):
H = build_full_h_matrix()
assert H.shape == (M, N)
def test_full_h_binary(self):
H = build_full_h_matrix()
assert set(np.unique(H)).issubset({0, 1})
def test_full_h_row_weights(self):
"""Each row in a Z-block should have the same weight (QC property)."""
H = build_full_h_matrix()
for r in range(M_BASE):
weights = [H[r * Z + z].sum() for z in range(Z)]
assert len(set(weights)) == 1, f"Row block {r} has inconsistent weights"
def test_full_h_rank(self):
"""H should have rank M (full rank) for the code to work."""
H = build_full_h_matrix()
rank = np.linalg.matrix_rank(H.astype(float))
assert rank == M
class TestEncoder:
def test_encode_all_zeros(self):
H = build_full_h_matrix()
info = np.zeros(K, dtype=np.int8)
codeword = ldpc_encode(info, H)
assert len(codeword) == N
assert np.all(codeword == 0) # all-zero is always a codeword
def test_encode_satisfies_parity(self):
H = build_full_h_matrix()
np.random.seed(123)
info = np.random.randint(0, 2, K).astype(np.int8)
codeword = ldpc_encode(info, H)
syndrome = H @ codeword % 2
assert np.all(syndrome == 0)
def test_encode_preserves_info_bits(self):
H = build_full_h_matrix()
np.random.seed(456)
info = np.random.randint(0, 2, K).astype(np.int8)
codeword = ldpc_encode(info, H)
assert np.array_equal(codeword[:K], info)
def test_encode_multiple_random(self):
"""Encode 20 random messages, all must satisfy parity."""
H = build_full_h_matrix()
for seed in range(20):
np.random.seed(seed)
info = np.random.randint(0, 2, K).astype(np.int8)
codeword = ldpc_encode(info, H)
syndrome = H @ codeword % 2
assert np.all(syndrome == 0), f"Failed for seed {seed}"
class TestSaturatingArithmetic:
def test_sat_add_normal(self):
assert sat_add_q(10, 5) == 15
def test_sat_add_positive_overflow(self):
assert sat_add_q(Q_MAX, 1) == Q_MAX
def test_sat_add_negative_overflow(self):
assert sat_add_q(Q_MIN, -1) == Q_MIN
def test_sat_sub(self):
assert sat_sub_q(10, 3) == 7
assert sat_sub_q(Q_MIN, 1) == Q_MIN
class TestMinSumCN:
def test_all_positive(self):
"""All positive inputs: output signs should all be positive."""
msgs = [5, 10, 15]
out = min_sum_cn_update(msgs, offset=1)
assert all(o >= 0 for o in out)
def test_one_negative(self):
"""One negative: that output positive, others negative."""
msgs = [-5, 10, 15]
out = min_sum_cn_update(msgs, offset=1)
assert out[0] >= 0 # extrinsic sign excludes own sign
assert out[1] <= 0
assert out[2] <= 0
def test_magnitude_is_min_of_others(self):
"""Output magnitude = min of OTHER magnitudes - offset."""
msgs = [3, 10, 20]
out = min_sum_cn_update(msgs, offset=1)
# out[0]: min of {10,20} - 1 = 9
assert abs(out[0]) == 9
# out[1]: min of {3,20} - 1 = 2
assert abs(out[1]) == 2
# out[2]: min of {3,10} - 1 = 2
assert abs(out[2]) == 2
class TestDecoder:
def test_decode_noiseless(self):
"""Noiseless channel (huge lambda_s) should decode perfectly."""
H = build_full_h_matrix()
np.random.seed(789)
info = np.random.randint(0, 2, K).astype(np.int8)
codeword = ldpc_encode(info, H)
# Noiseless: LLR is just +/- large value
llr = np.where(codeword == 0, 20.0, -20.0)
llr_q = quantize_llr(llr)
decoded, converged, iters, syn_wt = decode_layered_min_sum(llr_q, max_iter=5)
assert converged
assert np.array_equal(decoded, info)
assert iters == 1 # should converge in 1 iteration with perfect input
def test_decode_high_snr(self):
"""High SNR (lambda_s=10) should decode reliably."""
H = build_full_h_matrix()
np.random.seed(101)
info = np.random.randint(0, 2, K).astype(np.int8)
codeword = ldpc_encode(info, H)
llr, _ = poisson_channel(codeword, lam_s=10.0, lam_b=0.1)
llr_q = quantize_llr(llr)
decoded, converged, _, _ = decode_layered_min_sum(llr_q)
assert converged
assert np.array_equal(decoded, info)
class TestSyndrome:
def test_valid_codeword_syndrome_zero(self):
H = build_full_h_matrix()
np.random.seed(202)
info = np.random.randint(0, 2, K).astype(np.int8)
codeword = ldpc_encode(info, H)
assert compute_syndrome_weight(codeword.tolist()) == 0
def test_flipped_bit_nonzero_syndrome(self):
H = build_full_h_matrix()
info = np.zeros(K, dtype=np.int8)
codeword = ldpc_encode(info, H)
corrupted = codeword.copy()
corrupted[0] ^= 1 # flip one bit
assert compute_syndrome_weight(corrupted.tolist()) > 0
Step 2: Run tests to verify they pass
Run: cd /home/cah/r2d2/code/fpga/claude_project/ldpc_optical && python3 -m pytest model/test_ldpc.py -v
Expected: All tests PASS (these test existing working code)
Step 3: Commit
git add model/test_ldpc.py
git commit -m "test: add validation tests for existing LDPC model"
Task 2: Frame Sync — Streaming Infrastructure
Files:
- Create:
model/frame_sync.py - Create:
model/test_frame_sync.py
Step 1: Write the test for stream generation and syndrome screening
"""Tests for frame synchronization."""
import numpy as np
import pytest
from ldpc_sim import (
build_full_h_matrix, ldpc_encode, poisson_channel, quantize_llr,
compute_syndrome_weight, N, K, Z,
)
from frame_sync import generate_stream, syndrome_screen
class TestStreamGeneration:
def test_stream_length(self):
"""Stream should be n_frames * N + offset samples long."""
H = build_full_h_matrix()
np.random.seed(42)
stream_llr, true_offset, info_list = generate_stream(
H, n_frames=5, lam_s=5.0, lam_b=0.1, offset=10
)
assert len(stream_llr) == 5 * N + 10
assert true_offset == 10
assert len(info_list) == 5
def test_stream_zero_offset(self):
H = build_full_h_matrix()
np.random.seed(42)
stream_llr, true_offset, _ = generate_stream(
H, n_frames=3, lam_s=5.0, lam_b=0.1, offset=0
)
assert true_offset == 0
assert len(stream_llr) == 3 * N
def test_stream_random_offset(self):
H = build_full_h_matrix()
np.random.seed(42)
stream_llr, true_offset, _ = generate_stream(
H, n_frames=5, lam_s=5.0, lam_b=0.1, offset=None
)
assert 0 <= true_offset < N
assert len(stream_llr) == 5 * N + true_offset
class TestSyndromeScreen:
def test_correct_offset_low_syndrome(self):
"""At high SNR, correct offset should have very low syndrome weight."""
H = build_full_h_matrix()
np.random.seed(42)
stream_llr, true_offset, _ = generate_stream(
H, n_frames=5, lam_s=10.0, lam_b=0.1, offset=37
)
scores = syndrome_screen(stream_llr, n_offsets=N)
# Correct offset should have the lowest (or near-lowest) syndrome
best_offset = min(scores, key=scores.get)
assert best_offset == 37
def test_wrong_offsets_high_syndrome(self):
"""Wrong offsets should have syndrome weight near M/2 ~ 112."""
H = build_full_h_matrix()
np.random.seed(42)
stream_llr, true_offset, _ = generate_stream(
H, n_frames=5, lam_s=10.0, lam_b=0.1, offset=0
)
scores = syndrome_screen(stream_llr, n_offsets=N)
wrong_scores = [v for k, v in scores.items() if k != 0]
avg_wrong = np.mean(wrong_scores)
assert avg_wrong > 80 # should be near 112 for random
Step 2: Run tests to verify they fail
Run: cd /home/cah/r2d2/code/fpga/claude_project/ldpc_optical && python3 -m pytest model/test_frame_sync.py -v
Expected: FAIL — frame_sync module doesn't exist yet
Step 3: Implement generate_stream and syndrome_screen
#!/usr/bin/env python3
"""
Frame Synchronization Prototype for LDPC Optical Decoder.
Simulates a continuous photon-counting stream with unknown frame alignment.
Tests acquisition (finding codeword boundaries) and re-sync after loss.
Usage:
python3 frame_sync.py # Quick demo at a few lambda_s
python3 frame_sync.py --sweep # Acquisition success vs lambda_s
python3 frame_sync.py --resync-test # Re-sync after offset slip
"""
import numpy as np
import argparse
import json
import os
from ldpc_sim import (
build_full_h_matrix, ldpc_encode, poisson_channel, quantize_llr,
decode_layered_min_sum, compute_syndrome_weight,
N, K, M, Z, N_BASE, M_BASE, H_BASE,
)
SCREENING_THRESHOLD = 50 # syndrome weight below this triggers full decode
def generate_stream(H, n_frames=10, lam_s=2.0, lam_b=0.1, offset=None):
"""
Generate a continuous stream of channel LLRs with unknown frame offset.
Args:
H: full parity check matrix
n_frames: number of codewords to concatenate
lam_s: signal photons/slot
lam_b: background photons/slot
offset: bit offset to insert (None = random 0..N-1)
Returns:
stream_llr: continuous LLR stream (float, not quantized)
true_offset: the actual offset used
info_list: list of info bit arrays for each frame (ground truth)
"""
if offset is None:
offset = np.random.randint(0, N)
# Generate random prefix LLRs for the offset (noise-only, no codeword)
# These represent receiving channel output before the first codeword starts
prefix_llr = np.random.normal(0, 1, offset) # just noise
# Generate codewords and their channel outputs
info_list = []
codeword_llrs = []
for _ in range(n_frames):
info = np.random.randint(0, 2, K).astype(np.int8)
codeword = ldpc_encode(info, H)
llr, _ = poisson_channel(codeword, lam_s, lam_b)
info_list.append(info)
codeword_llrs.append(llr)
stream_llr = np.concatenate([prefix_llr] + codeword_llrs)
return stream_llr, offset, info_list
def syndrome_screen(stream_llr, n_offsets=None):
"""
Screen all candidate offsets using hard-decision syndrome weight.
Args:
stream_llr: continuous LLR stream
n_offsets: number of offsets to try (default N=256)
Returns:
scores: dict mapping offset -> syndrome weight
"""
if n_offsets is None:
n_offsets = N
scores = {}
for off in range(n_offsets):
if off + N > len(stream_llr):
break
window = stream_llr[off:off + N]
hard = [0 if v > 0 else 1 for v in window]
syn_wt = compute_syndrome_weight(hard)
scores[off] = syn_wt
return scores
def acquire_sync(stream_llr, max_confirm=2, max_iter=30, verbose=False):
"""
Acquire frame synchronization from a continuous stream.
Algorithm:
1. Syndrome-screen all 256 offsets
2. For offsets below threshold, run full iterative decode
3. Confirm by decoding next max_confirm consecutive frames
Args:
stream_llr: continuous LLR stream
max_confirm: number of consecutive frames to confirm sync
max_iter: decoder max iterations
verbose: print screening details
Returns:
dict with keys:
locked: bool
offset: detected offset (or -1)
offsets_screened: number of offsets tried
full_decodes: number of full iterative decodes run
screening_cost: equivalent decode cycles for syndrome screening
"""
scores = syndrome_screen(stream_llr)
# Sort by syndrome weight (best candidates first)
ranked = sorted(scores.items(), key=lambda x: x[1])
full_decodes = 0
for off, syn_wt in ranked:
if syn_wt >= SCREENING_THRESHOLD:
break # all remaining offsets are worse
# Full iterative decode at this offset
window = stream_llr[off:off + N]
llr_q = quantize_llr(window)
decoded, converged, iters, _ = decode_layered_min_sum(llr_q, max_iter)
full_decodes += 1
if not converged:
continue
# Confirm with consecutive frames
confirmed = True
for f in range(1, max_confirm + 1):
start = off + f * N
if start + N > len(stream_llr):
confirmed = False
break
win_f = stream_llr[start:start + N]
llr_q_f = quantize_llr(win_f)
_, conv_f, _, _ = decode_layered_min_sum(llr_q_f, max_iter)
full_decodes += 1
if not conv_f:
confirmed = False
break
if confirmed:
# Cost model: syndrome screening ~ 1/30 of a full decode each
screening_cost = len(scores) / 30.0
return {
'locked': True,
'offset': off,
'offsets_screened': len(scores),
'full_decodes': full_decodes,
'screening_cost': screening_cost,
'total_equiv_decodes': screening_cost + full_decodes,
}
return {
'locked': False,
'offset': -1,
'offsets_screened': len(scores),
'full_decodes': full_decodes,
'screening_cost': len(scores) / 30.0,
'total_equiv_decodes': len(scores) / 30.0 + full_decodes,
}
def resync_test(stream_llr, true_offset, slip_amount, search_radius=16,
max_iter=30):
"""
Test re-synchronization after an offset slip.
Simulates the receiver losing sync by `slip_amount` bits,
then searching nearby offsets to re-acquire.
Returns:
dict with locked, detected offset, cost
"""
slipped_offset = (true_offset + slip_amount) % N
# First try nearby offsets (fast re-sync)
for delta in range(-search_radius, search_radius + 1):
off = (slipped_offset + delta) % N
if off + N > len(stream_llr):
continue
window = stream_llr[off:off + N]
hard = [0 if v > 0 else 1 for v in window]
syn_wt = compute_syndrome_weight(hard)
if syn_wt < SCREENING_THRESHOLD:
llr_q = quantize_llr(window)
_, converged, _, _ = decode_layered_min_sum(llr_q, max_iter)
if converged:
return {
'locked': True,
'offset': off,
'slip': slip_amount,
'search_radius_used': search_radius,
'needed_full_search': False,
}
# Fall back to full search
result = acquire_sync(stream_llr)
result['slip'] = slip_amount
result['search_radius_used'] = search_radius
result['needed_full_search'] = True
return result
def run_acquisition_sweep(lam_s_values, lam_b=0.1, n_trials=50, n_frames=10):
"""Sweep lambda_s and measure acquisition success rate."""
H = build_full_h_matrix()
print(f"{'lam_s':>8s} {'success%':>10s} {'avg_decodes':>12s} "
f"{'avg_cost':>10s} {'false_lock':>10s}")
print("-" * 55)
results = []
for lam_s in lam_s_values:
successes = 0
false_locks = 0
total_decodes = 0
total_cost = 0
for trial in range(n_trials):
np.random.seed(trial * 1000 + int(lam_s * 100))
stream, true_off, _ = generate_stream(
H, n_frames=n_frames, lam_s=lam_s, lam_b=lam_b, offset=None
)
result = acquire_sync(stream)
if result['locked']:
if result['offset'] == true_off:
successes += 1
else:
false_locks += 1
total_decodes += result['full_decodes']
total_cost += result['total_equiv_decodes']
pct = 100 * successes / n_trials
avg_dec = total_decodes / n_trials
avg_cost = total_cost / n_trials
fl_pct = 100 * false_locks / n_trials
print(f"{lam_s:8.1f} {pct:9.1f}% {avg_dec:12.1f} "
f"{avg_cost:10.1f} {fl_pct:9.1f}%")
results.append({
'lam_s': lam_s, 'success_pct': pct,
'avg_full_decodes': avg_dec, 'avg_equiv_cost': avg_cost,
'false_lock_pct': fl_pct,
})
return results
def run_resync_sweep(lam_s=5.0, lam_b=0.1, n_trials=20, n_frames=10):
"""Test re-sync at various slip amounts."""
H = build_full_h_matrix()
print(f"\nRe-sync test at lam_s={lam_s}")
print(f"{'slip':>6s} {'success%':>10s} {'full_search%':>12s}")
print("-" * 32)
for slip in [1, 2, 4, 8, 16, 32, 64, 128]:
successes = 0
full_searches = 0
for trial in range(n_trials):
np.random.seed(trial * 100 + slip)
stream, true_off, _ = generate_stream(
H, n_frames=n_frames, lam_s=lam_s, lam_b=lam_b, offset=None
)
result = resync_test(stream, true_off, slip)
if result['locked']:
successes += 1
if result.get('needed_full_search'):
full_searches += 1
print(f"{slip:6d} {100*successes/n_trials:9.1f}% "
f"{100*full_searches/n_trials:11.1f}%")
def main():
parser = argparse.ArgumentParser(description='LDPC Frame Sync Prototype')
parser.add_argument('--sweep', action='store_true',
help='Acquisition success rate vs lambda_s')
parser.add_argument('--resync-test', action='store_true',
help='Re-sync after offset slip')
parser.add_argument('--lam-s', type=float, default=5.0,
help='Signal photons/slot (default: 5.0)')
parser.add_argument('--lam-b', type=float, default=0.1,
help='Background photons/slot (default: 0.1)')
parser.add_argument('--n-trials', type=int, default=50,
help='Trials per data point (default: 50)')
parser.add_argument('--seed', type=int, default=42)
args = parser.parse_args()
if args.sweep:
print("=== Frame Sync Acquisition Sweep ===")
lam_s_values = [0.5, 1.0, 1.5, 2.0, 3.0, 4.0, 5.0, 7.0, 10.0]
results = run_acquisition_sweep(lam_s_values, args.lam_b, args.n_trials)
out_path = os.path.join(os.path.dirname(__file__), '..', 'data',
'sync_acquisition_results.json')
os.makedirs(os.path.dirname(out_path), exist_ok=True)
with open(out_path, 'w') as f:
json.dump(results, f, indent=2)
print(f"\nResults saved to {out_path}")
elif args.resync_test:
run_resync_sweep(args.lam_s, args.lam_b, args.n_trials)
else:
# Quick demo
print("=== Frame Sync Demo ===")
H = build_full_h_matrix()
np.random.seed(args.seed)
stream, true_off, infos = generate_stream(
H, n_frames=10, lam_s=args.lam_s, lam_b=args.lam_b, offset=None
)
print(f"Stream: {len(stream)} samples, true offset: {true_off}")
print(f"Signal: lam_s={args.lam_s}, lam_b={args.lam_b}")
result = acquire_sync(stream, verbose=True)
if result['locked']:
match = "CORRECT" if result['offset'] == true_off else "WRONG"
print(f"Locked at offset {result['offset']} ({match})")
print(f" Screened {result['offsets_screened']} offsets")
print(f" Full decodes: {result['full_decodes']}")
print(f" Total cost: {result['total_equiv_decodes']:.1f} "
f"equiv decodes")
else:
print("FAILED to acquire sync")
if __name__ == '__main__':
main()
Step 4: Run tests to verify they pass
Run: cd /home/cah/r2d2/code/fpga/claude_project/ldpc_optical && python3 -m pytest model/test_frame_sync.py -v
Expected: All PASS
Step 5: Run the demo to verify it works end-to-end
Run: cd /home/cah/r2d2/code/fpga/claude_project/ldpc_optical && python3 model/frame_sync.py --lam-s 5.0
Expected: Should lock at the correct offset
Step 6: Commit
git add model/frame_sync.py model/test_frame_sync.py
git commit -m "feat: add frame synchronization prototype with tests"
Task 3: Code Analysis — Rate Comparison
Files:
- Create:
model/ldpc_analysis.py - Create:
model/test_ldpc_analysis.py
Step 1: Write the test for rate sweep
"""Tests for ldpc_analysis.py."""
import numpy as np
import pytest
from ldpc_analysis import build_ira_staircase, rate_sweep_single
class TestIRAStaircase:
def test_rate_1_2(self):
"""Rate 1/2: 1x2 base matrix, n=64, k=32."""
H_base, H_full = build_ira_staircase(m_base=1, z=32)
assert H_base.shape == (1, 2)
assert H_full.shape == (32, 64)
assert np.linalg.matrix_rank(H_full.astype(float)) == 32
def test_rate_1_4(self):
"""Rate 1/4: 3x4 base matrix, n=128, k=32."""
H_base, H_full = build_ira_staircase(m_base=3, z=32)
assert H_base.shape == (3, 4)
assert H_full.shape == (96, 128)
assert np.linalg.matrix_rank(H_full.astype(float)) == 96
def test_rate_1_8_matches_existing(self):
"""Rate 1/8 staircase should produce a valid code."""
H_base, H_full = build_ira_staircase(m_base=7, z=32)
assert H_base.shape == (7, 8)
assert H_full.shape == (224, 256)
def test_encode_decode_roundtrip(self):
"""Any generated staircase code should encode/decode at high SNR."""
for m_base in [1, 2, 3, 5]:
H_base, H_full = build_ira_staircase(m_base=m_base, z=32)
n_base = m_base + 1
k = 32
n = n_base * 32
np.random.seed(42)
info = np.random.randint(0, 2, k).astype(np.int8)
# Use generic IRA encoder
from ldpc_analysis import ira_encode
codeword = ira_encode(info, H_base, H_full, z=32)
syndrome = H_full @ codeword % 2
assert np.all(syndrome == 0), f"Failed for m_base={m_base}"
class TestRateSweep:
def test_high_snr_all_rates_decode(self):
"""At very high SNR, all rates should achieve FER=0."""
for m_base in [1, 3, 7]:
result = rate_sweep_single(m_base=m_base, z=32, lam_s=20.0,
lam_b=0.1, n_frames=10)
assert result['fer'] == 0.0, f"m_base={m_base} failed at lam_s=20"
Step 2: Run tests to verify they fail
Run: cd /home/cah/r2d2/code/fpga/claude_project/ldpc_optical && python3 -m pytest model/test_ldpc_analysis.py::TestIRAStaircase -v
Expected: FAIL — module doesn't exist
Step 3: Implement build_ira_staircase, ira_encode, and rate_sweep_single
The key functions:
build_ira_staircase(m_base, z): Generate an IRA staircase base matrix and expand to full H. Info column connected to all rows, lower-triangular parity with spread shift values.ira_encode(info, H_base, H_full, z): Generic IRA encoder that works for any staircase code (sequential parity solve, same algorithm as ldpc_sim.py).rate_sweep_single(m_base, z, lam_s, lam_b, n_frames): Run n_frames through encode/channel/decode, return BER/FER.run_rate_sweep(...): Full sweep over rates and lambda_s values, print summary table.
#!/usr/bin/env python3
"""
LDPC Code Analysis Tool — Quantitative design validation.
Four analyses:
--rate-sweep: Compare code rates (1/2 through 1/8) vs lambda_s
--matrix-compare: Compare base matrix designs at rate 1/8
--quant-sweep: Quantization bits (4-32) impact on FER
--shannon-gap: Distance from Poisson channel capacity
Usage:
python3 ldpc_analysis.py --rate-sweep
python3 ldpc_analysis.py --matrix-compare
python3 ldpc_analysis.py --quant-sweep
python3 ldpc_analysis.py --shannon-gap
python3 ldpc_analysis.py --all
"""
import numpy as np
import argparse
import json
import os
from ldpc_sim import (
poisson_channel, quantize_llr, decode_layered_min_sum,
compute_syndrome_weight, sat_add_q, sat_sub_q, min_sum_cn_update,
Q_BITS, Q_MAX, Q_MIN, OFFSET,
)
# ============================================================================
# Generic IRA staircase code construction
# ============================================================================
def build_ira_staircase(m_base, z=32):
"""
Build an IRA (Irregular Repeat-Accumulate) staircase QC-LDPC code.
Structure:
- Column 0: info bits, connected to ALL m_base rows (high protection)
- Columns 1..m_base: parity bits, lower-triangular staircase
- Each row has 2-3 nonzero entries (sparse)
- Shift values spread across [0, z-1] to maximize girth
Args:
m_base: number of base matrix rows (rate = 1 / (m_base + 1))
z: lifting factor
Returns:
H_base: (m_base, m_base+1) array of shift values (-1 = not connected)
H_full: (m_base*z, (m_base+1)*z) full binary parity check matrix
"""
n_base = m_base + 1
H_base = np.full((m_base, n_base), -1, dtype=np.int16)
# Info column (col 0): connect to all rows with spread shifts
for r in range(m_base):
H_base[r, 0] = (r * z // m_base) % z
# Staircase parity: row r connects to col r+1 (diagonal)
# Row 0: col 1 only (besides info)
# Row r>0: col r and col r+1
H_base[0, 1] = (5 * z // m_base) % z if m_base > 1 else 0
for r in range(1, m_base):
# Sub-diagonal: connect to previous parity column
H_base[r, r] = (r * 7) % z
# Diagonal: connect to own parity column
H_base[r, r + 1] = 0
# Expand to full binary matrix
n = n_base * z
m = m_base * z
H_full = np.zeros((m, n), dtype=np.int8)
for r in range(m_base):
for c in range(n_base):
shift = int(H_base[r, c])
if shift < 0:
continue
for zz in range(z):
col_idx = c * z + (zz + shift) % z
H_full[r * z + zz, col_idx] = 1
return H_base, H_full
def ira_encode(info, H_base, H_full, z=32):
"""
Encode using IRA staircase structure (generic, any rate).
Same algorithm as ldpc_sim.ldpc_encode but works for any staircase.
"""
m_base = H_base.shape[0]
n_base = H_base.shape[1]
k = z
assert len(info) == k
def apply_p(x, s):
return np.roll(x, -int(s))
def inv_p(y, s):
return np.roll(y, int(s))
p = [np.zeros(z, dtype=np.int8) for _ in range(n_base)]
# Row 0: solve for p[1]
accum = apply_p(info, H_base[0, 0])
p[1] = inv_p(accum, H_base[0, 1])
# Rows 1..m_base-1: solve for p[2]..p[m_base]
for r in range(1, m_base):
accum = apply_p(info, H_base[r, 0])
accum = (accum + apply_p(p[r], H_base[r, r])) % 2
p[r + 1] = inv_p(accum, H_base[r, r + 1])
codeword = np.concatenate([info] + [p[c] for c in range(1, n_base)])
# Verify
check = H_full @ codeword % 2
assert np.all(check == 0), f"Encoding failed: syndrome weight = {check.sum()}"
return codeword
def generic_decode(llr_q, H_base, z=32, max_iter=30, early_term=True,
q_bits=Q_BITS):
"""
Generic layered min-sum decoder for any IRA staircase code.
Same algorithm as ldpc_sim.decode_layered_min_sum but parameterized.
"""
m_base = H_base.shape[0]
n_base = H_base.shape[1]
n = n_base * z
k = z
q_max = 2**(q_bits-1) - 1
q_min = -(2**(q_bits-1))
def sat_add(a, b):
s = int(a) + int(b)
return max(q_min, min(q_max, s))
def sat_sub(a, b):
return sat_add(a, -b)
beliefs = [int(x) for x in llr_q]
msg = [[[0]*z for _ in range(n_base)] for _ in range(m_base)]
for iteration in range(max_iter):
for row in range(m_base):
connected = [c for c in range(n_base) if H_base[row, c] >= 0]
dc = len(connected)
vn_to_cn = [[0]*z for _ in range(dc)]
for ci, col in enumerate(connected):
shift = int(H_base[row, col])
for zz in range(z):
sz = (zz + shift) % z
bi = col * z + sz
vn_to_cn[ci][zz] = sat_sub(beliefs[bi], msg[row][col][zz])
cn_to_vn = [[0]*z for _ in range(dc)]
for zz in range(z):
cn_in = [vn_to_cn[ci][zz] for ci in range(dc)]
cn_out = min_sum_cn_update(cn_in)
for ci in range(dc):
cn_to_vn[ci][zz] = cn_out[ci]
for ci, col in enumerate(connected):
shift = int(H_base[row, col])
for zz in range(z):
sz = (zz + shift) % z
bi = col * z + sz
beliefs[bi] = sat_add(vn_to_cn[ci][zz], cn_to_vn[ci][zz])
msg[row][col][zz] = cn_to_vn[ci][zz]
# Syndrome check
hard = [1 if b < 0 else 0 for b in beliefs]
syn_wt = 0
for r in range(m_base):
for zz in range(z):
parity = 0
for c in range(n_base):
shift = int(H_base[r, c])
if shift < 0:
continue
sz = (zz + shift) % z
bi = c * z + sz
parity ^= hard[bi]
if parity:
syn_wt += 1
if early_term and syn_wt == 0:
return np.array(hard[:k]), True, iteration + 1, 0
hard = [1 if b < 0 else 0 for b in beliefs]
return np.array(hard[:k]), syn_wt == 0, max_iter, syn_wt
def rate_sweep_single(m_base, z=32, lam_s=2.0, lam_b=0.1, n_frames=100,
max_iter=30, q_bits=Q_BITS):
"""
Run FER simulation for a single code rate and lambda_s.
Returns dict with ber, fer, avg_iter.
"""
H_base, H_full = build_ira_staircase(m_base, z)
k = z
bit_errors = 0
frame_errors = 0
total_bits = 0
total_iter = 0
for _ in range(n_frames):
info = np.random.randint(0, 2, k).astype(np.int8)
codeword = ira_encode(info, H_base, H_full, z)
llr, _ = poisson_channel(codeword, lam_s, lam_b)
# Quantize with configurable bits
q_max = 2**(q_bits-1) - 1
q_min = -(2**(q_bits-1))
scale = q_max / 5.0
llr_q = np.clip(np.round(llr * scale), q_min, q_max).astype(int)
decoded, converged, iters, _ = generic_decode(
llr_q, H_base, z, max_iter, q_bits=q_bits
)
total_iter += iters
errs = np.sum(decoded != info)
bit_errors += errs
total_bits += k
if errs > 0:
frame_errors += 1
return {
'rate': f"1/{m_base+1}",
'rate_val': 1.0 / (m_base + 1),
'm_base': m_base,
'lam_s': lam_s,
'ber': bit_errors / total_bits if total_bits > 0 else 0,
'fer': frame_errors / n_frames,
'avg_iter': total_iter / n_frames,
}
# ============================================================================
# Analysis 1: Rate Comparison
# ============================================================================
def run_rate_sweep(lam_b=0.1, n_frames=500, max_iter=30):
"""Compare code rates across lambda_s range."""
rates = [
(1, "1/2"),
(2, "1/3"),
(3, "1/4"),
(5, "1/6"),
(7, "1/8"),
]
lam_s_values = [0.5, 1.0, 1.5, 2.0, 2.5, 3.0, 4.0, 5.0, 7.0, 10.0]
print("=== Rate Comparison: FER vs lambda_s ===")
print(f"Frames per point: {n_frames}, lam_b={lam_b}, max_iter={max_iter}")
print()
# Header
header = f"{'lam_s':>8s}"
for _, name in rates:
header += f" {name:>8s}"
print(header)
print("-" * (8 + 9 * len(rates)))
all_results = []
for lam_s in lam_s_values:
row = f"{lam_s:8.1f}"
for m_base, name in rates:
np.random.seed(42)
r = rate_sweep_single(m_base, lam_s=lam_s, lam_b=lam_b,
n_frames=n_frames, max_iter=max_iter)
row += f" {r['fer']:8.3f}"
all_results.append(r)
print(row)
# Summary: threshold lambda_s for each rate (FER < 10%)
print("\n--- Threshold lambda_s (FER < 10%) ---")
for m_base, name in rates:
rate_results = [r for r in all_results if r['m_base'] == m_base]
threshold = None
for r in sorted(rate_results, key=lambda x: x['lam_s']):
if r['fer'] < 0.10:
threshold = r['lam_s']
break
t_str = f"{threshold:.1f}" if threshold else ">10"
print(f" Rate {name}: threshold lam_s = {t_str} photons/slot")
return all_results
# ============================================================================
# Analysis 2: Base Matrix Comparison
# ============================================================================
def compute_girth(H_base, z):
"""
Compute girth (shortest cycle) of the Tanner graph.
Uses BFS from each VN to find shortest cycle.
"""
m_base = H_base.shape[0]
n_base = H_base.shape[1]
n = n_base * z
m = m_base * z
# Build adjacency: VN -> list of CN neighbors, CN -> list of VN neighbors
vn_to_cn = [[] for _ in range(n)]
cn_to_vn = [[] for _ in range(m)]
for r in range(m_base):
for c in range(n_base):
shift = int(H_base[r, c])
if shift < 0:
continue
for zz in range(z):
vn = c * z + (zz + shift) % z
cn = r * z + zz
vn_to_cn[vn].append(cn)
cn_to_vn[cn].append(vn)
min_cycle = float('inf')
# BFS from a subset of VNs (all VNs for small codes)
sample_vns = range(min(n, 64)) # sample for speed
for start_vn in sample_vns:
# BFS: track (node_type, node_id) with distances
# node_type: 0=VN, 1=CN
dist = {}
parent = {}
queue = [(0, start_vn, -1, -1)] # (distance, node, parent_type, parent_id)
dist[(0, start_vn)] = 0
parent[(0, start_vn)] = None
qi = 0
while qi < len(queue):
d, node, pt, pid = queue[qi]
qi += 1
node_type = 0 if (0, node) in dist and dist[(0, node)] == d else 1
# Determine actual type
if qi == 1:
node_type = 0 # start is VN
# Get neighbors
if (0, node) in dist and dist[(0, node)] == d:
# This is a VN, go to CN neighbors
for cn in vn_to_cn[node]:
if (1, cn) == (pt, pid):
continue # don't go back
if (1, cn) in dist:
cycle_len = d + 1 + dist[(1, cn)]
min_cycle = min(min_cycle, cycle_len)
else:
dist[(1, cn)] = d + 1
queue.append((d + 1, cn, 0, node))
elif (1, node) in dist and dist[(1, node)] == d:
# This is a CN, go to VN neighbors
for vn in cn_to_vn[node]:
if (0, vn) == (pt, pid):
continue
if (0, vn) in dist:
cycle_len = d + 1 + dist[(0, vn)]
min_cycle = min(min_cycle, cycle_len)
else:
dist[(0, vn)] = d + 1
queue.append((d + 1, vn, 1, node))
if d > min_cycle:
break
return min_cycle if min_cycle < float('inf') else -1
def build_improved_staircase(z=32):
"""
Improved IRA staircase with better degree distribution.
Adds extra connections to low-degree columns while keeping
lower-triangular parity structure for easy encoding.
Key improvement: col 7 gets extra connection (dv=1 -> dv=2).
"""
m_base = 7
n_base = 8
H_base = np.full((m_base, n_base), -1, dtype=np.int16)
# Info column (col 0): connect to all rows (same as original)
H_base[0, 0] = 0
H_base[1, 0] = 11
H_base[2, 0] = 17
H_base[3, 0] = 23
H_base[4, 0] = 29
H_base[5, 0] = 3
H_base[6, 0] = 9
# Staircase parity (same diagonal structure)
H_base[0, 1] = 5
H_base[1, 1] = 3; H_base[1, 2] = 0
H_base[2, 2] = 7; H_base[2, 3] = 0
H_base[3, 3] = 13; H_base[3, 4] = 0
H_base[4, 4] = 19; H_base[4, 5] = 0
H_base[5, 5] = 25; H_base[5, 6] = 0
H_base[6, 6] = 31; H_base[6, 7] = 0
# Improvements: add connections to boost weak columns
# Col 7 (dv=1 -> dv=2): connect to row 0
H_base[0, 7] = 15
# Col 1 currently dv=2, boost to dv=3: connect to row 3
H_base[3, 1] = 21
# Expand
n = n_base * z
m = m_base * z
H_full = np.zeros((m, n), dtype=np.int8)
for r in range(m_base):
for c in range(n_base):
shift = int(H_base[r, c])
if shift < 0:
continue
for zz in range(z):
col_idx = c * z + (zz + shift) % z
H_full[r * z + zz, col_idx] = 1
return H_base, H_full
def build_peg_matrix(z=32):
"""
PEG-constructed base matrix for rate 1/8.
Progressive Edge Growth: greedily add edges to maximize local girth.
Target degree distribution: VN degrees [3,2,2,2,2,2,2,2] average ~2.1.
More uniform than staircase.
Note: this loses the trivial staircase encoding property.
Encoding requires solving H*c=0 via back-substitution on the
parity submatrix, which must still be full-rank.
"""
m_base = 7
n_base = 8
H_base = np.full((m_base, n_base), -1, dtype=np.int16)
# PEG-like construction: spread edges to maximize girth
# Col 0 (info): connect to all 7 rows (dv=7, high protection)
H_base[0, 0] = 0
H_base[1, 0] = 11
H_base[2, 0] = 17
H_base[3, 0] = 23
H_base[4, 0] = 29
H_base[5, 0] = 3
H_base[6, 0] = 9
# Parity columns: each connected to exactly 2 rows (dv=2)
# with shifts chosen for maximum girth spread
# Ensure parity sub-matrix is invertible for encoding
H_base[0, 1] = 5; H_base[1, 1] = 3
H_base[1, 2] = 0; H_base[2, 2] = 7
H_base[2, 3] = 0; H_base[3, 3] = 13
H_base[3, 4] = 0; H_base[4, 4] = 19
H_base[4, 5] = 0; H_base[5, 5] = 25
H_base[5, 6] = 0; H_base[6, 6] = 31
H_base[6, 7] = 0; H_base[0, 7] = 15
# This is actually a ring structure (not strictly lower-triangular)
# Encoding: need to solve via the ring structure
# Expand
n = n_base * z
m = m_base * z
H_full = np.zeros((m, n), dtype=np.int8)
for r in range(m_base):
for c in range(n_base):
shift = int(H_base[r, c])
if shift < 0:
continue
for zz in range(z):
col_idx = c * z + (zz + shift) % z
H_full[r * z + zz, col_idx] = 1
return H_base, H_full
def peg_encode(info, H_base, H_full, z=32):
"""
Encode for PEG matrix using Gaussian elimination on parity submatrix.
For non-staircase matrices, we solve H_parity * p = H_info * info (mod 2)
using the full H matrix.
"""
m_base = H_base.shape[0]
n_base = H_base.shape[1]
k = z
m = m_base * z
n = n_base * z
# H = [H_info | H_parity], solve H_parity * p = H_info * info mod 2
H_info = H_full[:, :k]
H_parity = H_full[:, k:]
rhs = H_info @ info % 2
# Solve via GF(2) Gaussian elimination
# Augmented matrix [H_parity | rhs]
aug = np.hstack([H_parity.copy(), rhs.reshape(-1, 1)]).astype(np.int8)
rows, cols_p = H_parity.shape
pivot_row = 0
pivot_cols = []
for col in range(cols_p):
# Find pivot
found = -1
for r in range(pivot_row, rows):
if aug[r, col] == 1:
found = r
break
if found == -1:
continue
# Swap
aug[[pivot_row, found]] = aug[[found, pivot_row]]
pivot_cols.append(col)
# Eliminate
for r in range(rows):
if r != pivot_row and aug[r, col] == 1:
aug[r] = (aug[r] + aug[pivot_row]) % 2
pivot_row += 1
# Back-substitute
parity = np.zeros(cols_p, dtype=np.int8)
for i, col in enumerate(pivot_cols):
parity[col] = aug[i, -1]
codeword = np.concatenate([info, parity])
check = H_full @ codeword % 2
assert np.all(check == 0), f"PEG encoding failed: syn={check.sum()}"
return codeword
def run_matrix_comparison(lam_b=0.1, n_frames=500, max_iter=30):
"""Compare base matrix designs at rate 1/8."""
from ldpc_sim import H_BASE as ORIG_H_BASE, build_full_h_matrix, ldpc_encode
matrices = {
'Original staircase': (ORIG_H_BASE, build_full_h_matrix(), ldpc_encode),
}
# Improved staircase
imp_base, imp_full = build_improved_staircase()
# Need a custom encoder since it's not pure staircase anymore
# Actually it IS still lower-triangular in parity, but with extra
# connections — we can use the PEG encoder (GF2 solve) for safety
matrices['Improved staircase'] = (imp_base, imp_full, None)
# PEG
peg_base, peg_full = build_peg_matrix()
matrices['PEG ring'] = (peg_base, peg_full, None)
z = 32
lam_s_values = [0.5, 1.0, 1.5, 2.0, 3.0, 4.0, 5.0, 7.0, 10.0]
print("=== Base Matrix Comparison (Rate 1/8) ===")
print(f"Frames per point: {n_frames}, lam_b={lam_b}")
print()
# Degree distributions and girth
for name, (H_base, H_full, _) in matrices.items():
m_base_m = H_base.shape[0]
n_base_m = H_base.shape[1]
vn_degrees = []
for c in range(n_base_m):
dv = sum(1 for r in range(m_base_m) if H_base[r, c] >= 0)
vn_degrees.append(dv)
cn_degrees = []
for r in range(m_base_m):
dc = sum(1 for c in range(n_base_m) if H_base[r, c] >= 0)
cn_degrees.append(dc)
girth = compute_girth(H_base, z)
print(f"{name}:")
print(f" VN degrees: {vn_degrees}")
print(f" CN degrees: {cn_degrees}")
print(f" Girth: {girth}")
rank = np.linalg.matrix_rank(H_full.astype(float))
print(f" H rank: {rank} / {H_base.shape[0]*z}")
print()
# FER sweep
header = f"{'lam_s':>8s}"
for name in matrices:
short = name[:12]
header += f" {short:>14s}"
print(header)
print("-" * (8 + 15 * len(matrices)))
all_results = []
for lam_s in lam_s_values:
row = f"{lam_s:8.1f}"
for name, (H_base, H_full, enc_fn) in matrices.items():
np.random.seed(42)
k = z
n = H_base.shape[1] * z
frame_errors = 0
for _ in range(n_frames):
info = np.random.randint(0, 2, k).astype(np.int8)
if enc_fn is not None:
codeword = enc_fn(info, H_full)
else:
codeword = peg_encode(info, H_base, H_full, z)
llr, _ = poisson_channel(codeword, lam_s, lam_b)
llr_q = quantize_llr(llr)
decoded, converged, _, _ = generic_decode(
llr_q, H_base, z, max_iter
)
if np.any(decoded != info):
frame_errors += 1
fer = frame_errors / n_frames
row += f" {fer:14.3f}"
all_results.append({
'matrix': name, 'lam_s': lam_s, 'fer': fer
})
print(row)
return all_results
# ============================================================================
# Analysis 3: Quantization Sweep
# ============================================================================
def run_quant_sweep(lam_s=None, lam_b=0.1, n_frames=500, max_iter=30):
"""Sweep quantization bits and measure FER impact."""
from ldpc_sim import H_BASE, build_full_h_matrix, ldpc_encode
H_base = H_BASE
H_full = build_full_h_matrix()
z = 32
quant_bits = [4, 5, 6, 8, 10, 16]
# If no lambda_s given, test at a few points
if lam_s is None:
lam_s_values = [2.0, 3.0, 5.0]
else:
lam_s_values = [lam_s]
print("=== Quantization Sweep ===")
print(f"Frames per point: {n_frames}, lam_b={lam_b}")
print()
header = f"{'lam_s':>8s}"
for q in quant_bits:
header += f" {q:>6d}-bit"
header += f" {'float':>10s}"
print(header)
print("-" * (8 + 10 * (len(quant_bits) + 1)))
all_results = []
for ls in lam_s_values:
row = f"{ls:8.1f}"
# Fixed quantization levels
for q in quant_bits:
np.random.seed(42)
frame_errors = 0
for _ in range(n_frames):
info = np.random.randint(0, 2, z).astype(np.int8)
codeword = ldpc_encode(info, H_full)
llr, _ = poisson_channel(codeword, ls, lam_b)
q_max = 2**(q-1) - 1
q_min = -(2**(q-1))
scale = q_max / 5.0
llr_q = np.clip(np.round(llr * scale), q_min, q_max).astype(int)
decoded, _, _, _ = generic_decode(
llr_q, H_base, z, max_iter, q_bits=q
)
if np.any(decoded != info):
frame_errors += 1
fer = frame_errors / n_frames
row += f" {fer:9.3f}"
all_results.append({
'lam_s': ls, 'q_bits': q, 'fer': fer
})
# Float (no quantization — use very wide integers to approximate)
np.random.seed(42)
frame_errors = 0
for _ in range(n_frames):
info = np.random.randint(0, 2, z).astype(np.int8)
codeword = ldpc_encode(info, H_full)
llr, _ = poisson_channel(codeword, ls, lam_b)
# Use 16-bit with high scale for "float-like" precision
scale = 1000.0
llr_q = np.clip(np.round(llr * scale), -32768, 32767).astype(int)
decoded, _, _, _ = generic_decode(
llr_q, H_base, z, max_iter, q_bits=16
)
if np.any(decoded != info):
frame_errors += 1
fer = frame_errors / n_frames
row += f" {fer:9.3f}"
all_results.append({
'lam_s': ls, 'q_bits': 'float', 'fer': fer
})
print(row)
return all_results
# ============================================================================
# Analysis 4: Shannon Gap
# ============================================================================
def poisson_channel_capacity(lam_s, lam_b, max_y=50):
"""
Compute binary-input Poisson channel capacity.
C = max_p I(X;Y) where X in {0,1}, Y ~ Poisson(x*lam_s + lam_b)
For OOK with equal priors (p=0.5):
I(X;Y) = H(Y) - 0.5*H(Y|X=0) - 0.5*H(Y|X=1)
"""
from scipy.stats import poisson
def entropy_poisson(lam, max_y):
"""Entropy of Poisson(lam) truncated to [0, max_y]."""
if lam < 1e-10:
return 0.0
pmf = [poisson.pmf(y, lam) for y in range(max_y + 1)]
total = sum(pmf)
h = 0.0
for p in pmf:
if p > 0:
h -= p * np.log2(p / total)
return h
# H(Y|X=0) and H(Y|X=1)
h_y_x0 = entropy_poisson(lam_b, max_y)
h_y_x1 = entropy_poisson(lam_s + lam_b, max_y)
# Optimize over input probability p
best_c = 0.0
for p1_pct in range(0, 101):
p1 = p1_pct / 100.0
p0 = 1.0 - p1
# H(Y) where Y has mixture distribution
pmf_mix = []
for y in range(max_y + 1):
pmf_mix.append(p0 * poisson.pmf(y, lam_b) +
p1 * poisson.pmf(y, lam_s + lam_b))
h_y = 0.0
for p in pmf_mix:
if p > 0:
h_y -= p * np.log2(p)
c = h_y - p0 * h_y_x0 - p1 * h_y_x1
best_c = max(best_c, c)
return best_c
def run_shannon_gap(lam_b=0.1):
"""Compute Shannon limits and compare to decoder thresholds."""
print("=== Shannon Gap Analysis ===")
print(f"Binary-input Poisson channel, lam_b={lam_b}")
print()
rates = [
(1, "1/2", 0.5),
(2, "1/3", 1/3),
(3, "1/4", 0.25),
(5, "1/6", 1/6),
(7, "1/8", 0.125),
]
print(f"{'Rate':>8s} {'Shannon':>10s} {'Capacity':>10s}")
print(f"{'':>8s} {'lam_s':>10s} {'at limit':>10s}")
print("-" * 32)
results = []
for m_base, name, rate_val in rates:
# Binary search for minimum lam_s where C >= rate
lo, hi = 0.01, 50.0
for _ in range(50): # bisection
mid = (lo + hi) / 2.0
c = poisson_channel_capacity(mid, lam_b)
if c >= rate_val:
hi = mid
else:
lo = mid
shannon_lam_s = (lo + hi) / 2.0
cap_at_limit = poisson_channel_capacity(shannon_lam_s, lam_b)
print(f"{name:>8s} {shannon_lam_s:10.3f} {cap_at_limit:10.4f}")
results.append({
'rate': name, 'rate_val': rate_val,
'shannon_lam_s': shannon_lam_s,
'capacity_at_limit': cap_at_limit,
})
print()
print("Shannon limit = minimum lam_s (photons/slot) where")
print("channel capacity >= code rate. Decoder must operate above this.")
print()
print("Compare decoder thresholds (from --rate-sweep) to these limits")
print("to compute the gap in dB: gap = 10*log10(decoder_threshold / shannon_limit)")
return results
# ============================================================================
# Main
# ============================================================================
def main():
parser = argparse.ArgumentParser(description='LDPC Code Analysis')
parser.add_argument('--rate-sweep', action='store_true',
help='Compare code rates vs lambda_s')
parser.add_argument('--matrix-compare', action='store_true',
help='Compare base matrix designs')
parser.add_argument('--quant-sweep', action='store_true',
help='Quantization bits impact')
parser.add_argument('--shannon-gap', action='store_true',
help='Distance from channel capacity')
parser.add_argument('--all', action='store_true',
help='Run all analyses')
parser.add_argument('--n-frames', type=int, default=500,
help='Frames per data point (default: 500)')
parser.add_argument('--lam-b', type=float, default=0.1,
help='Background photons/slot (default: 0.1)')
parser.add_argument('--seed', type=int, default=42)
args = parser.parse_args()
np.random.seed(args.seed)
results = {}
if args.rate_sweep or args.all:
results['rate_sweep'] = run_rate_sweep(args.lam_b, args.n_frames)
print()
if args.matrix_compare or args.all:
results['matrix_compare'] = run_matrix_comparison(
args.lam_b, args.n_frames
)
print()
if args.quant_sweep or args.all:
results['quant_sweep'] = run_quant_sweep(
lam_b=args.lam_b, n_frames=args.n_frames
)
print()
if args.shannon_gap or args.all:
results['shannon_gap'] = run_shannon_gap(args.lam_b)
print()
if results:
out_path = os.path.join(os.path.dirname(__file__) or '.', '..',
'data', 'analysis_results.json')
os.makedirs(os.path.dirname(out_path), exist_ok=True)
with open(out_path, 'w') as f:
json.dump(results, f, indent=2, default=str)
print(f"Results saved to {out_path}")
else:
parser.print_help()
if __name__ == '__main__':
main()
Step 4: Run tests
Run: cd /home/cah/r2d2/code/fpga/claude_project/ldpc_optical && python3 -m pytest model/test_ldpc_analysis.py -v
Expected: All PASS
Step 5: Commit
git add model/ldpc_analysis.py model/test_ldpc_analysis.py
git commit -m "feat: add code analysis tool with rate sweep and matrix comparison"
Task 4: Run All Analyses and Capture Results
Step 1: Run rate sweep
Run: cd /home/cah/r2d2/code/fpga/claude_project/ldpc_optical && python3 model/ldpc_analysis.py --rate-sweep --n-frames 200
Expected: Table showing FER vs lambda_s for each rate. Rate 1/8 should have the lowest threshold.
Step 2: Run matrix comparison
Run: cd /home/cah/r2d2/code/fpga/claude_project/ldpc_optical && python3 model/ldpc_analysis.py --matrix-compare --n-frames 200
Expected: Table comparing original vs improved vs PEG matrices.
Step 3: Run quantization sweep
Run: cd /home/cah/r2d2/code/fpga/claude_project/ldpc_optical && python3 model/ldpc_analysis.py --quant-sweep --n-frames 200
Expected: FER vs quantization bits table.
Step 4: Run Shannon gap
Run: cd /home/cah/r2d2/code/fpga/claude_project/ldpc_optical && python3 model/ldpc_analysis.py --shannon-gap
Expected: Shannon limit lambda_s for each rate.
Step 5: Run frame sync sweep
Run: cd /home/cah/r2d2/code/fpga/claude_project/ldpc_optical && python3 model/frame_sync.py --sweep --n-trials 20
Expected: Acquisition success rate vs lambda_s.
Step 6: Run frame sync re-sync test
Run: cd /home/cah/r2d2/code/fpga/claude_project/ldpc_optical && python3 model/frame_sync.py --resync-test --lam-s 5.0 --n-trials 20
Expected: Re-sync success at various slip amounts.
Step 7: Review results, fix any bugs, commit data
git add data/
git commit -m "data: add analysis and frame sync simulation results"
Task 5: Run Full Test Suite
Step 1: Run all tests
Run: cd /home/cah/r2d2/code/fpga/claude_project/ldpc_optical && python3 -m pytest model/ -v
Expected: All tests PASS
Step 2: Final commit
git add -A
git commit -m "test: all model tests passing for frame sync and code analysis"