From c427dfdd3d0e96578290b04f1cc8096e6fd28eb3 Mon Sep 17 00:00:00 2001 From: cah Date: Tue, 24 Feb 2026 04:40:07 -0700 Subject: [PATCH] feat: add frame synchronization prototype with tests Co-Authored-By: Claude Opus 4.6 --- model/frame_sync.py | 465 +++++++++++++++++++++++++++++++++++++++ model/test_frame_sync.py | 108 +++++++++ 2 files changed, 573 insertions(+) create mode 100644 model/frame_sync.py create mode 100644 model/test_frame_sync.py diff --git a/model/frame_sync.py b/model/frame_sync.py new file mode 100644 index 0000000..db0d925 --- /dev/null +++ b/model/frame_sync.py @@ -0,0 +1,465 @@ +#!/usr/bin/env python3 +""" +Frame Synchronization Prototype for LDPC Optical Communication + +Implements syndrome-based frame synchronization for a continuous stream of +LDPC-encoded codewords over a photon-counting optical channel. The receiver +must find the codeword boundary (offset) in the stream before decoding. + +Strategy: + 1. Syndrome screening: try all N candidate offsets, compute hard-decision + syndrome weight. Valid codeword boundaries produce low syndrome weight; + random offsets produce ~M/2 (~112). + 2. Full decode: for candidates below SCREENING_THRESHOLD, run iterative + min-sum decoding. If converged, confirm with consecutive frames. + 3. Re-sync: after lock, if offset drifts by a small slip, search locally + before falling back to full acquisition. + +Usage: + python3 frame_sync.py # Quick demo at lam_s=5.0 + python3 frame_sync.py --lam-s 3.0 # Demo at specific SNR + python3 frame_sync.py --sweep # Acquisition sweep over SNR + python3 frame_sync.py --resync-test # Re-sync robustness test +""" + +import numpy as np +import argparse + +from ldpc_sim import ( + build_full_h_matrix, ldpc_encode, poisson_channel, quantize_llr, + decode_layered_min_sum, compute_syndrome_weight, + N, K, M, Z, N_BASE, M_BASE, H_BASE, +) + +# Screening threshold: offsets with syndrome weight below this get full decode. +# Wrong offsets have syndrome weight ~M/2 = 112; correct offsets at decent SNR +# will be much lower. M/4 = 56, using 50 for margin. +SCREENING_THRESHOLD = 50 + + +def generate_stream(H, n_frames, lam_s, lam_b, offset): + """ + Generate a continuous LLR stream of concatenated LDPC codewords. + + Inserts a random (or specified) offset of noise-like LLRs at the start, + simulating the receiver not knowing where codeword boundaries are. + + Args: + H: full parity-check matrix (M x N) + n_frames: number of codewords to concatenate + lam_s: signal photons per slot + lam_b: background photons per slot + offset: number of noise LLRs prepended (None = random 0..N-1) + + Returns: + stream_llr: continuous float LLR stream (length = n_frames * N + offset) + true_offset: the actual offset used + info_list: list of info-bit arrays for each frame + """ + if offset is None: + offset = np.random.randint(0, N) + true_offset = offset + + # Noise prefix: random LLRs that do not correspond to any codeword + if offset > 0: + prefix = np.random.normal(0, 1, offset) + else: + prefix = np.array([], dtype=np.float64) + + # Encode and modulate each frame + frame_llrs = [] + info_list = [] + for _ in range(n_frames): + info = np.random.randint(0, 2, K).astype(np.int8) + codeword = ldpc_encode(info, H) + llr_float, _ = poisson_channel(codeword, lam_s, lam_b) + frame_llrs.append(llr_float) + info_list.append(info) + + stream_llr = np.concatenate([prefix] + frame_llrs) + return stream_llr, true_offset, info_list + + +def syndrome_screen(stream_llr, n_offsets=N): + """ + Screen candidate offsets by hard-decision syndrome weight. + + For each candidate offset, extract a 256-sample window from the stream, + make hard decisions (positive LLR -> 0, negative LLR -> 1), and compute + the syndrome weight. Correct offsets yield low syndrome weight; random + offsets yield ~M/2 (~112). + + Args: + stream_llr: continuous LLR stream (float) + n_offsets: number of offsets to try (0..n_offsets-1) + + Returns: + dict mapping offset -> syndrome weight + """ + scores = {} + stream_len = len(stream_llr) + for off in range(n_offsets): + end = off + N + if end > stream_len: + # Not enough data for a full codeword at this offset + scores[off] = M # worst case + continue + window = stream_llr[off:end] + # Hard decision: positive LLR -> bit 0, negative -> bit 1 + hard = [1 if v < 0 else 0 for v in window] + sw = compute_syndrome_weight(hard) + scores[off] = sw + return scores + + +def acquire_sync(stream_llr, H=None, max_confirm=2, max_iter=30): + """ + Acquire frame synchronization from a continuous LLR stream. + + Strategy: + 1. Syndrome-screen all N candidate offsets + 2. Sort by syndrome weight (best first) + 3. For candidates below SCREENING_THRESHOLD: run full iterative decode + 4. If decode converges: confirm by decoding next max_confirm frames + 5. Return result with cost metrics + + Args: + stream_llr: continuous LLR stream (float) + H: parity-check matrix (built if None) + max_confirm: number of consecutive frames to confirm after first lock + max_iter: max decoder iterations + + Returns: + dict with keys: + locked: bool - whether sync was acquired + offset: int or None - detected offset + offsets_screened: int - number of offsets screened + full_decodes: int - number of full iterative decodes attempted + screening_cost: float - screening cost in equivalent decode units + total_equiv_decodes: float - total cost in equivalent decode units + """ + if H is None: + H = build_full_h_matrix() + + stream_len = len(stream_llr) + + # Step 1: Syndrome screen all N offsets + scores = syndrome_screen(stream_llr, n_offsets=N) + + # Step 2: Sort candidates by syndrome weight + sorted_offsets = sorted(scores.keys(), key=lambda o: scores[o]) + + # Screening cost: each screen is ~1/max_iter of a full decode (just hard + # decisions + syndrome check, no iterative processing) + screening_cost = N / max_iter # in equivalent full decodes + + full_decodes = 0 + locked = False + detected_offset = None + + # Step 3: Try candidates below threshold + for off in sorted_offsets: + if scores[off] >= SCREENING_THRESHOLD: + break # sorted, so all remaining are worse + + # Full iterative decode at this offset + end = off + N + if end > stream_len: + continue + + window_llr = stream_llr[off:end] + llr_q = quantize_llr(window_llr) + decoded, converged, iters, syn_wt = decode_layered_min_sum( + llr_q, max_iter=max_iter + ) + full_decodes += 1 + + if not converged: + continue + + # Step 4: Confirm with consecutive frames + confirmed = True + for cf in range(1, max_confirm + 1): + cf_start = off + cf * N + cf_end = cf_start + N + if cf_end > stream_len: + confirmed = False + break + cf_window = stream_llr[cf_start:cf_end] + cf_llr_q = quantize_llr(cf_window) + cf_decoded, cf_converged, _, _ = decode_layered_min_sum( + cf_llr_q, max_iter=max_iter + ) + full_decodes += 1 + if not cf_converged: + confirmed = False + break + + if confirmed: + locked = True + detected_offset = off + break + + total_equiv_decodes = screening_cost + full_decodes + + return { + 'locked': locked, + 'offset': detected_offset, + 'offsets_screened': N, + 'full_decodes': full_decodes, + 'screening_cost': screening_cost, + 'total_equiv_decodes': total_equiv_decodes, + } + + +def resync_test(stream_llr, true_offset, slip_amount, H=None, + search_radius=16, max_iter=30): + """ + Simulate offset slip and attempt re-synchronization. + + After lock is established, the offset may drift by slip_amount. First + search locally (within +/- search_radius of the slipped offset), then + fall back to full acquire_sync if local search fails. + + Args: + stream_llr: continuous LLR stream + true_offset: the actual correct offset + slip_amount: how much the offset has drifted + H: parity-check matrix (built if None) + search_radius: local search window half-width + max_iter: max decoder iterations + + Returns: + dict with keys: + locked: bool + offset: int or None + slip: int - the slip amount tested + needed_full_search: bool - whether full acquire was needed + """ + if H is None: + H = build_full_h_matrix() + + stream_len = len(stream_llr) + slipped_offset = true_offset + slip_amount + + # Local search: try offsets near the slipped position + candidates = [] + for delta in range(-search_radius, search_radius + 1): + candidate = slipped_offset + delta + if 0 <= candidate and candidate + N <= stream_len: + candidates.append(candidate) + + # Screen candidates by syndrome weight + best_sw = M + 1 + best_off = None + for off in candidates: + window = stream_llr[off:off + N] + hard = [1 if v < 0 else 0 for v in window] + sw = compute_syndrome_weight(hard) + if sw < best_sw: + best_sw = sw + best_off = off + + # Try full decode on best local candidate + if best_off is not None and best_sw < SCREENING_THRESHOLD: + window_llr = stream_llr[best_off:best_off + N] + llr_q = quantize_llr(window_llr) + decoded, converged, _, _ = decode_layered_min_sum( + llr_q, max_iter=max_iter + ) + if converged: + return { + 'locked': True, + 'offset': best_off, + 'slip': slip_amount, + 'needed_full_search': False, + } + + # Local search failed; fall back to full acquisition + result = acquire_sync(stream_llr, H=H, max_iter=max_iter) + return { + 'locked': result['locked'], + 'offset': result['offset'], + 'slip': slip_amount, + 'needed_full_search': True, + } + + +def run_acquisition_sweep(lam_s_values, lam_b=0.1, n_trials=20, n_frames=5): + """ + Sweep lambda_s values and measure acquisition performance. + + Args: + lam_s_values: list of signal photon rates to test + lam_b: background photon rate + n_trials: trials per SNR point + n_frames: frames per stream + """ + H = build_full_h_matrix() + print(f"{'lam_s':>8s} {'lock_rate':>10s} {'false_lock':>11s} " + f"{'avg_cost':>10s} {'avg_decodes':>12s}") + print("-" * 55) + + for lam_s in lam_s_values: + n_locked = 0 + n_false_lock = 0 + total_cost = 0.0 + total_decodes = 0 + + for trial in range(n_trials): + stream_llr, true_offset, info_list = generate_stream( + H, n_frames=n_frames, lam_s=lam_s, lam_b=lam_b, offset=None + ) + result = acquire_sync(stream_llr, H=H) + total_cost += result['total_equiv_decodes'] + total_decodes += result['full_decodes'] + + if result['locked']: + if result['offset'] == true_offset: + n_locked += 1 + else: + n_false_lock += 1 + + lock_rate = n_locked / n_trials + false_rate = n_false_lock / n_trials + avg_cost = total_cost / n_trials + avg_decodes = total_decodes / n_trials + + print(f"{lam_s:8.1f} {lock_rate:10.2f} {false_rate:11.2f} " + f"{avg_cost:10.1f} {avg_decodes:12.1f}") + + +def run_resync_sweep(lam_s=5.0, lam_b=0.1, n_trials=10, n_frames=5): + """ + Test re-synchronization at various slip amounts. + + Args: + lam_s: signal photon rate + lam_b: background photon rate + n_trials: trials per slip amount + n_frames: frames per stream + """ + H = build_full_h_matrix() + slip_amounts = [1, 2, 4, 8, 16, 32, 64, 128] + + print(f"Re-sync sweep: lam_s={lam_s}, lam_b={lam_b}, n_trials={n_trials}") + print(f"{'slip':>6s} {'lock_rate':>10s} {'correct':>8s} " + f"{'full_search':>12s}") + print("-" * 40) + + for slip in slip_amounts: + n_locked = 0 + n_correct = 0 + n_full_search = 0 + + for trial in range(n_trials): + stream_llr, true_offset, info_list = generate_stream( + H, n_frames=n_frames, lam_s=lam_s, lam_b=lam_b, offset=None + ) + result = resync_test( + stream_llr, true_offset, slip, H=H + ) + if result['locked']: + n_locked += 1 + if result['offset'] == true_offset: + n_correct += 1 + if result['needed_full_search']: + n_full_search += 1 + + lock_rate = n_locked / n_trials + correct_rate = n_correct / n_trials + full_search_rate = n_full_search / n_trials + + print(f"{slip:6d} {lock_rate:10.2f} {correct_rate:8.2f} " + f"{full_search_rate:12.2f}") + + +def main(): + parser = argparse.ArgumentParser( + description='Frame Synchronization Prototype for LDPC Optical Communication' + ) + parser.add_argument('--sweep', action='store_true', + help='Run acquisition sweep over SNR') + parser.add_argument('--resync-test', action='store_true', + help='Run re-sync robustness test') + parser.add_argument('--lam-s', type=float, default=5.0, + help='Signal photons/slot (default: 5.0)') + parser.add_argument('--lam-b', type=float, default=0.1, + help='Background photons/slot (default: 0.1)') + parser.add_argument('--n-trials', type=int, default=20, + help='Number of trials per test point (default: 20)') + parser.add_argument('--seed', type=int, default=42, + help='Random seed (default: 42)') + args = parser.parse_args() + + np.random.seed(args.seed) + + if args.sweep: + print("=== Acquisition Sweep ===") + lam_s_values = [1.0, 2.0, 3.0, 4.0, 5.0, 7.0, 10.0] + run_acquisition_sweep( + lam_s_values, lam_b=args.lam_b, n_trials=args.n_trials + ) + + elif args.resync_test: + print("=== Re-Sync Robustness Test ===") + run_resync_sweep( + lam_s=args.lam_s, lam_b=args.lam_b, n_trials=args.n_trials + ) + + else: + # Quick demo + print("=== Frame Sync Demo ===") + print(f"Code: ({N},{K}), rate {K/N:.3f}, Z={Z}") + print(f"lam_s={args.lam_s}, lam_b={args.lam_b}") + print() + + H = build_full_h_matrix() + + # Generate a stream with random offset + n_frames = 5 + stream_llr, true_offset, info_list = generate_stream( + H, n_frames=n_frames, lam_s=args.lam_s, lam_b=args.lam_b, + offset=None + ) + print(f"Generated stream: {len(stream_llr)} samples, " + f"{n_frames} frames, true offset={true_offset}") + + # Syndrome screening + print("\nSyndrome screening (all 256 offsets)...") + scores = syndrome_screen(stream_llr, n_offsets=N) + sorted_scores = sorted(scores.items(), key=lambda x: x[1]) + print(f" Best 5 offsets:") + for off, sw in sorted_scores[:5]: + marker = " <-- TRUE" if off == true_offset else "" + print(f" offset={off:3d} syndrome_weight={sw:3d}{marker}") + wrong_avg = np.mean([ + sw for off, sw in scores.items() if off != true_offset + ]) + print(f" Average wrong-offset syndrome weight: {wrong_avg:.1f}") + + # Full acquisition + print("\nRunning full acquisition...") + result = acquire_sync(stream_llr, H=H) + print(f" Locked: {result['locked']}") + print(f" Detected offset: {result['offset']} " + f"(true: {true_offset}, " + f"{'CORRECT' if result['offset'] == true_offset else 'WRONG'})") + print(f" Full decodes: {result['full_decodes']}") + print(f" Screening cost: {result['screening_cost']:.1f} equiv decodes") + print(f" Total cost: {result['total_equiv_decodes']:.1f} equiv decodes") + + # Re-sync test at a small slip + if result['locked']: + print("\nRe-sync test (slip=4)...") + resync_result = resync_test( + stream_llr, true_offset, slip_amount=4, H=H + ) + print(f" Locked: {resync_result['locked']}") + print(f" Offset: {resync_result['offset']} " + f"(true: {true_offset})") + print(f" Needed full search: {resync_result['needed_full_search']}") + + +if __name__ == '__main__': + main() diff --git a/model/test_frame_sync.py b/model/test_frame_sync.py new file mode 100644 index 0000000..3ec6a8c --- /dev/null +++ b/model/test_frame_sync.py @@ -0,0 +1,108 @@ +#!/usr/bin/env python3 +""" +Tests for the frame synchronization prototype (frame_sync.py). + +Tests cover stream generation (length, offsets) and syndrome screening +(correct offset identification, wrong offset rejection). + +Run: + python3 -m pytest model/test_frame_sync.py -v +""" + +import numpy as np +import pytest + +from ldpc_sim import N, build_full_h_matrix +from frame_sync import generate_stream, syndrome_screen + + +# ============================================================================= +# Fixtures +# ============================================================================= + +@pytest.fixture(scope="module") +def H(): + """Full expanded H matrix, built once per module.""" + return build_full_h_matrix() + + +# ============================================================================= +# TestStreamGeneration +# ============================================================================= + +class TestStreamGeneration: + """Validate continuous LLR stream generation.""" + + def test_stream_length(self, H): + """Stream should be n_frames * N + offset long.""" + np.random.seed(100) + n_frames = 5 + offset = 37 + stream_llr, true_offset, info_list = generate_stream( + H, n_frames=n_frames, lam_s=5.0, lam_b=0.1, offset=offset + ) + assert len(stream_llr) == n_frames * N + offset + assert true_offset == offset + assert len(info_list) == n_frames + + def test_stream_zero_offset(self, H): + """offset=0 should work, producing stream of length n_frames * N.""" + np.random.seed(101) + n_frames = 3 + stream_llr, true_offset, info_list = generate_stream( + H, n_frames=n_frames, lam_s=5.0, lam_b=0.1, offset=0 + ) + assert len(stream_llr) == n_frames * N + assert true_offset == 0 + assert len(info_list) == n_frames + + def test_stream_random_offset(self, H): + """offset=None should pick a random offset in [0, N-1].""" + np.random.seed(102) + stream_llr, true_offset, info_list = generate_stream( + H, n_frames=4, lam_s=5.0, lam_b=0.1, offset=None + ) + assert 0 <= true_offset < N + assert len(stream_llr) == 4 * N + true_offset + + +# ============================================================================= +# TestSyndromeScreen +# ============================================================================= + +class TestSyndromeScreen: + """Validate syndrome-based offset screening.""" + + def test_correct_offset_low_syndrome(self, H): + """At high SNR (lam_s=10), the best offset should match the true offset.""" + np.random.seed(200) + n_frames = 3 + offset = 42 + stream_llr, true_offset, _ = generate_stream( + H, n_frames=n_frames, lam_s=10.0, lam_b=0.1, offset=offset + ) + scores = syndrome_screen(stream_llr, n_offsets=N) + # The true offset should have the lowest (or near-lowest) syndrome weight + best_offset = min(scores, key=scores.get) + assert best_offset == true_offset, ( + f"Best offset {best_offset} (sw={scores[best_offset]}) " + f"!= true offset {true_offset} (sw={scores[true_offset]})" + ) + + def test_wrong_offsets_high_syndrome(self, H): + """Wrong offsets should have average syndrome weight > 80.""" + np.random.seed(201) + n_frames = 3 + offset = 10 + stream_llr, true_offset, _ = generate_stream( + H, n_frames=n_frames, lam_s=10.0, lam_b=0.1, offset=offset + ) + scores = syndrome_screen(stream_llr, n_offsets=N) + # Collect syndrome weights for wrong offsets + wrong_weights = [ + scores[o] for o in scores if o != true_offset + ] + avg_wrong = np.mean(wrong_weights) + assert avg_wrong > 80, ( + f"Average wrong-offset syndrome weight {avg_wrong:.1f} is not > 80" + )