feat: add frame synchronization prototype with tests
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
465
model/frame_sync.py
Normal file
465
model/frame_sync.py
Normal file
@@ -0,0 +1,465 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Frame Synchronization Prototype for LDPC Optical Communication
|
||||
|
||||
Implements syndrome-based frame synchronization for a continuous stream of
|
||||
LDPC-encoded codewords over a photon-counting optical channel. The receiver
|
||||
must find the codeword boundary (offset) in the stream before decoding.
|
||||
|
||||
Strategy:
|
||||
1. Syndrome screening: try all N candidate offsets, compute hard-decision
|
||||
syndrome weight. Valid codeword boundaries produce low syndrome weight;
|
||||
random offsets produce ~M/2 (~112).
|
||||
2. Full decode: for candidates below SCREENING_THRESHOLD, run iterative
|
||||
min-sum decoding. If converged, confirm with consecutive frames.
|
||||
3. Re-sync: after lock, if offset drifts by a small slip, search locally
|
||||
before falling back to full acquisition.
|
||||
|
||||
Usage:
|
||||
python3 frame_sync.py # Quick demo at lam_s=5.0
|
||||
python3 frame_sync.py --lam-s 3.0 # Demo at specific SNR
|
||||
python3 frame_sync.py --sweep # Acquisition sweep over SNR
|
||||
python3 frame_sync.py --resync-test # Re-sync robustness test
|
||||
"""
|
||||
|
||||
import numpy as np
|
||||
import argparse
|
||||
|
||||
from ldpc_sim import (
|
||||
build_full_h_matrix, ldpc_encode, poisson_channel, quantize_llr,
|
||||
decode_layered_min_sum, compute_syndrome_weight,
|
||||
N, K, M, Z, N_BASE, M_BASE, H_BASE,
|
||||
)
|
||||
|
||||
# Screening threshold: offsets with syndrome weight below this get full decode.
# Wrong offsets have syndrome weight ~M/2 = 112; correct offsets at decent SNR
# will be much lower. M/4 = 56, using 50 for margin.
# NOTE(review): 50 is tuned to the (M=224) code used here — revisit if the
# code size changes.
SCREENING_THRESHOLD = 50
|
||||
|
||||
|
||||
def generate_stream(H, n_frames, lam_s, lam_b, offset):
    """
    Build a continuous LLR stream of back-to-back LDPC codewords.

    A run of noise-like LLRs of length ``offset`` is prepended so the
    receiver cannot assume it knows where codeword boundaries fall.

    Args:
        H: full parity-check matrix (M x N)
        n_frames: number of codewords to concatenate
        lam_s: signal photons per slot
        lam_b: background photons per slot
        offset: number of noise LLRs prepended (None = random 0..N-1)

    Returns:
        stream_llr: continuous float LLR stream (length = n_frames * N + offset)
        true_offset: the actual offset used
        info_list: list of info-bit arrays for each frame
    """
    # Draw the offset first so the RNG call order matches callers' seeds.
    true_offset = np.random.randint(0, N) if offset is None else offset

    # Prefix of random LLRs belonging to no codeword (empty when offset == 0).
    if true_offset > 0:
        prefix = np.random.normal(0, 1, true_offset)
    else:
        prefix = np.array([], dtype=np.float64)

    # Encode each frame and pass it through the Poisson channel model.
    pieces = [prefix]
    info_list = []
    for _ in range(n_frames):
        info_bits = np.random.randint(0, 2, K).astype(np.int8)
        cw = ldpc_encode(info_bits, H)
        frame_llr, _ = poisson_channel(cw, lam_s, lam_b)
        pieces.append(frame_llr)
        info_list.append(info_bits)

    return np.concatenate(pieces), true_offset, info_list
|
||||
|
||||
|
||||
def syndrome_screen(stream_llr, n_offsets=N):
    """
    Rank candidate frame offsets by hard-decision syndrome weight.

    For each candidate offset an N-sample window is sliced from the stream,
    hard-sliced (negative LLR -> bit 1, otherwise bit 0), and its syndrome
    weight computed. A window aligned on a codeword boundary yields a low
    weight; a misaligned window averages ~M/2 (~112).

    Args:
        stream_llr: continuous LLR stream (float)
        n_offsets: number of offsets to try (0..n_offsets-1)

    Returns:
        dict mapping offset -> syndrome weight
    """
    total = len(stream_llr)
    scores = {}
    for candidate in range(n_offsets):
        if candidate + N > total:
            # Window runs past the end of the stream: no full codeword here,
            # so assign the worst possible weight.
            scores[candidate] = M
            continue
        window = stream_llr[candidate:candidate + N]
        # Hard decision: a negative LLR means the bit is more likely a 1.
        bits = [1 if llr < 0 else 0 for llr in window]
        scores[candidate] = compute_syndrome_weight(bits)
    return scores
|
||||
|
||||
|
||||
def acquire_sync(stream_llr, H=None, max_confirm=2, max_iter=30):
    """
    Acquire frame synchronization on a continuous LLR stream.

    Pipeline:
      1. Syndrome-screen every one of the N candidate offsets.
      2. Visit candidates in order of increasing syndrome weight.
      3. Fully decode any candidate under SCREENING_THRESHOLD.
      4. On convergence, confirm by decoding the next max_confirm frames.
      5. Report the lock status together with cost bookkeeping.

    Args:
        stream_llr: continuous LLR stream (float)
        H: parity-check matrix (built if None)
        max_confirm: number of consecutive frames to confirm after first lock
        max_iter: max decoder iterations

    Returns:
        dict with keys:
            locked: bool - whether sync was acquired
            offset: int or None - detected offset
            offsets_screened: int - number of offsets screened
            full_decodes: int - number of full iterative decodes attempted
            screening_cost: float - screening cost in equivalent decode units
            total_equiv_decodes: float - total cost in equivalent decode units
    """
    if H is None:
        H = build_full_h_matrix()

    stream_len = len(stream_llr)

    # Screen every offset, then walk the candidates best-first.
    scores = syndrome_screen(stream_llr, n_offsets=N)
    ranked = sorted(scores, key=scores.get)

    # One screen costs roughly 1/max_iter of a full decode (hard decisions
    # plus a single syndrome check, no iterative message passing).
    screening_cost = N / max_iter  # in equivalent full decodes

    full_decodes = 0
    locked = False
    detected_offset = None

    for candidate in ranked:
        if scores[candidate] >= SCREENING_THRESHOLD:
            # Candidates are sorted, so everything after this is worse.
            break

        if candidate + N > stream_len:
            # Not enough samples left for a full codeword at this offset.
            continue

        # Full iterative decode of the candidate window.
        llr_q = quantize_llr(stream_llr[candidate:candidate + N])
        _, converged, _, _ = decode_layered_min_sum(llr_q, max_iter=max_iter)
        full_decodes += 1
        if not converged:
            continue

        # Confirm the tentative lock on the following max_confirm frames.
        confirmed = True
        for k in range(1, max_confirm + 1):
            start = candidate + k * N
            if start + N > stream_len:
                confirmed = False
                break
            confirm_q = quantize_llr(stream_llr[start:start + N])
            _, ok, _, _ = decode_layered_min_sum(confirm_q, max_iter=max_iter)
            full_decodes += 1
            if not ok:
                confirmed = False
                break

        if confirmed:
            locked = True
            detected_offset = candidate
            break

    return {
        'locked': locked,
        'offset': detected_offset,
        'offsets_screened': N,
        'full_decodes': full_decodes,
        'screening_cost': screening_cost,
        'total_equiv_decodes': screening_cost + full_decodes,
    }
|
||||
|
||||
|
||||
def resync_test(stream_llr, true_offset, slip_amount, H=None,
                search_radius=16, max_iter=30):
    """
    Simulate an offset slip and attempt re-synchronization.

    A locked receiver whose boundary estimate has drifted by slip_amount
    first searches locally (within +/- search_radius of the slipped
    position) and only falls back to a full acquire_sync() when the
    local search fails.

    Args:
        stream_llr: continuous LLR stream
        true_offset: the actual correct offset
        slip_amount: how much the offset has drifted
        H: parity-check matrix (built if None)
        search_radius: local search window half-width
        max_iter: max decoder iterations

    Returns:
        dict with keys:
            locked: bool
            offset: int or None
            slip: int - the slip amount tested
            needed_full_search: bool - whether full acquire was needed
    """
    if H is None:
        H = build_full_h_matrix()

    stream_len = len(stream_llr)
    slipped = true_offset + slip_amount

    # Local candidates around the slipped position that still fit a codeword.
    candidates = [slipped + d
                  for d in range(-search_radius, search_radius + 1)
                  if 0 <= slipped + d and slipped + d + N <= stream_len]

    # Pick the local candidate with the lowest hard-decision syndrome weight.
    best_off, best_sw = None, M + 1
    for cand in candidates:
        bits = [1 if v < 0 else 0 for v in stream_llr[cand:cand + N]]
        weight = compute_syndrome_weight(bits)
        if weight < best_sw:
            best_off, best_sw = cand, weight

    # Attempt a full decode on the best local candidate, if it looks viable.
    if best_off is not None and best_sw < SCREENING_THRESHOLD:
        llr_q = quantize_llr(stream_llr[best_off:best_off + N])
        _, converged, _, _ = decode_layered_min_sum(llr_q, max_iter=max_iter)
        if converged:
            return {
                'locked': True,
                'offset': best_off,
                'slip': slip_amount,
                'needed_full_search': False,
            }

    # Local search failed; fall back to full acquisition from scratch.
    full = acquire_sync(stream_llr, H=H, max_iter=max_iter)
    return {
        'locked': full['locked'],
        'offset': full['offset'],
        'slip': slip_amount,
        'needed_full_search': True,
    }
|
||||
|
||||
|
||||
def run_acquisition_sweep(lam_s_values, lam_b=0.1, n_trials=20, n_frames=5):
    """
    Sweep signal photon rates and report acquisition statistics.

    Prints one table row per lam_s value: lock rate, false-lock rate,
    and average acquisition cost.

    Args:
        lam_s_values: list of signal photon rates to test
        lam_b: background photon rate
        n_trials: trials per SNR point
        n_frames: frames per stream
    """
    H = build_full_h_matrix()
    print(f"{'lam_s':>8s} {'lock_rate':>10s} {'false_lock':>11s} "
          f"{'avg_cost':>10s} {'avg_decodes':>12s}")
    print("-" * 55)

    for lam_s in lam_s_values:
        locks = 0
        false_locks = 0
        cost_sum = 0.0
        decode_sum = 0

        for _ in range(n_trials):
            stream, actual_off, _ = generate_stream(
                H, n_frames=n_frames, lam_s=lam_s, lam_b=lam_b, offset=None
            )
            res = acquire_sync(stream, H=H)
            cost_sum += res['total_equiv_decodes']
            decode_sum += res['full_decodes']

            # A lock at the wrong offset counts as a false lock.
            if res['locked']:
                if res['offset'] == actual_off:
                    locks += 1
                else:
                    false_locks += 1

        print(f"{lam_s:8.1f} {locks / n_trials:10.2f} "
              f"{false_locks / n_trials:11.2f} "
              f"{cost_sum / n_trials:10.1f} {decode_sum / n_trials:12.1f}")
|
||||
|
||||
|
||||
def run_resync_sweep(lam_s=5.0, lam_b=0.1, n_trials=10, n_frames=5):
    """
    Test re-synchronization across a range of slip amounts.

    Prints one table row per slip amount: lock rate, correct-offset rate,
    and how often the full search fallback was needed.

    Args:
        lam_s: signal photon rate
        lam_b: background photon rate
        n_trials: trials per slip amount
        n_frames: frames per stream
    """
    H = build_full_h_matrix()
    slip_amounts = [1, 2, 4, 8, 16, 32, 64, 128]

    print(f"Re-sync sweep: lam_s={lam_s}, lam_b={lam_b}, n_trials={n_trials}")
    print(f"{'slip':>6s} {'lock_rate':>10s} {'correct':>8s} "
          f"{'full_search':>12s}")
    print("-" * 40)

    for slip in slip_amounts:
        locks = 0
        correct = 0
        full_searches = 0

        for _ in range(n_trials):
            stream, actual_off, _ = generate_stream(
                H, n_frames=n_frames, lam_s=lam_s, lam_b=lam_b, offset=None
            )
            res = resync_test(stream, actual_off, slip, H=H)
            if res['locked']:
                locks += 1
                if res['offset'] == actual_off:
                    correct += 1
            if res['needed_full_search']:
                full_searches += 1

        print(f"{slip:6d} {locks / n_trials:10.2f} "
              f"{correct / n_trials:8.2f} "
              f"{full_searches / n_trials:12.2f}")
|
||||
|
||||
|
||||
def main():
    """Command-line entry point: quick demo, SNR sweep, or re-sync test."""
    parser = argparse.ArgumentParser(
        description='Frame Synchronization Prototype for LDPC Optical Communication'
    )
    parser.add_argument('--sweep', action='store_true',
                        help='Run acquisition sweep over SNR')
    parser.add_argument('--resync-test', action='store_true',
                        help='Run re-sync robustness test')
    parser.add_argument('--lam-s', type=float, default=5.0,
                        help='Signal photons/slot (default: 5.0)')
    parser.add_argument('--lam-b', type=float, default=0.1,
                        help='Background photons/slot (default: 0.1)')
    parser.add_argument('--n-trials', type=int, default=20,
                        help='Number of trials per test point (default: 20)')
    parser.add_argument('--seed', type=int, default=42,
                        help='Random seed (default: 42)')
    args = parser.parse_args()

    # Seed once up front so every mode is reproducible.
    np.random.seed(args.seed)

    if args.sweep:
        print("=== Acquisition Sweep ===")
        lam_s_values = [1.0, 2.0, 3.0, 4.0, 5.0, 7.0, 10.0]
        run_acquisition_sweep(
            lam_s_values, lam_b=args.lam_b, n_trials=args.n_trials
        )

    elif args.resync_test:
        print("=== Re-Sync Robustness Test ===")
        run_resync_sweep(
            lam_s=args.lam_s, lam_b=args.lam_b, n_trials=args.n_trials
        )

    else:
        # Quick demo: generate one stream, screen, acquire, then re-sync.
        print("=== Frame Sync Demo ===")
        print(f"Code: ({N},{K}), rate {K/N:.3f}, Z={Z}")
        print(f"lam_s={args.lam_s}, lam_b={args.lam_b}")
        print()

        H = build_full_h_matrix()

        # Generate a stream with a random unknown offset.
        n_frames = 5
        stream_llr, true_offset, _ = generate_stream(
            H, n_frames=n_frames, lam_s=args.lam_s, lam_b=args.lam_b,
            offset=None
        )
        print(f"Generated stream: {len(stream_llr)} samples, "
              f"{n_frames} frames, true offset={true_offset}")

        # Syndrome screening over every candidate offset.
        # Fix: use N instead of a hard-coded "256" so the message stays
        # correct if the code length ever changes.
        print(f"\nSyndrome screening (all {N} offsets)...")
        scores = syndrome_screen(stream_llr, n_offsets=N)
        sorted_scores = sorted(scores.items(), key=lambda x: x[1])
        print(" Best 5 offsets:")
        for off, sw in sorted_scores[:5]:
            marker = " <-- TRUE" if off == true_offset else ""
            print(f" offset={off:3d} syndrome_weight={sw:3d}{marker}")
        wrong_avg = np.mean([
            sw for off, sw in scores.items() if off != true_offset
        ])
        print(f" Average wrong-offset syndrome weight: {wrong_avg:.1f}")

        # Full acquisition.
        print("\nRunning full acquisition...")
        result = acquire_sync(stream_llr, H=H)
        print(f" Locked: {result['locked']}")
        print(f" Detected offset: {result['offset']} "
              f"(true: {true_offset}, "
              f"{'CORRECT' if result['offset'] == true_offset else 'WRONG'})")
        print(f" Full decodes: {result['full_decodes']}")
        print(f" Screening cost: {result['screening_cost']:.1f} equiv decodes")
        print(f" Total cost: {result['total_equiv_decodes']:.1f} equiv decodes")

        # Re-sync test at a small slip, only meaningful after a lock.
        if result['locked']:
            print("\nRe-sync test (slip=4)...")
            resync_result = resync_test(
                stream_llr, true_offset, slip_amount=4, H=H
            )
            print(f" Locked: {resync_result['locked']}")
            print(f" Offset: {resync_result['offset']} "
                  f"(true: {true_offset})")
            print(f" Needed full search: {resync_result['needed_full_search']}")
|
||||
# Script entry point: parse CLI arguments and dispatch to the chosen mode.
if __name__ == '__main__':
    main()
|
||||
108
model/test_frame_sync.py
Normal file
108
model/test_frame_sync.py
Normal file
@@ -0,0 +1,108 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Tests for the frame synchronization prototype (frame_sync.py).
|
||||
|
||||
Tests cover stream generation (length, offsets) and syndrome screening
|
||||
(correct offset identification, wrong offset rejection).
|
||||
|
||||
Run:
|
||||
python3 -m pytest model/test_frame_sync.py -v
|
||||
"""
|
||||
|
||||
import numpy as np
|
||||
import pytest
|
||||
|
||||
from ldpc_sim import N, build_full_h_matrix
|
||||
from frame_sync import generate_stream, syndrome_screen
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Fixtures
|
||||
# =============================================================================
|
||||
|
||||
@pytest.fixture(scope="module")
def H():
    """Full expanded H matrix, built once per module (the build is expensive)."""
    return build_full_h_matrix()
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# TestStreamGeneration
|
||||
# =============================================================================
|
||||
|
||||
class TestStreamGeneration:
    """Checks on the synthetic continuous-LLR stream generator."""

    def test_stream_length(self, H):
        """A fixed offset yields a stream of n_frames * N + offset samples."""
        np.random.seed(100)
        frames = 5
        prefix_len = 37
        stream, actual_offset, infos = generate_stream(
            H, n_frames=frames, lam_s=5.0, lam_b=0.1, offset=prefix_len
        )
        assert actual_offset == prefix_len
        assert len(stream) == frames * N + prefix_len
        assert len(infos) == frames

    def test_stream_zero_offset(self, H):
        """A zero offset is accepted and produces exactly n_frames * N samples."""
        np.random.seed(101)
        frames = 3
        stream, actual_offset, infos = generate_stream(
            H, n_frames=frames, lam_s=5.0, lam_b=0.1, offset=0
        )
        assert actual_offset == 0
        assert len(stream) == frames * N
        assert len(infos) == frames

    def test_stream_random_offset(self, H):
        """Passing offset=None draws a random offset in [0, N-1]."""
        np.random.seed(102)
        stream, actual_offset, _ = generate_stream(
            H, n_frames=4, lam_s=5.0, lam_b=0.1, offset=None
        )
        assert 0 <= actual_offset < N
        assert len(stream) == 4 * N + actual_offset
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# TestSyndromeScreen
|
||||
# =============================================================================
|
||||
|
||||
class TestSyndromeScreen:
    """Checks on syndrome-weight-based offset screening."""

    def test_correct_offset_low_syndrome(self, H):
        """At high SNR (lam_s=10) the lowest syndrome weight is at the true offset."""
        np.random.seed(200)
        stream, actual_offset, _ = generate_stream(
            H, n_frames=3, lam_s=10.0, lam_b=0.1, offset=42
        )
        scores = syndrome_screen(stream, n_offsets=N)
        # The winning offset should coincide with where the frames really start.
        winner = min(scores, key=scores.get)
        assert winner == actual_offset, (
            f"Best offset {winner} (sw={scores[winner]}) "
            f"!= true offset {actual_offset} (sw={scores[actual_offset]})"
        )

    def test_wrong_offsets_high_syndrome(self, H):
        """Misaligned offsets should average a syndrome weight above 80."""
        np.random.seed(201)
        stream, actual_offset, _ = generate_stream(
            H, n_frames=3, lam_s=10.0, lam_b=0.1, offset=10
        )
        scores = syndrome_screen(stream, n_offsets=N)
        # Everything except the true offset counts as a wrong offset.
        mismatched = [w for o, w in scores.items() if o != actual_offset]
        avg_wrong = np.mean(mismatched)
        assert avg_wrong > 80, (
            f"Average wrong-offset syndrome weight {avg_wrong:.1f} is not > 80"
        )
|
||||
Reference in New Issue
Block a user