# Frame Sync & Code Analysis Implementation Plan

> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.

**Goal:** Build a frame synchronization prototype and four quantitative code analyses to validate design decisions for the LDPC optical decoder (target: 1-2 photons/slot).

**Architecture:** Two new scripts (`frame_sync.py`, `ldpc_analysis.py`) import the encoder, decoder, and channel from the existing `ldpc_sim.py`. Frame sync simulates a continuous stream with an unknown offset. The analyses compare rates, matrices, quantization, and Shannon limits. All scripts print summary tables to stdout and save results to `data/`.

**Tech Stack:** Python 3, numpy, scipy (Poisson PMF and optimization for the Shannon-gap analysis), pytest for validation tests.
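For orientation, the Poisson-channel LLR that the channel model is built around can be sketched directly. This is a minimal sketch: `poisson_llr` is a hypothetical helper, and the "bit 0 transmits a pulse" OOK convention is an assumption here; `ldpc_sim.poisson_channel` is the authoritative implementation.

```python
import numpy as np

def poisson_llr(counts, lam_s, lam_b):
    # Hypothetical OOK model (assumption; ldpc_sim.poisson_channel is authoritative):
    # bit 0 -> pulse slot with mean lam_s + lam_b photons, bit 1 -> mean lam_b.
    # LLR(k) = log P(k | bit=0) - log P(k | bit=1)
    #        = k * log((lam_s + lam_b) / lam_b) - lam_s
    return counts * np.log((lam_s + lam_b) / lam_b) - lam_s

print(poisson_llr(np.array([0, 1, 5]), lam_s=2.0, lam_b=0.1))
```

Under this convention a zero-photon slot gives LLR = -lam_s (leaning toward bit 1), and a few detected photons swing the LLR strongly positive, matching the "positive LLR means bit 0" convention used throughout the tests below.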
**Key reference files:**

- `model/ldpc_sim.py` — encoder, decoder, channel, syndrome check (DO NOT MODIFY)
- `docs/plans/2026-02-23-frame-sync-and-code-analysis-design.md` — design document

---

### Task 1: Test Harness — Validate Existing Model

**Files:**

- Create: `model/test_ldpc.py`

**Step 1: Write validation tests for existing ldpc_sim.py**

These tests establish a known-good baseline before we build on top.

```python
"""Tests for ldpc_sim.py — validates encoder, decoder, channel, syndrome."""
import numpy as np
import pytest

from ldpc_sim import (
    build_full_h_matrix, ldpc_encode, poisson_channel, quantize_llr,
    decode_layered_min_sum, compute_syndrome_weight, cyclic_shift,
    sat_add_q, sat_sub_q, min_sum_cn_update,
    N, K, M, Z, N_BASE, M_BASE, H_BASE, Q_MAX, Q_MIN,
)


class TestCodeParameters:
    def test_dimensions(self):
        assert N == 256
        assert K == 32
        assert M == 224
        assert Z == 32
        assert N_BASE == 8
        assert M_BASE == 7

    def test_h_base_shape(self):
        assert H_BASE.shape == (M_BASE, N_BASE)

    def test_h_base_info_column_fully_connected(self):
        """Column 0 (info) should connect to all 7 rows."""
        for r in range(M_BASE):
            assert H_BASE[r, 0] >= 0

    def test_h_base_staircase_structure(self):
        """Parity columns should form lower-triangular staircase."""
        for r in range(M_BASE):
            # Diagonal parity column (r+1) should be connected
            assert H_BASE[r, r + 1] >= 0
            # Above diagonal should be -1 (not connected)
            for c in range(r + 2, N_BASE):
                assert H_BASE[r, c] == -1


class TestHMatrix:
    def test_full_h_dimensions(self):
        H = build_full_h_matrix()
        assert H.shape == (M, N)

    def test_full_h_binary(self):
        H = build_full_h_matrix()
        assert set(np.unique(H)).issubset({0, 1})

    def test_full_h_row_weights(self):
        """Each row in a Z-block should have the same weight (QC property)."""
        H = build_full_h_matrix()
        for r in range(M_BASE):
            weights = [H[r * Z + z].sum() for z in range(Z)]
            assert len(set(weights)) == 1, f"Row block {r} has inconsistent weights"

    def test_full_h_rank(self):
        """H should have rank M (full rank) for the code to work."""
        H = build_full_h_matrix()
        rank = np.linalg.matrix_rank(H.astype(float))
        assert rank == M


class TestEncoder:
    def test_encode_all_zeros(self):
        H = build_full_h_matrix()
        info = np.zeros(K, dtype=np.int8)
        codeword = ldpc_encode(info, H)
        assert len(codeword) == N
        assert np.all(codeword == 0)  # all-zero is always a codeword

    def test_encode_satisfies_parity(self):
        H = build_full_h_matrix()
        np.random.seed(123)
        info = np.random.randint(0, 2, K).astype(np.int8)
        codeword = ldpc_encode(info, H)
        syndrome = H @ codeword % 2
        assert np.all(syndrome == 0)

    def test_encode_preserves_info_bits(self):
        H = build_full_h_matrix()
        np.random.seed(456)
        info = np.random.randint(0, 2, K).astype(np.int8)
        codeword = ldpc_encode(info, H)
        assert np.array_equal(codeword[:K], info)

    def test_encode_multiple_random(self):
        """Encode 20 random messages, all must satisfy parity."""
        H = build_full_h_matrix()
        for seed in range(20):
            np.random.seed(seed)
            info = np.random.randint(0, 2, K).astype(np.int8)
            codeword = ldpc_encode(info, H)
            syndrome = H @ codeword % 2
            assert np.all(syndrome == 0), f"Failed for seed {seed}"


class TestSaturatingArithmetic:
    def test_sat_add_normal(self):
        assert sat_add_q(10, 5) == 15

    def test_sat_add_positive_overflow(self):
        assert sat_add_q(Q_MAX, 1) == Q_MAX

    def test_sat_add_negative_overflow(self):
        assert sat_add_q(Q_MIN, -1) == Q_MIN

    def test_sat_sub(self):
        assert sat_sub_q(10, 3) == 7
        assert sat_sub_q(Q_MIN, 1) == Q_MIN


class TestMinSumCN:
    def test_all_positive(self):
        """All positive inputs: output signs should all be positive."""
        msgs = [5, 10, 15]
        out = min_sum_cn_update(msgs, offset=1)
        assert all(o >= 0 for o in out)

    def test_one_negative(self):
        """One negative: that output positive, others negative."""
        msgs = [-5, 10, 15]
        out = min_sum_cn_update(msgs, offset=1)
        assert out[0] >= 0  # extrinsic sign excludes own sign
        assert out[1] <= 0
        assert out[2] <= 0

    def test_magnitude_is_min_of_others(self):
        """Output magnitude = min of OTHER magnitudes - offset."""
        msgs = [3, 10, 20]
        out = min_sum_cn_update(msgs, offset=1)
        # out[0]: min of {10, 20} - 1 = 9
        assert abs(out[0]) == 9
        # out[1]: min of {3, 20} - 1 = 2
        assert abs(out[1]) == 2
        # out[2]: min of {3, 10} - 1 = 2
        assert abs(out[2]) == 2


class TestDecoder:
    def test_decode_noiseless(self):
        """Noiseless channel (huge lambda_s) should decode perfectly."""
        H = build_full_h_matrix()
        np.random.seed(789)
        info = np.random.randint(0, 2, K).astype(np.int8)
        codeword = ldpc_encode(info, H)
        # Noiseless: LLR is just +/- large value
        llr = np.where(codeword == 0, 20.0, -20.0)
        llr_q = quantize_llr(llr)
        decoded, converged, iters, syn_wt = decode_layered_min_sum(llr_q, max_iter=5)
        assert converged
        assert np.array_equal(decoded, info)
        assert iters == 1  # should converge in 1 iteration with perfect input

    def test_decode_high_snr(self):
        """High SNR (lambda_s=10) should decode reliably."""
        H = build_full_h_matrix()
        np.random.seed(101)
        info = np.random.randint(0, 2, K).astype(np.int8)
        codeword = ldpc_encode(info, H)
        llr, _ = poisson_channel(codeword, lam_s=10.0, lam_b=0.1)
        llr_q = quantize_llr(llr)
        decoded, converged, _, _ = decode_layered_min_sum(llr_q)
        assert converged
        assert np.array_equal(decoded, info)


class TestSyndrome:
    def test_valid_codeword_syndrome_zero(self):
        H = build_full_h_matrix()
        np.random.seed(202)
        info = np.random.randint(0, 2, K).astype(np.int8)
        codeword = ldpc_encode(info, H)
        assert compute_syndrome_weight(codeword.tolist()) == 0

    def test_flipped_bit_nonzero_syndrome(self):
        H = build_full_h_matrix()
        info = np.zeros(K, dtype=np.int8)
        codeword = ldpc_encode(info, H)
        corrupted = codeword.copy()
        corrupted[0] ^= 1  # flip one bit
        assert compute_syndrome_weight(corrupted.tolist()) > 0
```
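For intuition about the QC structure these tests check: each non-negative entry of `H_BASE` expands to a Z×Z circulant permutation block. A minimal stand-alone sketch — `expand_shift` is a hypothetical helper using the same "row zz gets its 1 at column (zz + shift) % z" convention as the expansion loop in Task 3's `build_ira_staircase`; whether `ldpc_sim.cyclic_shift` uses the identical sign convention is an assumption.

```python
import numpy as np

def expand_shift(shift, z=4):
    # -1 means "no connection": an all-zero z x z block.
    if shift < 0:
        return np.zeros((z, z), dtype=np.int8)
    P = np.zeros((z, z), dtype=np.int8)
    for i in range(z):
        P[i, (i + shift) % z] = 1  # identity with each row's 1 shifted right by `shift`
    return P

print(expand_shift(1, z=4))
```

Every row and column of the block has exactly one 1, which is why each row inside a Z-block has the same weight (the QC property `test_full_h_row_weights` asserts).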

**Step 2: Run tests to verify they pass**

Run: `cd /home/cah/r2d2/code/fpga/claude_project/ldpc_optical && python3 -m pytest model/test_ldpc.py -v`

Expected: All tests PASS (these test existing working code)

**Step 3: Commit**

```bash
git add model/test_ldpc.py
git commit -m "test: add validation tests for existing LDPC model"
```

---
### Task 2: Frame Sync — Streaming Infrastructure

**Files:**

- Create: `model/frame_sync.py`
- Create: `model/test_frame_sync.py`

**Step 1: Write the test for stream generation and syndrome screening**

```python
"""Tests for frame synchronization."""
import numpy as np
import pytest

from ldpc_sim import (
    build_full_h_matrix, ldpc_encode, poisson_channel, quantize_llr,
    compute_syndrome_weight, N, K, Z,
)
from frame_sync import generate_stream, syndrome_screen


class TestStreamGeneration:
    def test_stream_length(self):
        """Stream should be n_frames * N + offset samples long."""
        H = build_full_h_matrix()
        np.random.seed(42)
        stream_llr, true_offset, info_list = generate_stream(
            H, n_frames=5, lam_s=5.0, lam_b=0.1, offset=10
        )
        assert len(stream_llr) == 5 * N + 10
        assert true_offset == 10
        assert len(info_list) == 5

    def test_stream_zero_offset(self):
        H = build_full_h_matrix()
        np.random.seed(42)
        stream_llr, true_offset, _ = generate_stream(
            H, n_frames=3, lam_s=5.0, lam_b=0.1, offset=0
        )
        assert true_offset == 0
        assert len(stream_llr) == 3 * N

    def test_stream_random_offset(self):
        H = build_full_h_matrix()
        np.random.seed(42)
        stream_llr, true_offset, _ = generate_stream(
            H, n_frames=5, lam_s=5.0, lam_b=0.1, offset=None
        )
        assert 0 <= true_offset < N
        assert len(stream_llr) == 5 * N + true_offset


class TestSyndromeScreen:
    def test_correct_offset_low_syndrome(self):
        """At high SNR, correct offset should have very low syndrome weight."""
        H = build_full_h_matrix()
        np.random.seed(42)
        stream_llr, true_offset, _ = generate_stream(
            H, n_frames=5, lam_s=10.0, lam_b=0.1, offset=37
        )
        scores = syndrome_screen(stream_llr, n_offsets=N)
        # Correct offset should have the lowest (or near-lowest) syndrome
        best_offset = min(scores, key=scores.get)
        assert best_offset == 37

    def test_wrong_offsets_high_syndrome(self):
        """Wrong offsets should have syndrome weight near M/2 ~ 112."""
        H = build_full_h_matrix()
        np.random.seed(42)
        stream_llr, true_offset, _ = generate_stream(
            H, n_frames=5, lam_s=10.0, lam_b=0.1, offset=0
        )
        scores = syndrome_screen(stream_llr, n_offsets=N)
        wrong_scores = [v for k, v in scores.items() if k != 0]
        avg_wrong = np.mean(wrong_scores)
        assert avg_wrong > 80  # should be near 112 for random
```
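The `avg_wrong > 80` bound in the last test comes from a simple statistic: at a misaligned offset the hard decisions behave like fair coin flips, so each of the M = 224 parity checks fails with probability ≈ 1/2 and the expected syndrome weight is ≈ 112. A quick sanity sketch with a toy sparse H (a stand-in for the real QC matrix, not the actual code):

```python
import numpy as np

rng = np.random.default_rng(0)
M_toy, N_toy = 224, 256
# Toy sparse parity-check matrix: 5 ones per row (stand-in for the real H)
H = np.zeros((M_toy, N_toy), dtype=np.int8)
for r in range(M_toy):
    H[r, rng.choice(N_toy, size=5, replace=False)] = 1

weights = []
for _ in range(200):
    bits = rng.integers(0, 2, N_toy)      # misaligned window ~ random bits
    weights.append(int((H @ bits % 2).sum()))

print(round(float(np.mean(weights)), 1))  # expected near M/2 = 112
```

The parity of any check touching at least one uniform random bit is itself a fair coin, independent of row weight, which is why the expected weight depends only on M and not on the sparsity pattern.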

**Step 2: Run tests to verify they fail**

Run: `cd /home/cah/r2d2/code/fpga/claude_project/ldpc_optical && python3 -m pytest model/test_frame_sync.py -v`

Expected: FAIL — the `frame_sync` module doesn't exist yet

**Step 3: Implement `frame_sync.py` (stream generation, syndrome screening, acquisition, and re-sync)**

```python
#!/usr/bin/env python3
"""
Frame Synchronization Prototype for LDPC Optical Decoder.

Simulates a continuous photon-counting stream with unknown frame alignment.
Tests acquisition (finding codeword boundaries) and re-sync after loss.

Usage:
    python3 frame_sync.py               # Quick demo at a few lambda_s
    python3 frame_sync.py --sweep       # Acquisition success vs lambda_s
    python3 frame_sync.py --resync-test # Re-sync after offset slip
"""
import numpy as np
import argparse
import json
import os

from ldpc_sim import (
    build_full_h_matrix, ldpc_encode, poisson_channel, quantize_llr,
    decode_layered_min_sum, compute_syndrome_weight,
    N, K, M, Z, N_BASE, M_BASE, H_BASE,
)

SCREENING_THRESHOLD = 50  # syndrome weight below this triggers full decode


def generate_stream(H, n_frames=10, lam_s=2.0, lam_b=0.1, offset=None):
    """
    Generate a continuous stream of channel LLRs with unknown frame offset.

    Args:
        H: full parity check matrix
        n_frames: number of codewords to concatenate
        lam_s: signal photons/slot
        lam_b: background photons/slot
        offset: bit offset to insert (None = random 0..N-1)

    Returns:
        stream_llr: continuous LLR stream (float, not quantized)
        true_offset: the actual offset used
        info_list: list of info bit arrays for each frame (ground truth)
    """
    if offset is None:
        offset = np.random.randint(0, N)

    # Generate random prefix LLRs for the offset (noise-only, no codeword).
    # These represent receiving channel output before the first codeword starts.
    prefix_llr = np.random.normal(0, 1, offset)  # just noise

    # Generate codewords and their channel outputs
    info_list = []
    codeword_llrs = []
    for _ in range(n_frames):
        info = np.random.randint(0, 2, K).astype(np.int8)
        codeword = ldpc_encode(info, H)
        llr, _ = poisson_channel(codeword, lam_s, lam_b)
        info_list.append(info)
        codeword_llrs.append(llr)

    stream_llr = np.concatenate([prefix_llr] + codeword_llrs)
    return stream_llr, offset, info_list


def syndrome_screen(stream_llr, n_offsets=None):
    """
    Screen all candidate offsets using hard-decision syndrome weight.

    Args:
        stream_llr: continuous LLR stream
        n_offsets: number of offsets to try (default N=256)

    Returns:
        scores: dict mapping offset -> syndrome weight
    """
    if n_offsets is None:
        n_offsets = N

    scores = {}
    for off in range(n_offsets):
        if off + N > len(stream_llr):
            break
        window = stream_llr[off:off + N]
        hard = [0 if v > 0 else 1 for v in window]
        scores[off] = compute_syndrome_weight(hard)
    return scores


def acquire_sync(stream_llr, max_confirm=2, max_iter=30, verbose=False):
    """
    Acquire frame synchronization from a continuous stream.

    Algorithm:
      1. Syndrome-screen all 256 offsets
      2. For offsets below threshold, run full iterative decode
      3. Confirm by decoding next max_confirm consecutive frames

    Args:
        stream_llr: continuous LLR stream
        max_confirm: number of consecutive frames to confirm sync
        max_iter: decoder max iterations
        verbose: print screening details

    Returns:
        dict with keys:
            locked: bool
            offset: detected offset (or -1)
            offsets_screened: number of offsets tried
            full_decodes: number of full iterative decodes run
            screening_cost: equivalent decode cycles for syndrome screening
    """
    scores = syndrome_screen(stream_llr)
    # Sort by syndrome weight (best candidates first)
    ranked = sorted(scores.items(), key=lambda x: x[1])

    full_decodes = 0
    for off, syn_wt in ranked:
        if syn_wt >= SCREENING_THRESHOLD:
            break  # all remaining offsets are worse

        if verbose:
            print(f"  candidate offset {off}: syndrome weight {syn_wt}")

        # Full iterative decode at this offset
        window = stream_llr[off:off + N]
        llr_q = quantize_llr(window)
        decoded, converged, iters, _ = decode_layered_min_sum(llr_q, max_iter)
        full_decodes += 1

        if not converged:
            continue

        # Confirm with consecutive frames
        confirmed = True
        for f in range(1, max_confirm + 1):
            start = off + f * N
            if start + N > len(stream_llr):
                confirmed = False
                break
            win_f = stream_llr[start:start + N]
            llr_q_f = quantize_llr(win_f)
            _, conv_f, _, _ = decode_layered_min_sum(llr_q_f, max_iter)
            full_decodes += 1
            if not conv_f:
                confirmed = False
                break

        if confirmed:
            # Cost model: syndrome screening ~ 1/30 of a full decode each
            screening_cost = len(scores) / 30.0
            return {
                'locked': True,
                'offset': off,
                'offsets_screened': len(scores),
                'full_decodes': full_decodes,
                'screening_cost': screening_cost,
                'total_equiv_decodes': screening_cost + full_decodes,
            }

    return {
        'locked': False,
        'offset': -1,
        'offsets_screened': len(scores),
        'full_decodes': full_decodes,
        'screening_cost': len(scores) / 30.0,
        'total_equiv_decodes': len(scores) / 30.0 + full_decodes,
    }


def resync_test(stream_llr, true_offset, slip_amount, search_radius=16,
                max_iter=30):
    """
    Test re-synchronization after an offset slip.

    Simulates the receiver losing sync by `slip_amount` bits,
    then searching nearby offsets to re-acquire.

    Returns:
        dict with locked, detected offset, cost
    """
    slipped_offset = (true_offset + slip_amount) % N

    # First try nearby offsets (fast re-sync)
    for delta in range(-search_radius, search_radius + 1):
        off = (slipped_offset + delta) % N
        if off + N > len(stream_llr):
            continue
        window = stream_llr[off:off + N]
        hard = [0 if v > 0 else 1 for v in window]
        syn_wt = compute_syndrome_weight(hard)
        if syn_wt < SCREENING_THRESHOLD:
            llr_q = quantize_llr(window)
            _, converged, _, _ = decode_layered_min_sum(llr_q, max_iter)
            if converged:
                return {
                    'locked': True,
                    'offset': off,
                    'slip': slip_amount,
                    'search_radius_used': search_radius,
                    'needed_full_search': False,
                }

    # Fall back to full search
    result = acquire_sync(stream_llr)
    result['slip'] = slip_amount
    result['search_radius_used'] = search_radius
    result['needed_full_search'] = True
    return result


def run_acquisition_sweep(lam_s_values, lam_b=0.1, n_trials=50, n_frames=10):
    """Sweep lambda_s and measure acquisition success rate."""
    H = build_full_h_matrix()
    print(f"{'lam_s':>8s} {'success%':>10s} {'avg_decodes':>12s} "
          f"{'avg_cost':>10s} {'false_lock':>10s}")
    print("-" * 55)

    results = []
    for lam_s in lam_s_values:
        successes = 0
        false_locks = 0
        total_decodes = 0
        total_cost = 0

        for trial in range(n_trials):
            np.random.seed(trial * 1000 + int(lam_s * 100))
            stream, true_off, _ = generate_stream(
                H, n_frames=n_frames, lam_s=lam_s, lam_b=lam_b, offset=None
            )
            result = acquire_sync(stream)

            if result['locked']:
                if result['offset'] == true_off:
                    successes += 1
                else:
                    false_locks += 1
            total_decodes += result['full_decodes']
            total_cost += result['total_equiv_decodes']

        pct = 100 * successes / n_trials
        avg_dec = total_decodes / n_trials
        avg_cost = total_cost / n_trials
        fl_pct = 100 * false_locks / n_trials

        print(f"{lam_s:8.1f} {pct:9.1f}% {avg_dec:12.1f} "
              f"{avg_cost:10.1f} {fl_pct:9.1f}%")
        results.append({
            'lam_s': lam_s, 'success_pct': pct,
            'avg_full_decodes': avg_dec, 'avg_equiv_cost': avg_cost,
            'false_lock_pct': fl_pct,
        })
    return results


def run_resync_sweep(lam_s=5.0, lam_b=0.1, n_trials=20, n_frames=10):
    """Test re-sync at various slip amounts."""
    H = build_full_h_matrix()
    print(f"\nRe-sync test at lam_s={lam_s}")
    print(f"{'slip':>6s} {'success%':>10s} {'full_search%':>12s}")
    print("-" * 32)

    for slip in [1, 2, 4, 8, 16, 32, 64, 128]:
        successes = 0
        full_searches = 0
        for trial in range(n_trials):
            np.random.seed(trial * 100 + slip)
            stream, true_off, _ = generate_stream(
                H, n_frames=n_frames, lam_s=lam_s, lam_b=lam_b, offset=None
            )
            result = resync_test(stream, true_off, slip)
            if result['locked']:
                successes += 1
            if result.get('needed_full_search'):
                full_searches += 1

        print(f"{slip:6d} {100*successes/n_trials:9.1f}% "
              f"{100*full_searches/n_trials:11.1f}%")


def main():
    parser = argparse.ArgumentParser(description='LDPC Frame Sync Prototype')
    parser.add_argument('--sweep', action='store_true',
                        help='Acquisition success rate vs lambda_s')
    parser.add_argument('--resync-test', action='store_true',
                        help='Re-sync after offset slip')
    parser.add_argument('--lam-s', type=float, default=5.0,
                        help='Signal photons/slot (default: 5.0)')
    parser.add_argument('--lam-b', type=float, default=0.1,
                        help='Background photons/slot (default: 0.1)')
    parser.add_argument('--n-trials', type=int, default=50,
                        help='Trials per data point (default: 50)')
    parser.add_argument('--seed', type=int, default=42)
    args = parser.parse_args()

    if args.sweep:
        print("=== Frame Sync Acquisition Sweep ===")
        lam_s_values = [0.5, 1.0, 1.5, 2.0, 3.0, 4.0, 5.0, 7.0, 10.0]
        results = run_acquisition_sweep(lam_s_values, args.lam_b, args.n_trials)
        out_path = os.path.join(os.path.dirname(__file__), '..', 'data',
                                'sync_acquisition_results.json')
        os.makedirs(os.path.dirname(out_path), exist_ok=True)
        with open(out_path, 'w') as f:
            json.dump(results, f, indent=2)
        print(f"\nResults saved to {out_path}")

    elif args.resync_test:
        run_resync_sweep(args.lam_s, args.lam_b, args.n_trials)

    else:
        # Quick demo
        print("=== Frame Sync Demo ===")
        H = build_full_h_matrix()
        np.random.seed(args.seed)
        stream, true_off, infos = generate_stream(
            H, n_frames=10, lam_s=args.lam_s, lam_b=args.lam_b, offset=None
        )
        print(f"Stream: {len(stream)} samples, true offset: {true_off}")
        print(f"Signal: lam_s={args.lam_s}, lam_b={args.lam_b}")

        result = acquire_sync(stream, verbose=True)
        if result['locked']:
            match = "CORRECT" if result['offset'] == true_off else "WRONG"
            print(f"Locked at offset {result['offset']} ({match})")
            print(f"  Screened {result['offsets_screened']} offsets")
            print(f"  Full decodes: {result['full_decodes']}")
            print(f"  Total cost: {result['total_equiv_decodes']:.1f} "
                  f"equiv decodes")
        else:
            print("FAILED to acquire sync")


if __name__ == '__main__':
    main()
```
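With the 1/30 screening-to-decode cost ratio assumed in `acquire_sync` (a cost-model parameter, not a measured number), a clean acquisition is cheap; a back-of-envelope check:

```python
# Full 256-offset syndrome screen at ~1/30 of a decode each,
# plus 1 candidate decode and 2 confirmation decodes (max_confirm=2).
n_offsets = 256
screening_cost = n_offsets / 30.0
full_decodes = 1 + 2
total = screening_cost + full_decodes
print(round(total, 2))  # ~11.53 equivalent decodes
```

So in the good case the screen dominates the cost budget, and false candidates (extra full decodes) are what make low-SNR acquisition expensive.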

**Step 4: Run tests to verify they pass**

Run: `cd /home/cah/r2d2/code/fpga/claude_project/ldpc_optical && python3 -m pytest model/test_frame_sync.py -v`

Expected: All PASS

**Step 5: Run the demo to verify it works end-to-end**

Run: `cd /home/cah/r2d2/code/fpga/claude_project/ldpc_optical && python3 model/frame_sync.py --lam-s 5.0`

Expected: Should lock at the correct offset

**Step 6: Commit**

```bash
git add model/frame_sync.py model/test_frame_sync.py
git commit -m "feat: add frame synchronization prototype with tests"
```

---
### Task 3: Code Analysis — Rate Comparison

**Files:**

- Create: `model/ldpc_analysis.py`
- Create: `model/test_ldpc_analysis.py`

**Step 1: Write the test for the rate sweep**

```python
"""Tests for ldpc_analysis.py."""
import numpy as np
import pytest

from ldpc_analysis import build_ira_staircase, rate_sweep_single


class TestIRAStaircase:
    def test_rate_1_2(self):
        """Rate 1/2: 1x2 base matrix, n=64, k=32."""
        H_base, H_full = build_ira_staircase(m_base=1, z=32)
        assert H_base.shape == (1, 2)
        assert H_full.shape == (32, 64)
        assert np.linalg.matrix_rank(H_full.astype(float)) == 32

    def test_rate_1_4(self):
        """Rate 1/4: 3x4 base matrix, n=128, k=32."""
        H_base, H_full = build_ira_staircase(m_base=3, z=32)
        assert H_base.shape == (3, 4)
        assert H_full.shape == (96, 128)
        assert np.linalg.matrix_rank(H_full.astype(float)) == 96

    def test_rate_1_8_matches_existing(self):
        """Rate 1/8 staircase should produce a valid code."""
        H_base, H_full = build_ira_staircase(m_base=7, z=32)
        assert H_base.shape == (7, 8)
        assert H_full.shape == (224, 256)

    def test_encode_decode_roundtrip(self):
        """Any generated staircase code should encode/decode at high SNR."""
        for m_base in [1, 2, 3, 5]:
            H_base, H_full = build_ira_staircase(m_base=m_base, z=32)
            n_base = m_base + 1
            k = 32
            n = n_base * 32
            np.random.seed(42)
            info = np.random.randint(0, 2, k).astype(np.int8)
            # Use the generic IRA encoder
            from ldpc_analysis import ira_encode
            codeword = ira_encode(info, H_base, H_full, z=32)
            syndrome = H_full @ codeword % 2
            assert np.all(syndrome == 0), f"Failed for m_base={m_base}"


class TestRateSweep:
    def test_high_snr_all_rates_decode(self):
        """At very high SNR, all rates should achieve FER=0."""
        for m_base in [1, 3, 7]:
            result = rate_sweep_single(m_base=m_base, z=32, lam_s=20.0,
                                       lam_b=0.1, n_frames=10)
            assert result['fer'] == 0.0, f"m_base={m_base} failed at lam_s=20"
```
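One note on the statistical power of `n_frames=10` in the FER test: by the rule of three, observing zero failures in n trials bounds the true failure rate only by about 3/n at ~95% confidence, so this test catches gross breakage rather than certifying a low FER:

```python
# Rule of three: zero failures in n trials -> ~95% upper bound of 3/n on the rate.
for n in (10, 100, 500):
    print(n, 3 / n)
```

This is why the sweep in Step 3 below uses hundreds of frames per point when actual FER curves are needed.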

**Step 2: Run tests to verify they fail**

Run: `cd /home/cah/r2d2/code/fpga/claude_project/ldpc_optical && python3 -m pytest model/test_ldpc_analysis.py::TestIRAStaircase -v`

Expected: FAIL — module doesn't exist yet

**Step 3: Implement build_ira_staircase, ira_encode, and rate_sweep_single**

The key functions:

- `build_ira_staircase(m_base, z)`: Generate an IRA staircase base matrix and expand it to the full H. The info column is connected to all rows; the parity columns form a lower-triangular staircase with spread shift values.
- `ira_encode(info, H_base, H_full, z)`: Generic IRA encoder that works for any staircase code (sequential parity solve, same algorithm as ldpc_sim.py).
- `rate_sweep_single(m_base, z, lam_s, lam_b, n_frames)`: Run n_frames through encode/channel/decode and return BER/FER.
- `run_rate_sweep(...)`: Full sweep over rates and lambda_s values; print a summary table.
```python
|
|
#!/usr/bin/env python3
|
|
"""
|
|
LDPC Code Analysis Tool — Quantitative design validation.
|
|
|
|
Four analyses:
|
|
--rate-sweep: Compare code rates (1/2 through 1/8) vs lambda_s
|
|
--matrix-compare: Compare base matrix designs at rate 1/8
|
|
--quant-sweep: Quantization bits (4-32) impact on FER
|
|
--shannon-gap: Distance from Poisson channel capacity
|
|
|
|
Usage:
|
|
python3 ldpc_analysis.py --rate-sweep
|
|
python3 ldpc_analysis.py --matrix-compare
|
|
python3 ldpc_analysis.py --quant-sweep
|
|
python3 ldpc_analysis.py --shannon-gap
|
|
python3 ldpc_analysis.py --all
|
|
"""
|
|
import numpy as np
|
|
import argparse
|
|
import json
|
|
import os
|
|
from ldpc_sim import (
|
|
poisson_channel, quantize_llr, decode_layered_min_sum,
|
|
compute_syndrome_weight, sat_add_q, sat_sub_q, min_sum_cn_update,
|
|
Q_BITS, Q_MAX, Q_MIN, OFFSET,
|
|
)
|
|
|
|
|
|
# ============================================================================
|
|
# Generic IRA staircase code construction
|
|
# ============================================================================
|
|
|
|
def build_ira_staircase(m_base, z=32):
|
|
"""
|
|
Build an IRA (Irregular Repeat-Accumulate) staircase QC-LDPC code.
|
|
|
|
Structure:
|
|
- Column 0: info bits, connected to ALL m_base rows (high protection)
|
|
- Columns 1..m_base: parity bits, lower-triangular staircase
|
|
- Each row has 2-3 nonzero entries (sparse)
|
|
- Shift values spread across [0, z-1] to maximize girth
|
|
|
|
Args:
|
|
m_base: number of base matrix rows (rate = 1 / (m_base + 1))
|
|
z: lifting factor
|
|
|
|
Returns:
|
|
H_base: (m_base, m_base+1) array of shift values (-1 = not connected)
|
|
H_full: (m_base*z, (m_base+1)*z) full binary parity check matrix
|
|
"""
|
|
n_base = m_base + 1
|
|
H_base = np.full((m_base, n_base), -1, dtype=np.int16)
|
|
|
|
# Info column (col 0): connect to all rows with spread shifts
|
|
for r in range(m_base):
|
|
H_base[r, 0] = (r * z // m_base) % z
|
|
|
|
# Staircase parity: row r connects to col r+1 (diagonal)
|
|
# Row 0: col 1 only (besides info)
|
|
# Row r>0: col r and col r+1
|
|
H_base[0, 1] = (5 * z // m_base) % z if m_base > 1 else 0
|
|
|
|
for r in range(1, m_base):
|
|
# Sub-diagonal: connect to previous parity column
|
|
H_base[r, r] = (r * 7) % z
|
|
# Diagonal: connect to own parity column
|
|
H_base[r, r + 1] = 0
|
|
|
|
# Expand to full binary matrix
|
|
n = n_base * z
|
|
m = m_base * z
|
|
H_full = np.zeros((m, n), dtype=np.int8)
|
|
for r in range(m_base):
|
|
for c in range(n_base):
|
|
shift = int(H_base[r, c])
|
|
if shift < 0:
|
|
continue
|
|
for zz in range(z):
|
|
col_idx = c * z + (zz + shift) % z
|
|
H_full[r * z + zz, col_idx] = 1
|
|
|
|
return H_base, H_full
|
|
|
|
|
|
def ira_encode(info, H_base, H_full, z=32):
|
|
"""
|
|
Encode using IRA staircase structure (generic, any rate).
|
|
|
|
Same algorithm as ldpc_sim.ldpc_encode but works for any staircase.
|
|
"""
|
|
m_base = H_base.shape[0]
|
|
n_base = H_base.shape[1]
|
|
k = z
|
|
|
|
assert len(info) == k
|
|
|
|
def apply_p(x, s):
|
|
return np.roll(x, -int(s))
|
|
|
|
def inv_p(y, s):
|
|
return np.roll(y, int(s))
|
|
|
|
p = [np.zeros(z, dtype=np.int8) for _ in range(n_base)]
|
|
|
|
# Row 0: solve for p[1]
|
|
accum = apply_p(info, H_base[0, 0])
|
|
p[1] = inv_p(accum, H_base[0, 1])
|
|
|
|
# Rows 1..m_base-1: solve for p[2]..p[m_base]
|
|
for r in range(1, m_base):
|
|
accum = apply_p(info, H_base[r, 0])
|
|
accum = (accum + apply_p(p[r], H_base[r, r])) % 2
|
|
p[r + 1] = inv_p(accum, H_base[r, r + 1])
|
|
|
|
codeword = np.concatenate([info] + [p[c] for c in range(1, n_base)])
|
|
|
|
# Verify
|
|
check = H_full @ codeword % 2
|
|
assert np.all(check == 0), f"Encoding failed: syndrome weight = {check.sum()}"
|
|
|
|
return codeword
|
|
|
|
|
|
def generic_decode(llr_q, H_base, z=32, max_iter=30, early_term=True,
|
|
q_bits=Q_BITS):
|
|
"""
|
|
Generic layered min-sum decoder for any IRA staircase code.
|
|
|
|
Same algorithm as ldpc_sim.decode_layered_min_sum but parameterized.
|
|
"""
|
|
m_base = H_base.shape[0]
|
|
n_base = H_base.shape[1]
|
|
n = n_base * z
|
|
k = z
|
|
q_max = 2**(q_bits-1) - 1
|
|
q_min = -(2**(q_bits-1))
|
|
|
|
def sat_add(a, b):
|
|
s = int(a) + int(b)
|
|
return max(q_min, min(q_max, s))
|
|
|
|
def sat_sub(a, b):
|
|
return sat_add(a, -b)
|
|
|
|
beliefs = [int(x) for x in llr_q]
|
|
msg = [[[0]*z for _ in range(n_base)] for _ in range(m_base)]
|
|
|
|
for iteration in range(max_iter):
|
|
for row in range(m_base):
|
|
connected = [c for c in range(n_base) if H_base[row, c] >= 0]
|
|
dc = len(connected)
|
|
|
|
vn_to_cn = [[0]*z for _ in range(dc)]
|
|
for ci, col in enumerate(connected):
|
|
shift = int(H_base[row, col])
|
|
for zz in range(z):
|
|
sz = (zz + shift) % z
|
|
bi = col * z + sz
|
|
vn_to_cn[ci][zz] = sat_sub(beliefs[bi], msg[row][col][zz])
|
|
|
|
cn_to_vn = [[0]*z for _ in range(dc)]
|
|
for zz in range(z):
|
|
cn_in = [vn_to_cn[ci][zz] for ci in range(dc)]
|
|
cn_out = min_sum_cn_update(cn_in)
|
|
for ci in range(dc):
|
|
cn_to_vn[ci][zz] = cn_out[ci]
|
|
|
|
for ci, col in enumerate(connected):
|
|
shift = int(H_base[row, col])
|
|
for zz in range(z):
|
|
sz = (zz + shift) % z
|
|
bi = col * z + sz
|
|
beliefs[bi] = sat_add(vn_to_cn[ci][zz], cn_to_vn[ci][zz])
|
|
msg[row][col][zz] = cn_to_vn[ci][zz]
|
|
|
|
# Syndrome check
|
|
hard = [1 if b < 0 else 0 for b in beliefs]
|
|
syn_wt = 0
|
|
for r in range(m_base):
|
|
for zz in range(z):
|
|
parity = 0
|
|
for c in range(n_base):
|
|
shift = int(H_base[r, c])
|
|
if shift < 0:
|
|
continue
|
|
sz = (zz + shift) % z
|
|
bi = c * z + sz
|
|
parity ^= hard[bi]
|
|
if parity:
|
|
syn_wt += 1
|
|
|
|
if early_term and syn_wt == 0:
|
|
return np.array(hard[:k]), True, iteration + 1, 0
|
|
|
|
hard = [1 if b < 0 else 0 for b in beliefs]
|
|
return np.array(hard[:k]), syn_wt == 0, max_iter, syn_wt
|
|
|
|
|
|
def rate_sweep_single(m_base, z=32, lam_s=2.0, lam_b=0.1, n_frames=100,
                      max_iter=30, q_bits=Q_BITS):
    """
    Run FER simulation for a single code rate and lambda_s.

    Returns dict with ber, fer, avg_iter.
    """
    H_base, H_full = build_ira_staircase(m_base, z)
    k = z
    bit_errors = 0
    frame_errors = 0
    total_bits = 0
    total_iter = 0

    for _ in range(n_frames):
        info = np.random.randint(0, 2, k).astype(np.int8)
        codeword = ira_encode(info, H_base, H_full, z)
        llr, _ = poisson_channel(codeword, lam_s, lam_b)

        # Quantize with configurable bits
        q_max = 2**(q_bits - 1) - 1
        q_min = -(2**(q_bits - 1))
        scale = q_max / 5.0
        llr_q = np.clip(np.round(llr * scale), q_min, q_max).astype(int)

        decoded, converged, iters, _ = generic_decode(
            llr_q, H_base, z, max_iter, q_bits=q_bits
        )
        total_iter += iters
        errs = np.sum(decoded != info)
        bit_errors += errs
        total_bits += k
        if errs > 0:
            frame_errors += 1

    return {
        'rate': f"1/{m_base+1}",
        'rate_val': 1.0 / (m_base + 1),
        'm_base': m_base,
        'lam_s': lam_s,
        'ber': bit_errors / total_bits if total_bits > 0 else 0,
        'fer': frame_errors / n_frames,
        'avg_iter': total_iter / n_frames,
    }

# ============================================================================
# Analysis 1: Rate Comparison
# ============================================================================

def run_rate_sweep(lam_b=0.1, n_frames=500, max_iter=30):
    """Compare code rates across lambda_s range."""
    rates = [
        (1, "1/2"),
        (2, "1/3"),
        (3, "1/4"),
        (5, "1/6"),
        (7, "1/8"),
    ]
    lam_s_values = [0.5, 1.0, 1.5, 2.0, 2.5, 3.0, 4.0, 5.0, 7.0, 10.0]

    print("=== Rate Comparison: FER vs lambda_s ===")
    print(f"Frames per point: {n_frames}, lam_b={lam_b}, max_iter={max_iter}")
    print()

    # Header
    header = f"{'lam_s':>8s}"
    for _, name in rates:
        header += f" {name:>8s}"
    print(header)
    print("-" * (8 + 9 * len(rates)))

    all_results = []
    for lam_s in lam_s_values:
        row = f"{lam_s:8.1f}"
        for m_base, name in rates:
            np.random.seed(42)
            r = rate_sweep_single(m_base, lam_s=lam_s, lam_b=lam_b,
                                  n_frames=n_frames, max_iter=max_iter)
            row += f" {r['fer']:8.3f}"
            all_results.append(r)
        print(row)

    # Summary: threshold lambda_s for each rate (FER < 10%)
    print("\n--- Threshold lambda_s (FER < 10%) ---")
    for m_base, name in rates:
        rate_results = [r for r in all_results if r['m_base'] == m_base]
        threshold = None
        for r in sorted(rate_results, key=lambda x: x['lam_s']):
            if r['fer'] < 0.10:
                threshold = r['lam_s']
                break
        t_str = f"{threshold:.1f}" if threshold is not None else ">10"
        print(f"  Rate {name}: threshold lam_s = {t_str} photons/slot")

    return all_results

# ============================================================================
# Analysis 2: Base Matrix Comparison
# ============================================================================

def compute_girth(H_base, z):
    """
    Estimate the girth (shortest cycle) of the Tanner graph.

    Runs BFS from a sample of variable nodes; whenever a BFS edge reaches
    an already-visited node, a cycle through the start node has been found.
    """
    m_base, n_base = H_base.shape
    n = n_base * z
    m = m_base * z

    # Build adjacency: VN -> list of CN neighbors, CN -> list of VN neighbors
    vn_to_cn = [[] for _ in range(n)]
    cn_to_vn = [[] for _ in range(m)]
    for r in range(m_base):
        for c in range(n_base):
            shift = int(H_base[r, c])
            if shift < 0:
                continue
            for zz in range(z):
                vn = c * z + (zz + shift) % z
                cn = r * z + zz
                vn_to_cn[vn].append(cn)
                cn_to_vn[cn].append(vn)

    min_cycle = float('inf')

    # BFS from a subset of VNs (sampled for speed; QC symmetry makes most
    # additional start nodes redundant)
    for start_vn in range(min(n, 64)):
        # Nodes are keyed (node_type, node_id); node_type: 0=VN, 1=CN
        dist = {(0, start_vn): 0}
        queue = [(0, 0, start_vn, None)]  # (distance, type, id, parent_key)
        qi = 0
        while qi < len(queue):
            d, ntype, node, parent = queue[qi]
            qi += 1
            if d > min_cycle:
                break
            nbr_type = 1 - ntype
            neighbors = vn_to_cn[node] if ntype == 0 else cn_to_vn[node]
            for nb in neighbors:
                key = (nbr_type, nb)
                if key == parent:
                    continue  # don't traverse the edge we arrived on
                if key in dist:
                    # Already visited: a cycle through start_vn closes here
                    min_cycle = min(min_cycle, d + 1 + dist[key])
                else:
                    dist[key] = d + 1
                    queue.append((d + 1, nbr_type, nb, (ntype, node)))

    return min_cycle if min_cycle < float('inf') else -1

def build_improved_staircase(z=32):
    """
    Improved IRA staircase with a better degree distribution.

    Adds extra connections to low-degree columns.
    Key improvement: col 7 gets an extra connection (dv=1 -> dv=2).
    Note: the added row-0 entry on col 7 sits above the staircase diagonal,
    so the parity submatrix is no longer strictly lower-triangular; encoding
    uses the generic GF(2) solver (peg_encode) rather than back-substitution.
    """
    m_base = 7
    n_base = 8
    H_base = np.full((m_base, n_base), -1, dtype=np.int16)

    # Info column (col 0): connect to all rows (same as original)
    H_base[0, 0] = 0
    H_base[1, 0] = 11
    H_base[2, 0] = 17
    H_base[3, 0] = 23
    H_base[4, 0] = 29
    H_base[5, 0] = 3
    H_base[6, 0] = 9

    # Staircase parity (same diagonal structure)
    H_base[0, 1] = 5
    H_base[1, 1] = 3; H_base[1, 2] = 0
    H_base[2, 2] = 7; H_base[2, 3] = 0
    H_base[3, 3] = 13; H_base[3, 4] = 0
    H_base[4, 4] = 19; H_base[4, 5] = 0
    H_base[5, 5] = 25; H_base[5, 6] = 0
    H_base[6, 6] = 31; H_base[6, 7] = 0

    # Improvements: add connections to boost weak columns
    # Col 7 (dv=1 -> dv=2): connect to row 0
    H_base[0, 7] = 15
    # Col 1 (dv=2 -> dv=3): connect to row 3
    H_base[3, 1] = 21

    # Expand
    n = n_base * z
    m = m_base * z
    H_full = np.zeros((m, n), dtype=np.int8)
    for r in range(m_base):
        for c in range(n_base):
            shift = int(H_base[r, c])
            if shift < 0:
                continue
            for zz in range(z):
                col_idx = c * z + (zz + shift) % z
                H_full[r * z + zz, col_idx] = 1

    return H_base, H_full

def build_peg_matrix(z=32):
    """
    PEG-constructed base matrix for rate 1/8.

    Progressive Edge Growth: greedily add edges to maximize local girth.
    VN degree distribution as built: [7, 2, 2, 2, 2, 2, 2, 2] — the info
    column keeps dv=7 for protection; parity columns are a uniform dv=2.

    Note: this loses the trivial staircase encoding property.
    Encoding requires solving H*c=0 via GF(2) elimination on the
    parity submatrix, which must still be full-rank.
    """
    m_base = 7
    n_base = 8
    H_base = np.full((m_base, n_base), -1, dtype=np.int16)

    # PEG-like construction: spread edges to maximize girth
    # Col 0 (info): connect to all 7 rows (dv=7, high protection)
    H_base[0, 0] = 0
    H_base[1, 0] = 11
    H_base[2, 0] = 17
    H_base[3, 0] = 23
    H_base[4, 0] = 29
    H_base[5, 0] = 3
    H_base[6, 0] = 9

    # Parity columns: each connected to exactly 2 rows (dv=2),
    # with shifts chosen to spread cycles for maximum girth.
    # The parity sub-matrix must remain invertible for encoding.
    H_base[0, 1] = 5; H_base[1, 1] = 3
    H_base[1, 2] = 0; H_base[2, 2] = 7
    H_base[2, 3] = 0; H_base[3, 3] = 13
    H_base[3, 4] = 0; H_base[4, 4] = 19
    H_base[4, 5] = 0; H_base[5, 5] = 25
    H_base[5, 6] = 0; H_base[6, 6] = 31
    H_base[6, 7] = 0; H_base[0, 7] = 15

    # The parity columns form a ring (not strictly lower-triangular),
    # so encoding must solve the ring structure via GF(2) elimination

    # Expand
    n = n_base * z
    m = m_base * z
    H_full = np.zeros((m, n), dtype=np.int8)
    for r in range(m_base):
        for c in range(n_base):
            shift = int(H_base[r, c])
            if shift < 0:
                continue
            for zz in range(z):
                col_idx = c * z + (zz + shift) % z
                H_full[r * z + zz, col_idx] = 1

    return H_base, H_full

def peg_encode(info, H_base, H_full, z=32):
    """
    Encode for a non-staircase matrix via GF(2) Gaussian elimination.

    Solves H_parity * p = H_info * info (mod 2) using the full H matrix,
    so it works for any base matrix whose parity submatrix is full-rank.
    """
    m_base = H_base.shape[0]
    n_base = H_base.shape[1]
    k = z
    m = m_base * z
    n = n_base * z

    # H = [H_info | H_parity], solve H_parity * p = H_info * info mod 2
    H_info = H_full[:, :k]
    H_parity = H_full[:, k:]

    rhs = H_info @ info % 2

    # Solve via GF(2) Gaussian elimination on the augmented matrix [H_parity | rhs]
    aug = np.hstack([H_parity.copy(), rhs.reshape(-1, 1)]).astype(np.int8)
    rows, cols_p = H_parity.shape

    pivot_row = 0
    pivot_cols = []
    for col in range(cols_p):
        # Find pivot
        found = -1
        for r in range(pivot_row, rows):
            if aug[r, col] == 1:
                found = r
                break
        if found == -1:
            continue
        # Swap
        aug[[pivot_row, found]] = aug[[found, pivot_row]]
        pivot_cols.append(col)
        # Eliminate this column from all other rows (reduced row echelon form)
        for r in range(rows):
            if r != pivot_row and aug[r, col] == 1:
                aug[r] = (aug[r] + aug[pivot_row]) % 2
        pivot_row += 1

    # Read the solution off the reduced form (full elimination above, so
    # no separate back-substitution pass is needed)
    parity = np.zeros(cols_p, dtype=np.int8)
    for i, col in enumerate(pivot_cols):
        parity[col] = aug[i, -1]

    codeword = np.concatenate([info, parity])
    check = H_full @ codeword % 2
    assert np.all(check == 0), f"PEG encoding failed: syn={check.sum()}"
    return codeword

def run_matrix_comparison(lam_b=0.1, n_frames=500, max_iter=30):
    """Compare base matrix designs at rate 1/8."""
    from ldpc_sim import H_BASE as ORIG_H_BASE, build_full_h_matrix, ldpc_encode

    matrices = {
        'Original staircase': (ORIG_H_BASE, build_full_h_matrix(), ldpc_encode),
    }

    # Improved staircase: the extra row-0 entry on col 7 breaks strict
    # lower-triangularity, so encode with the generic GF(2) solver (peg_encode)
    imp_base, imp_full = build_improved_staircase()
    matrices['Improved staircase'] = (imp_base, imp_full, None)

    # PEG
    peg_base, peg_full = build_peg_matrix()
    matrices['PEG ring'] = (peg_base, peg_full, None)

    z = 32
    lam_s_values = [0.5, 1.0, 1.5, 2.0, 3.0, 4.0, 5.0, 7.0, 10.0]

    print("=== Base Matrix Comparison (Rate 1/8) ===")
    print(f"Frames per point: {n_frames}, lam_b={lam_b}")
    print()

    # Degree distributions, girth, and rank
    for name, (H_base, H_full, _) in matrices.items():
        m_base_m = H_base.shape[0]
        n_base_m = H_base.shape[1]
        vn_degrees = []
        for c in range(n_base_m):
            dv = sum(1 for r in range(m_base_m) if H_base[r, c] >= 0)
            vn_degrees.append(dv)
        cn_degrees = []
        for r in range(m_base_m):
            dc = sum(1 for c in range(n_base_m) if H_base[r, c] >= 0)
            cn_degrees.append(dc)
        girth = compute_girth(H_base, z)
        print(f"{name}:")
        print(f"  VN degrees: {vn_degrees}")
        print(f"  CN degrees: {cn_degrees}")
        print(f"  Girth: {girth}")
        rank = np.linalg.matrix_rank(H_full.astype(float))
        print(f"  H rank: {rank} / {H_base.shape[0]*z}")
        print()

    # FER sweep
    header = f"{'lam_s':>8s}"
    for name in matrices:
        short = name[:12]
        header += f" {short:>14s}"
    print(header)
    print("-" * (8 + 15 * len(matrices)))

    all_results = []
    for lam_s in lam_s_values:
        row = f"{lam_s:8.1f}"
        for name, (H_base, H_full, enc_fn) in matrices.items():
            np.random.seed(42)
            k = z
            frame_errors = 0

            for _ in range(n_frames):
                info = np.random.randint(0, 2, k).astype(np.int8)
                if enc_fn is not None:
                    codeword = enc_fn(info, H_full)
                else:
                    codeword = peg_encode(info, H_base, H_full, z)
                llr, _ = poisson_channel(codeword, lam_s, lam_b)
                llr_q = quantize_llr(llr)
                decoded, converged, _, _ = generic_decode(
                    llr_q, H_base, z, max_iter
                )
                if np.any(decoded != info):
                    frame_errors += 1

            fer = frame_errors / n_frames
            row += f" {fer:14.3f}"
            all_results.append({
                'matrix': name, 'lam_s': lam_s, 'fer': fer
            })
        print(row)

    return all_results

# ============================================================================
# Analysis 3: Quantization Sweep
# ============================================================================

def run_quant_sweep(lam_s=None, lam_b=0.1, n_frames=500, max_iter=30):
    """Sweep quantization bits and measure FER impact."""
    from ldpc_sim import H_BASE, build_full_h_matrix, ldpc_encode

    H_base = H_BASE
    H_full = build_full_h_matrix()
    z = 32

    quant_bits = [4, 5, 6, 8, 10, 16]

    # If no lambda_s given, test at a few points
    if lam_s is None:
        lam_s_values = [2.0, 3.0, 5.0]
    else:
        lam_s_values = [lam_s]

    print("=== Quantization Sweep ===")
    print(f"Frames per point: {n_frames}, lam_b={lam_b}")
    print()

    header = f"{'lam_s':>8s}"
    for q in quant_bits:
        header += f" {q:>6d}-bit"
    header += f" {'float':>10s}"
    print(header)
    print("-" * (8 + 10 * (len(quant_bits) + 1)))

    all_results = []
    for ls in lam_s_values:
        row = f"{ls:8.1f}"

        # Fixed quantization levels
        for q in quant_bits:
            np.random.seed(42)
            frame_errors = 0
            for _ in range(n_frames):
                info = np.random.randint(0, 2, z).astype(np.int8)
                codeword = ldpc_encode(info, H_full)
                llr, _ = poisson_channel(codeword, ls, lam_b)

                q_max = 2**(q - 1) - 1
                q_min = -(2**(q - 1))
                scale = q_max / 5.0
                llr_q = np.clip(np.round(llr * scale), q_min, q_max).astype(int)

                decoded, _, _, _ = generic_decode(
                    llr_q, H_base, z, max_iter, q_bits=q
                )
                if np.any(decoded != info):
                    frame_errors += 1

            fer = frame_errors / n_frames
            row += f" {fer:9.3f}"
            all_results.append({
                'lam_s': ls, 'q_bits': q, 'fer': fer
            })

        # Float (no quantization — use very wide integers to approximate)
        np.random.seed(42)
        frame_errors = 0
        for _ in range(n_frames):
            info = np.random.randint(0, 2, z).astype(np.int8)
            codeword = ldpc_encode(info, H_full)
            llr, _ = poisson_channel(codeword, ls, lam_b)

            # Use 16-bit with high scale for "float-like" precision
            scale = 1000.0
            llr_q = np.clip(np.round(llr * scale), -32768, 32767).astype(int)
            decoded, _, _, _ = generic_decode(
                llr_q, H_base, z, max_iter, q_bits=16
            )
            if np.any(decoded != info):
                frame_errors += 1

        fer = frame_errors / n_frames
        row += f" {fer:9.3f}"
        all_results.append({
            'lam_s': ls, 'q_bits': 'float', 'fer': fer
        })
        print(row)

    return all_results

# ============================================================================
# Analysis 4: Shannon Gap
# ============================================================================

def poisson_channel_capacity(lam_s, lam_b, max_y=50):
    """
    Compute binary-input Poisson channel capacity.

    C = max_p I(X;Y) where X in {0,1}, Y ~ Poisson(x*lam_s + lam_b).

    For input prior P(X=1) = p1 (p0 = 1 - p1):
        I(X;Y) = H(Y) - p0*H(Y|X=0) - p1*H(Y|X=1)
    The maximization over p1 is a coarse grid search.
    """
    from scipy.stats import poisson

    def entropy_poisson(lam, max_y):
        """Entropy (bits) of Poisson(lam) truncated and renormalized on [0, max_y]."""
        if lam < 1e-10:
            return 0.0
        pmf = [poisson.pmf(y, lam) for y in range(max_y + 1)]
        total = sum(pmf)
        h = 0.0
        for p in pmf:
            q = p / total
            if q > 0:
                h -= q * np.log2(q)
        return h

    # H(Y|X=0) and H(Y|X=1)
    h_y_x0 = entropy_poisson(lam_b, max_y)
    h_y_x1 = entropy_poisson(lam_s + lam_b, max_y)

    # Optimize over input probability p1 on a 1% grid
    best_c = 0.0
    for p1_pct in range(0, 101):
        p1 = p1_pct / 100.0
        p0 = 1.0 - p1

        # H(Y) where Y has the mixture distribution
        pmf_mix = []
        for y in range(max_y + 1):
            pmf_mix.append(p0 * poisson.pmf(y, lam_b) +
                           p1 * poisson.pmf(y, lam_s + lam_b))
        h_y = 0.0
        for p in pmf_mix:
            if p > 0:
                h_y -= p * np.log2(p)

        c = h_y - p0 * h_y_x0 - p1 * h_y_x1
        best_c = max(best_c, c)

    return best_c

def run_shannon_gap(lam_b=0.1):
    """Compute Shannon limits and compare to decoder thresholds."""
    print("=== Shannon Gap Analysis ===")
    print(f"Binary-input Poisson channel, lam_b={lam_b}")
    print()

    rates = [
        (1, "1/2", 0.5),
        (2, "1/3", 1/3),
        (3, "1/4", 0.25),
        (5, "1/6", 1/6),
        (7, "1/8", 0.125),
    ]

    print(f"{'Rate':>8s} {'Shannon':>10s} {'Capacity':>10s}")
    print(f"{'':>8s} {'lam_s':>10s} {'at limit':>10s}")
    print("-" * 32)

    results = []
    for m_base, name, rate_val in rates:
        # Binary search for minimum lam_s where C >= rate
        lo, hi = 0.01, 50.0
        for _ in range(50):  # bisection
            mid = (lo + hi) / 2.0
            c = poisson_channel_capacity(mid, lam_b)
            if c >= rate_val:
                hi = mid
            else:
                lo = mid
        shannon_lam_s = (lo + hi) / 2.0
        cap_at_limit = poisson_channel_capacity(shannon_lam_s, lam_b)

        print(f"{name:>8s} {shannon_lam_s:10.3f} {cap_at_limit:10.4f}")
        results.append({
            'rate': name, 'rate_val': rate_val,
            'shannon_lam_s': shannon_lam_s,
            'capacity_at_limit': cap_at_limit,
        })

    print()
    print("Shannon limit = minimum lam_s (photons/slot) where")
    print("channel capacity >= code rate. Decoder must operate above this.")
    print()
    print("Compare decoder thresholds (from --rate-sweep) to these limits")
    print("to compute the gap in dB: gap = 10*log10(decoder_threshold / shannon_limit)")

    return results

# ============================================================================
# Main
# ============================================================================

def main():
    parser = argparse.ArgumentParser(description='LDPC Code Analysis')
    parser.add_argument('--rate-sweep', action='store_true',
                        help='Compare code rates vs lambda_s')
    parser.add_argument('--matrix-compare', action='store_true',
                        help='Compare base matrix designs')
    parser.add_argument('--quant-sweep', action='store_true',
                        help='Quantization bits impact')
    parser.add_argument('--shannon-gap', action='store_true',
                        help='Distance from channel capacity')
    parser.add_argument('--all', action='store_true',
                        help='Run all analyses')
    parser.add_argument('--n-frames', type=int, default=500,
                        help='Frames per data point (default: 500)')
    parser.add_argument('--lam-b', type=float, default=0.1,
                        help='Background photons/slot (default: 0.1)')
    parser.add_argument('--seed', type=int, default=42)
    args = parser.parse_args()

    np.random.seed(args.seed)

    results = {}

    if args.rate_sweep or args.all:
        results['rate_sweep'] = run_rate_sweep(args.lam_b, args.n_frames)
        print()

    if args.matrix_compare or args.all:
        results['matrix_compare'] = run_matrix_comparison(
            args.lam_b, args.n_frames
        )
        print()

    if args.quant_sweep or args.all:
        results['quant_sweep'] = run_quant_sweep(
            lam_b=args.lam_b, n_frames=args.n_frames
        )
        print()

    if args.shannon_gap or args.all:
        results['shannon_gap'] = run_shannon_gap(args.lam_b)
        print()

    if results:
        out_path = os.path.join(os.path.dirname(__file__) or '.', '..',
                                'data', 'analysis_results.json')
        os.makedirs(os.path.dirname(out_path), exist_ok=True)
        with open(out_path, 'w') as f:
            json.dump(results, f, indent=2, default=str)
        print(f"Results saved to {out_path}")
    else:
        parser.print_help()


if __name__ == '__main__':
    main()
```
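
The capacity routine above leans on scipy, but its equal-prior mutual information is easy to sanity-check with only the standard library (Poisson pmf written out by hand). A sketch — `ook_mutual_info` and `poisson_pmf` are illustrative helper names, not part of the plan's code: the mutual information should approach 1 bit/slot when lam_s dwarfs lam_b, and collapse toward zero when it does not.

```python
import math

def poisson_pmf(y, lam):
    """P(Y = y) for Y ~ Poisson(lam), computed directly from the definition."""
    return math.exp(-lam) * lam**y / math.factorial(y)

def ook_mutual_info(lam_s, lam_b, max_y=50):
    """I(X;Y) in bits for equal-prior OOK over a Poisson channel, truncated at max_y."""
    p0 = [poisson_pmf(y, lam_b) for y in range(max_y + 1)]          # X = 0: background only
    p1 = [poisson_pmf(y, lam_s + lam_b) for y in range(max_y + 1)]  # X = 1: signal + background
    mi = 0.0
    for a, b in zip(p0, p1):
        mix = 0.5 * a + 0.5 * b  # output distribution under equal priors
        if a > 0:
            mi += 0.5 * a * math.log2(a / mix)
        if b > 0:
            mi += 0.5 * b * math.log2(b / mix)
    return mi

print(round(ook_mutual_info(20.0, 0.1), 3))   # ~1 bit: the two outputs are easily separable
print(round(ook_mutual_info(0.01, 0.1), 4))   # ~0: signal buried in background
```

This only checks the p = 0.5 point of the maximization, so it lower-bounds what `poisson_channel_capacity` reports.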

**Step 4: Run tests**

Run: `cd /home/cah/r2d2/code/fpga/claude_project/ldpc_optical && python3 -m pytest model/test_ldpc_analysis.py -v`
Expected: All PASS

**Step 5: Commit**

```bash
git add model/ldpc_analysis.py model/test_ldpc_analysis.py
git commit -m "feat: add code analysis tool with rate sweep and matrix comparison"
```

---

### Task 4: Run All Analyses and Capture Results

**Step 1: Run rate sweep**

Run: `cd /home/cah/r2d2/code/fpga/claude_project/ldpc_optical && python3 model/ldpc_analysis.py --rate-sweep --n-frames 200`
Expected: Table showing FER vs lambda_s for each rate. Rate 1/8 should have the lowest threshold.

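The saved `data/analysis_results.json` can be post-processed to pull out the per-rate thresholds the sweep prints. A minimal sketch, assuming only the result-dict fields (`m_base`, `lam_s`, `fer`) produced by `rate_sweep_single`; `extract_thresholds` and the inline sample data are illustrative, not part of the plan's code:

```python
def extract_thresholds(results, fer_target=0.10):
    """Minimum lam_s per m_base with FER below fer_target (None if never reached)."""
    thresholds = {}
    for r in sorted(results, key=lambda x: x['lam_s']):
        mb = r['m_base']
        if mb not in thresholds and r['fer'] < fer_target:
            thresholds[mb] = r['lam_s']
    # Rates that never dipped below the target get None
    for r in results:
        thresholds.setdefault(r['m_base'], None)
    return thresholds

# Hypothetical sample in the same shape as the 'rate_sweep' entry of the JSON
sample = [
    {'m_base': 7, 'lam_s': 1.0, 'fer': 0.40},
    {'m_base': 7, 'lam_s': 2.0, 'fer': 0.05},
    {'m_base': 1, 'lam_s': 2.0, 'fer': 0.80},
]
print(extract_thresholds(sample))  # {7: 2.0, 1: None}
```

The same function works on the in-memory list returned by `run_rate_sweep`.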
**Step 2: Run matrix comparison**
|
|
|
|
Run: `cd /home/cah/r2d2/code/fpga/claude_project/ldpc_optical && python3 model/ldpc_analysis.py --matrix-compare --n-frames 200`
|
|
Expected: Table comparing original vs improved vs PEG matrices.
|
|
|
|
**Step 3: Run quantization sweep**
|
|
|
|
Run: `cd /home/cah/r2d2/code/fpga/claude_project/ldpc_optical && python3 model/ldpc_analysis.py --quant-sweep --n-frames 200`
|
|
Expected: FER vs quantization bits table.
|
|
|
|
**Step 4: Run Shannon gap**
|
|
|
|
Run: `cd /home/cah/r2d2/code/fpga/claude_project/ldpc_optical && python3 model/ldpc_analysis.py --shannon-gap`
|
|
Expected: Shannon limit lambda_s for each rate.
|
|
|
|
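Combining this step's output with the rate-sweep thresholds gives the gap in dB, using the formula `run_shannon_gap` prints. A sketch — `gap_db` is an illustrative helper and the numbers are placeholders, not measured results:

```python
import math

def gap_db(decoder_threshold, shannon_limit):
    """Photon-efficiency gap of the decoder from the Shannon limit, in dB."""
    return 10.0 * math.log10(decoder_threshold / shannon_limit)

# e.g. decoder needs lam_s = 2.0 where the Shannon limit is 0.5:
# a 4x photon overhead is about 6.02 dB
print(f"{gap_db(2.0, 0.5):.2f} dB")
```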
**Step 5: Run frame sync sweep**
|
|
|
|
Run: `cd /home/cah/r2d2/code/fpga/claude_project/ldpc_optical && python3 model/frame_sync.py --sweep --n-trials 20`
|
|
Expected: Acquisition success rate vs lambda_s.
|
|
|
|
**Step 6: Run frame sync re-sync test**
|
|
|
|
Run: `cd /home/cah/r2d2/code/fpga/claude_project/ldpc_optical && python3 model/frame_sync.py --resync-test --lam-s 5.0 --n-trials 20`
|
|
Expected: Re-sync success at various slip amounts.
|
|
|
|
**Step 7: Review results, fix any bugs, commit data**
|
|
|
|
```bash
|
|
git add data/
|
|
git commit -m "data: add analysis and frame sync simulation results"
|
|
```
|
|
|
|
---

### Task 5: Run Full Test Suite

**Step 1: Run all tests**

Run: `cd /home/cah/r2d2/code/fpga/claude_project/ldpc_optical && python3 -m pytest model/ -v`
Expected: All tests PASS

**Step 2: Final commit**

```bash
git add -A
git commit -m "test: all model tests passing for frame sync and code analysis"
```