Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 162 additions & 24 deletions pkg/frost/roast/attempt/evidence_recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,36 @@ import (
// of how aggressively a peer (or its network link) misbehaves.
const OverflowQuotaDefault uint = 8

// Default quotas for the reject and conflict evidence categories.
const (
	// RejectQuotaDefault is the default reject event quota. Matches
	// categoryQuota.Reject in RFC-21 Layer A. A reject event is
	// recorded every time a peer's payload fails the validation gate
	// (shouldAcceptNativeFROSTMessage returning false), whatever the
	// specific reason.
	RejectQuotaDefault uint = 8

	// ConflictQuotaDefault is the default per-sender conflict event
	// quota. Matches categoryQuota.Conflict in RFC-21 Layer A. A
	// conflict event is recorded when a peer retransmits a message
	// for a sender slot that already holds a byte-different
	// contribution (first-write-wins reject).
	ConflictQuotaDefault uint = 4
)

// EvidenceRecorder collects bounded, per-attempt evidence of receive-
// path anomalies that the ROAST coordinator's exclusion policy may
// later consume.
//
// Phase 2 introduced only the overflow channel; the interface has
// since been extended with separate methods for reject events and
// first-write-wins conflicts.
//
// The interface tracks three categories of evidence:
// - Overflow: payload arrived but the inbound channel was full.
// - Reject: payload arrived but failed validation
// (shouldAcceptNativeFROSTMessage returning false).
// - Conflict: a peer's later retransmission disagreed with its
// earlier contribution for the same slot (equivocation
// signal).
//
// Silence -- peers in the IncludedSet that produced no snapshot at
// all -- is derived implicitly by the NextAttempt policy from
// (ctx.IncludedSet - bundleSenders) and does not need a recorder
// method.
//
// Implementations must be safe for concurrent calls from multiple
// goroutines, since the receive-callback closure in pkg/frost/signing
Expand All @@ -35,57 +58,120 @@ type EvidenceRecorder interface {
// applies its own quota; callers do not need to suppress at the
// call site.
RecordOverflow(sender group.MemberIndex)
// RecordReject notes that a payload from the named sender failed
// the validation gate (typically shouldAcceptNativeFROSTMessage
// returning false). The reason string is preserved verbatim in
// the snapshot so the coordinator's exclusion policy can later
// route by reason if needed; the recorder applies its own
// per-sender quota regardless of reason.
RecordReject(sender group.MemberIndex, reason string)
// RecordConflict notes that a peer retransmitted a message for
// a sender slot that already holds a byte-different contribution
// (equivocation signal under the first-write-wins assembly
// policy).
RecordConflict(sender group.MemberIndex)
// Snapshot returns a copy of the recorded evidence so far. The
// returned value does not alias internal state; the recorder may
// continue receiving events after Snapshot is called.
Snapshot() Evidence
}

// RejectEntry aggregates the reject events recorded for one sender
// under one reason during an attempt. The reason captures *why* the
// validation gate rejected the payload; the coordinator's exclusion
// policy treats every distinct reason as equally blamable today, but
// the field is kept structured so future policy refinements can
// differentiate.
type RejectEntry struct {
// Reason is the reason string exactly as it was passed to
// RecordReject.
Reason string
// Count is the number of reject events recorded for Reason. The
// bounded recorder stops incrementing it once it reaches the
// reject quota.
Count uint
}

// Evidence is the per-attempt snapshot of receive-path anomalies
// captured by an EvidenceRecorder. It is the value the ROAST
// coordinator's NextAttempt policy consumes to derive the next
// attempt's ExcludedSet.
//
// Maps are nil-safe in callers: an absent key means the category
// did not fire for that sender, count zero.
type Evidence struct {
// Overflows maps each sender to the number of overflow events
// observed for that sender during the attempt, saturated at the
// recorder's overflow quota. A missing key means the sender did
// not overflow at all during the attempt.
Overflows map[group.MemberIndex]uint
// Rejects maps each sender to a per-reason set of reject entries.
// The outer map's key is the sender; the inner slice carries one
// entry per distinct reason, with Count saturated at the
// recorder's reject quota. The bounded recorder fills the slice
// by ranging over a map, so the order of entries is not stable;
// look entries up by Reason rather than by position.
Rejects map[group.MemberIndex][]RejectEntry
// Conflicts maps each sender to the number of first-write-wins
// conflict events observed during the attempt, saturated at the
// recorder's conflict quota.
Conflicts map[group.MemberIndex]uint
}

// NewBoundedRecorder returns an EvidenceRecorder that uses the default
// quota for each of the three evidence categories
// (OverflowQuotaDefault, RejectQuotaDefault, ConflictQuotaDefault).
// The recorder is safe for concurrent use.
func NewBoundedRecorder() EvidenceRecorder {
	return NewBoundedRecorderWithQuotas(
		OverflowQuotaDefault,
		RejectQuotaDefault,
		ConflictQuotaDefault,
	)
}

// NewBoundedRecorderWithQuota returns a recorder whose overflow quota
// is overflowQuota, while the reject and conflict quotas stay at
// RejectQuotaDefault and ConflictQuotaDefault. It is the original
// Phase-2 entry point, preserved so existing callers do not need to
// update.
func NewBoundedRecorderWithQuota(overflowQuota uint) EvidenceRecorder {
	const (
		rejectQuota   = RejectQuotaDefault
		conflictQuota = ConflictQuotaDefault
	)
	return NewBoundedRecorderWithQuotas(overflowQuota, rejectQuota, conflictQuota)
}

// NewBoundedRecorderWithQuotas builds a recorder whose overflow,
// reject and conflict quotas are set explicitly, with all three
// evidence maps allocated and empty. It is intended for tests;
// production callers should use NewBoundedRecorder so the per-attempt
// evidence size is uniform across the network.
func NewBoundedRecorderWithQuotas(
	overflowQuota, rejectQuota, conflictQuota uint,
) EvidenceRecorder {
	rec := new(boundedRecorder)

	rec.overflowQuota = overflowQuota
	rec.rejectQuota = rejectQuota
	rec.conflictQuota = conflictQuota

	// Allocate every map up front: the Record* methods write into
	// them directly.
	rec.overflows = make(map[group.MemberIndex]uint)
	rec.rejects = make(map[group.MemberIndex]map[string]uint)
	rec.conflicts = make(map[group.MemberIndex]uint)

	return rec
}

// NoOpRecorder returns a recorder that discards every event and
// reports an empty Evidence on Snapshot. It is the default at every
// receive-loop call site when the ROAST-retry registry is not
// populated, so the receive loops' observable behaviour stays
// identical to pre-Phase-2 until a real recorder is wired.
func NoOpRecorder() EvidenceRecorder {
	var discard noOpRecorder
	return discard
}

// boundedRecorder is the quota-enforcing EvidenceRecorder returned by
// the NewBoundedRecorder* constructors. Every counter it keeps stops
// growing once it reaches the quota of its category.
type boundedRecorder struct {
// mu is held by the Record* methods and by Snapshot while they
// touch the maps below.
mu sync.Mutex
// overflowQuota, rejectQuota and conflictQuota are the saturation
// limits for the counters of the respective category.
overflowQuota uint
rejectQuota uint
conflictQuota uint
// overflows[sender] = number of overflow events counted.
overflows map[group.MemberIndex]uint
// rejects[sender][reason] = count. The two-level map keeps each
// reason bucket bounded by rejectQuota independently so a peer
// cannot saturate one reason to mask another (RFC-21 Layer A:
// "a peer cannot spam overflow events to drown out reject
// evidence or vice-versa"; the same principle applies within
// reject reasons).
rejects map[group.MemberIndex]map[string]uint
// conflicts[sender] = number of conflict events counted.
conflicts map[group.MemberIndex]uint
}

func (r *boundedRecorder) RecordOverflow(sender group.MemberIndex) {
Expand All @@ -96,20 +182,72 @@ func (r *boundedRecorder) RecordOverflow(sender group.MemberIndex) {
}
}

// RecordReject counts one reject event for sender under reason. Each
// (sender, reason) pair has its own counter, saturated at rejectQuota;
// events past the quota are dropped.
//
// The per-sender bucket is created only when an event is actually
// counted, so a sender never appears in Snapshot with an empty entry
// list (for example when rejectQuota is zero). This matches
// RecordConflict, which never creates a key for an event it does not
// count.
func (r *boundedRecorder) RecordReject(
	sender group.MemberIndex,
	reason string,
) {
	r.mu.Lock()
	defer r.mu.Unlock()

	// Indexing a missing (nil) inner map yields zero, so the quota
	// check needs no prior allocation.
	if r.rejects[sender][reason] >= r.rejectQuota {
		return
	}

	byReason := r.rejects[sender]
	if byReason == nil {
		byReason = make(map[string]uint)
		r.rejects[sender] = byReason
	}
	byReason[reason]++
}

// RecordConflict counts one conflict event for sender. The counter
// saturates at conflictQuota; further events for that sender are
// dropped.
func (r *boundedRecorder) RecordConflict(sender group.MemberIndex) {
	r.mu.Lock()
	defer r.mu.Unlock()

	if seen := r.conflicts[sender]; seen < r.conflictQuota {
		r.conflicts[sender] = seen + 1
	}
}

// Snapshot returns a copy of everything recorded so far. The returned
// maps and slices are freshly allocated under the lock and share no
// storage with the recorder, so the recorder can keep receiving
// events afterwards without affecting the returned value.
//
// The reject entries of a sender are produced by ranging over a map,
// so their order within the slice is not stable between calls.
func (r *boundedRecorder) Snapshot() Evidence {
	r.mu.Lock()
	defer r.mu.Unlock()

	overflows := make(map[group.MemberIndex]uint, len(r.overflows))
	for sender, count := range r.overflows {
		overflows[sender] = count
	}

	rejects := make(
		map[group.MemberIndex][]RejectEntry,
		len(r.rejects),
	)
	for sender, reasons := range r.rejects {
		// Flatten the per-reason counters into one entry per reason.
		entries := make([]RejectEntry, 0, len(reasons))
		for reason, count := range reasons {
			entries = append(entries, RejectEntry{
				Reason: reason,
				Count:  count,
			})
		}
		rejects[sender] = entries
	}

	conflicts := make(map[group.MemberIndex]uint, len(r.conflicts))
	for sender, count := range r.conflicts {
		conflicts[sender] = count
	}

	return Evidence{
		Overflows: overflows,
		Rejects:   rejects,
		Conflicts: conflicts,
	}
}

type noOpRecorder struct{}

func (noOpRecorder) RecordOverflow(group.MemberIndex) {}
func (noOpRecorder) RecordOverflow(group.MemberIndex) {}
func (noOpRecorder) RecordReject(group.MemberIndex, string) {}
func (noOpRecorder) RecordConflict(group.MemberIndex) {}

func (noOpRecorder) Snapshot() Evidence {
return Evidence{Overflows: map[group.MemberIndex]uint{}}
return Evidence{
Overflows: map[group.MemberIndex]uint{},
Rejects: map[group.MemberIndex][]RejectEntry{},
Conflicts: map[group.MemberIndex]uint{},
}
}
114 changes: 114 additions & 0 deletions pkg/frost/roast/attempt/evidence_recorder_categories_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package attempt

import (
"testing"

"github.com/keep-network/keep-core/pkg/protocol/group"
)

// TestBoundedRecorder_RecordReject_AccumulatesByReason checks that
// reject events from one sender are counted separately per reason.
func TestBoundedRecorder_RecordReject_AccumulatesByReason(t *testing.T) {
	rec := NewBoundedRecorder()
	for _, reason := range []string{
		"validation_gate_rejected",
		"validation_gate_rejected",
		"some_other_reason",
	} {
		rec.RecordReject(1, reason)
	}

	entries := rec.Snapshot().Rejects[1]
	if len(entries) != 2 {
		t.Fatalf("expected 2 reject reasons, got %d", len(entries))
	}

	// Entry order is not fixed, so index the counts by reason.
	byReason := make(map[string]uint, len(entries))
	for _, entry := range entries {
		byReason[entry.Reason] = entry.Count
	}
	if got := byReason["validation_gate_rejected"]; got != 2 {
		t.Fatalf("validation_gate_rejected count: got %d want 2", got)
	}
	if got := byReason["some_other_reason"]; got != 1 {
		t.Fatalf("some_other_reason count: got %d want 1", got)
	}
}

// TestBoundedRecorder_RecordReject_PerReasonQuota checks that the
// counter for a single reason stops at the configured reject quota.
func TestBoundedRecorder_RecordReject_PerReasonQuota(t *testing.T) {
	rec := NewBoundedRecorderWithQuotas(8, 3, 4)
	for i := 0; i < 10; i++ {
		rec.RecordReject(1, "spam")
	}

	entries := rec.Snapshot().Rejects[1]
	// Check the length before indexing so a regression that records
	// nothing fails with a message instead of an index-out-of-range
	// panic.
	if len(entries) != 1 {
		t.Fatalf("expected 1 reject reason, got %d", len(entries))
	}
	if entries[0].Reason != "spam" {
		t.Fatalf("unexpected reject reason: got %q, want %q", entries[0].Reason, "spam")
	}
	if got := entries[0].Count; got != 3 {
		t.Fatalf("reject quota not enforced: got %d, want 3", got)
	}
}

// TestBoundedRecorder_RecordReject_PerReasonQuotasIndependent checks
// that saturating one reason leaves the counter of another reason
// untouched: a peer cannot saturate one reason to mask another.
func TestBoundedRecorder_RecordReject_PerReasonQuotasIndependent(t *testing.T) {
	rec := NewBoundedRecorderWithQuotas(8, 2, 4)

	// Drive reason-A well past the quota of 2, then record a single
	// reason-B event.
	for attempt := 0; attempt < 10; attempt++ {
		rec.RecordReject(1, "reason-A")
	}
	rec.RecordReject(1, "reason-B")

	byReason := map[string]uint{}
	for _, entry := range rec.Snapshot().Rejects[1] {
		byReason[entry.Reason] = entry.Count
	}

	if got := byReason["reason-A"]; got != 2 {
		t.Fatalf("reason-A saturated at: got %d want 2", got)
	}
	if got := byReason["reason-B"]; got != 1 {
		t.Fatalf("reason-B counted independently: got %d want 1", got)
	}
}

// TestBoundedRecorder_RecordConflict_AccumulatesAndSaturates records
// more conflicts for one sender than the quota allows and checks that
// the reported count stops at the quota.
func TestBoundedRecorder_RecordConflict_AccumulatesAndSaturates(t *testing.T) {
	rec := NewBoundedRecorderWithQuotas(8, 8, 2)

	// Four events against a conflict quota of two.
	for event := 0; event < 4; event++ {
		rec.RecordConflict(7)
	}

	got := rec.Snapshot().Conflicts[7]
	if got != 2 {
		t.Fatalf("conflict count saturated at quota; got %d want 2", got)
	}
}

// TestBoundedRecorder_AllCategoriesPresentInSnapshot records one event
// of each category, each for a different sender, and checks that all
// three show up in the snapshot.
func TestBoundedRecorder_AllCategoriesPresentInSnapshot(t *testing.T) {
	rec := NewBoundedRecorder()
	rec.RecordOverflow(1)
	rec.RecordReject(2, "validation_gate_rejected")
	rec.RecordConflict(3)

	snap := rec.Snapshot()
	switch {
	case snap.Overflows[1] == 0:
		t.Fatal("overflow not recorded")
	case len(snap.Rejects[2]) == 0:
		t.Fatal("reject not recorded")
	case snap.Conflicts[3] == 0:
		t.Fatal("conflict not recorded")
	}
}

// TestNoOpRecorder_AllCategoriesInert feeds events of every category
// into the no-op recorder and checks that its snapshot stays empty.
func TestNoOpRecorder_AllCategoriesInert(t *testing.T) {
	rec := NoOpRecorder()
	for i := 0; i < 100; i++ {
		sender := group.MemberIndex(i % 5)
		rec.RecordOverflow(sender)
		rec.RecordReject(sender, "spam")
		rec.RecordConflict(sender)
	}

	snap := rec.Snapshot()
	empty := len(snap.Overflows) == 0 &&
		len(snap.Rejects) == 0 &&
		len(snap.Conflicts) == 0
	if !empty {
		t.Fatalf("NoOp recorder must report empty snapshot; got %+v", snap)
	}
}

// TestRejectAndConflictQuotaConstants_MatchRFC pins the default reject
// and conflict quotas to the values this test attributes to RFC-21.
func TestRejectAndConflictQuotaConstants_MatchRFC(t *testing.T) {
	const (
		wantReject   uint = 8
		wantConflict uint = 4
	)
	if got := RejectQuotaDefault; got != wantReject {
		t.Fatalf("RFC-21 specifies reject quota = 8; constant is %d", got)
	}
	if got := ConflictQuotaDefault; got != wantConflict {
		t.Fatalf("RFC-21 specifies conflict quota = 4; constant is %d", got)
	}
}
Loading
Loading