Skip to content

Commit 2230e38

Browse files
DHT: Store only on ACTIVE supernodes
1 parent 8c2666d commit 2230e38

3 files changed

Lines changed: 196 additions & 42 deletions

File tree

p2p/kademlia/bootstrap.go

Lines changed: 51 additions & 22 deletions
Original file line number | Diff line number | Diff line change
@@ -16,8 +16,10 @@ import (
1616
)
1717

1818
const (
19-
bootstrapRefreshInterval = 10 * time.Minute
20-
defaultSuperNodeP2PPort int = 4445
19+
bootstrapRefreshInterval = 10 * time.Minute
20+
defaultSuperNodeP2PPort int = 4445
21+
superNodeStateActive int32 = 1
22+
superNodeStatePostponed int32 = 5
2123
)
2224

2325
// seed a couple of obviously bad addrs (unless in integration tests)
@@ -108,15 +110,20 @@ func (s *DHT) setBootstrapNodesFromConfigVar(ctx context.Context, bootstrapNodes
108110
return nil
109111
}
110112

111-
// loadBootstrapCandidatesFromChain returns active supernodes (by latest state)
112-
// mapped by "ip:port". No pings here.
113-
func (s *DHT) loadBootstrapCandidatesFromChain(ctx context.Context, selfAddress string) (map[string]*Node, map[[32]byte]struct{}, error) {
113+
// loadBootstrapCandidatesFromChain returns routing candidates (by latest state) mapped by "ip:port",
114+
// plus two allowlists:
115+
// - routingIDs: Active + Postponed
116+
// - storeIDs: Active only
117+
//
118+
// No pings here.
119+
func (s *DHT) loadBootstrapCandidatesFromChain(ctx context.Context, selfAddress string) (map[string]*Node, map[[32]byte]struct{}, map[[32]byte]struct{}, error) {
114120
resp, err := s.options.LumeraClient.SuperNode().ListSuperNodes(ctx)
115121
if err != nil {
116-
return nil, nil, fmt.Errorf("failed to list supernodes: %w", err)
122+
return nil, nil, nil, fmt.Errorf("failed to list supernodes: %w", err)
117123
}
118124

119-
activeIDs := make(map[[32]byte]struct{}, len(resp.Supernodes))
125+
routingIDs := make(map[[32]byte]struct{}, len(resp.Supernodes))
126+
storeIDs := make(map[[32]byte]struct{}, len(resp.Supernodes))
120127
mapNodes := make(map[string]*Node, len(resp.Supernodes))
121128
for _, sn := range resp.Supernodes {
122129
if len(sn.States) == 0 {
@@ -130,7 +137,9 @@ func (s *DHT) loadBootstrapCandidatesFromChain(ctx context.Context, selfAddress
130137
latestState = int32(st.State)
131138
}
132139
}
133-
if latestState != 1 { // SuperNodeStateActive = 1
140+
// Routing/read eligibility includes Active + Postponed.
141+
// Store/write eligibility remains Active-only.
142+
if latestState != superNodeStateActive && latestState != superNodeStatePostponed {
134143
continue
135144
}
136145

@@ -148,7 +157,10 @@ func (s *DHT) loadBootstrapCandidatesFromChain(ctx context.Context, selfAddress
148157
} else if len(h) == 32 {
149158
var key [32]byte
150159
copy(key[:], h)
151-
activeIDs[key] = struct{}{}
160+
routingIDs[key] = struct{}{}
161+
if latestState == superNodeStateActive {
162+
storeIDs[key] = struct{}{}
163+
}
152164
}
153165

154166
// latest IP by height
@@ -190,7 +202,7 @@ func (s *DHT) loadBootstrapCandidatesFromChain(ctx context.Context, selfAddress
190202
node.ID = []byte(id)
191203
mapNodes[full] = node
192204
}
193-
return mapNodes, activeIDs, nil
205+
return mapNodes, routingIDs, storeIDs, nil
194206
}
195207

196208
// upsertBootstrapNode inserts/updates replication_info for the discovered node (Active=false).
@@ -245,6 +257,24 @@ func (s *DHT) SyncBootstrapOnce(ctx context.Context, bootstrapNodes string) erro
245257
if err := s.setBootstrapNodesFromConfigVar(ctx, bootstrapNodes); err != nil {
246258
return err
247259
}
260+
allow := make(map[[32]byte]struct{}, len(s.options.BootstrapNodes))
261+
for _, n := range s.options.BootstrapNodes {
262+
if n == nil || len(n.ID) == 0 {
263+
continue
264+
}
265+
h, err := utils.Blake3Hash(n.ID)
266+
if err != nil || len(h) != 32 {
267+
continue
268+
}
269+
var key [32]byte
270+
copy(key[:], h)
271+
allow[key] = struct{}{}
272+
}
273+
// Config bootstrap has no chain states; treat provided peers as both routing/store-eligible.
274+
s.setRoutingAllowlist(ctx, allow)
275+
s.setStoreAllowlist(ctx, allow)
276+
s.pruneIneligibleRoutingPeers(ctx)
277+
248278
for _, n := range s.options.BootstrapNodes {
249279
if err := s.upsertBootstrapNode(ctx, n); err != nil {
250280
logtrace.Warn(ctx, "bootstrap upsert failed", logtrace.Fields{
@@ -265,14 +295,16 @@ func (s *DHT) SyncBootstrapOnce(ctx context.Context, bootstrapNodes string) erro
265295
}
266296
selfAddress := fmt.Sprintf("%s:%d", parseSupernodeAddress(supernodeAddr), s.options.Port)
267297

268-
cands, activeIDs, err := s.loadBootstrapCandidatesFromChain(ctx, selfAddress)
298+
cands, routingIDs, storeIDs, err := s.loadBootstrapCandidatesFromChain(ctx, selfAddress)
269299
if err != nil {
270300
return err
271301
}
272302

273-
// Update eligibility gate from chain Active state and prune any peers that slipped in via
303+
// Update routing/read gate from chain state and prune any peers that slipped in via
274304
// inbound traffic before the last bootstrap refresh.
275-
s.setRoutingAllowlist(ctx, activeIDs)
305+
s.setRoutingAllowlist(ctx, routingIDs)
306+
// Write/replication targets are Active-only.
307+
s.setStoreAllowlist(ctx, storeIDs)
276308
s.pruneIneligibleRoutingPeers(ctx)
277309

278310
// Upsert candidates to replication_info
@@ -303,13 +335,6 @@ func (s *DHT) SyncBootstrapOnce(ctx context.Context, bootstrapNodes string) erro
303335
// This keeps replication_info and routing table current as the validator set changes.
304336
func (s *DHT) StartBootstrapRefresher(ctx context.Context, bootstrapNodes string) {
305337
go func() {
306-
// Initial sync
307-
if err := s.SyncBootstrapOnce(ctx, bootstrapNodes); err != nil {
308-
logtrace.Warn(ctx, "initial bootstrap sync failed", logtrace.Fields{
309-
logtrace.FieldModule: "p2p",
310-
logtrace.FieldError: err.Error(),
311-
})
312-
}
313338
t := time.NewTicker(bootstrapRefreshInterval)
314339
defer t.Stop()
315340

@@ -331,11 +356,15 @@ func (s *DHT) StartBootstrapRefresher(ctx context.Context, bootstrapNodes string
331356

332357
// ConfigureBootstrapNodes wires to the new sync/refresher (no pings here).
333358
func (s *DHT) ConfigureBootstrapNodes(ctx context.Context, bootstrapNodes string) error {
334-
// One-time sync; start refresher in the background
359+
// One-time sync attempt; keep service running if it fails and rely on refresher retries.
335360
if err := s.SyncBootstrapOnce(ctx, bootstrapNodes); err != nil {
336-
return err
361+
logtrace.Warn(ctx, "initial bootstrap sync failed; continuing with periodic refresher", logtrace.Fields{
362+
logtrace.FieldModule: "p2p",
363+
logtrace.FieldError: err.Error(),
364+
})
337365
}
338366

367+
// Always start periodic retries so transient chain/API outages can recover.
339368
s.StartBootstrapRefresher(ctx, bootstrapNodes)
340369

341370
return nil

p2p/kademlia/dht.go

Lines changed: 127 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -69,14 +69,23 @@ type DHT struct {
6969
metrics DHTMetrics
7070

7171
// routingAllowlist is a fast in-memory gate of which peers are eligible to
72-
// participate in the routing table (based on chain state: Active only).
72+
// participate in routing/read lookup paths (based on chain state).
73+
// Current policy: Active + Postponed are routing-eligible.
7374
//
7475
// Hot paths do only an atomic check + map lookup; updates happen on the
7576
// bootstrap refresh cadence.
7677
routingAllowMu sync.RWMutex
7778
routingAllow map[[32]byte]struct{} // blake3(peerID) -> exists
7879
routingAllowReady atomic.Bool
7980
routingAllowCount atomic.Int64
81+
82+
// storeAllowlist is a fast in-memory gate of which peers are eligible for
83+
// write/replication targets.
84+
// Current policy: Active only.
85+
storeAllowMu sync.RWMutex
86+
storeAllow map[[32]byte]struct{} // blake3(peerID) -> exists
87+
storeAllowReady atomic.Bool
88+
storeAllowCount atomic.Int64
8089
}
8190

8291
// bootstrapIgnoreList seeds the in-memory ignore list with nodes that are
@@ -144,11 +153,11 @@ func (s *DHT) setRoutingAllowlist(ctx context.Context, allow map[[32]byte]struct
144153
// Avoid accidentally locking ourselves out due to transient chain issues.
145154
if len(allow) == 0 {
146155
if !s.routingAllowReady.Load() {
147-
logtrace.Debug(ctx, "routing allowlist from chain is empty; leaving gating disabled (bootstrap)", logtrace.Fields{
156+
logtrace.Debug(ctx, "routing allowlist from chain is empty; leaving routing gating disabled (bootstrap)", logtrace.Fields{
148157
logtrace.FieldModule: "p2p",
149158
})
150159
} else {
151-
logtrace.Warn(ctx, "routing allowlist update skipped: chain returned zero active supernodes; retaining previous allowlist", logtrace.Fields{
160+
logtrace.Warn(ctx, "routing allowlist update skipped: chain returned zero routing-eligible supernodes; retaining previous allowlist", logtrace.Fields{
152161
logtrace.FieldModule: "p2p",
153162
})
154163
}
@@ -164,7 +173,29 @@ func (s *DHT) setRoutingAllowlist(ctx context.Context, allow map[[32]byte]struct
164173

165174
logtrace.Debug(ctx, "routing allowlist updated", logtrace.Fields{
166175
logtrace.FieldModule: "p2p",
167-
"active_peers": len(allow),
176+
"routing_peers": len(allow),
177+
})
178+
}
179+
180+
func (s *DHT) setStoreAllowlist(ctx context.Context, allow map[[32]byte]struct{}) {
181+
if s == nil {
182+
return
183+
}
184+
// Integration tests may use synthetic bootstrap sets; do not enforce chain-state gating.
185+
if integrationTestEnabled() {
186+
return
187+
}
188+
189+
s.storeAllowMu.Lock()
190+
s.storeAllow = allow
191+
s.storeAllowMu.Unlock()
192+
193+
s.storeAllowCount.Store(int64(len(allow)))
194+
s.storeAllowReady.Store(true)
195+
196+
logtrace.Debug(ctx, "store allowlist updated", logtrace.Fields{
197+
logtrace.FieldModule: "p2p",
198+
"store_peers": len(allow),
168199
})
169200
}
170201

@@ -176,9 +207,9 @@ func (s *DHT) eligibleForRouting(n *Node) bool {
176207
if integrationTestEnabled() {
177208
return true
178209
}
179-
// If allowlist isn't ready (or was never populated), do not gate to avoid blocking bootstrap.
210+
// Strict gating: only explicitly allowlisted peers can participate in read/routing.
180211
if !s.routingAllowReady.Load() || s.routingAllowCount.Load() == 0 {
181-
return true
212+
return false
182213
}
183214
if n == nil || len(n.ID) == 0 {
184215
return false
@@ -197,14 +228,51 @@ func (s *DHT) eligibleForRouting(n *Node) bool {
197228
return ok
198229
}
199230

231+
func (s *DHT) eligibleForStore(n *Node) bool {
232+
if s == nil {
233+
return false
234+
}
235+
// In integration tests allow everything; chain state gating is not stable/available there.
236+
if integrationTestEnabled() {
237+
return true
238+
}
239+
// If the store allowlist isn't ready yet, avoid blocking writes during bootstrap.
240+
if !s.storeAllowReady.Load() {
241+
return true
242+
}
243+
// Once initialized, an empty active set means no write-eligible peers.
244+
if s.storeAllowCount.Load() == 0 {
245+
return false
246+
}
247+
if n == nil || len(n.ID) == 0 {
248+
return false
249+
}
250+
251+
n.SetHashedID()
252+
if len(n.HashedID) != 32 {
253+
return false
254+
}
255+
var key [32]byte
256+
copy(key[:], n.HashedID)
257+
258+
s.storeAllowMu.RLock()
259+
_, ok := s.storeAllow[key]
260+
s.storeAllowMu.RUnlock()
261+
return ok
262+
}
263+
200264
func (s *DHT) filterEligibleNodes(nodes []*Node) []*Node {
201265
if s == nil || len(nodes) == 0 {
202266
return nodes
203267
}
204-
// Fast path: not enforcing (integration tests / not ready / empty list)
205-
if integrationTestEnabled() || !s.routingAllowReady.Load() || s.routingAllowCount.Load() == 0 {
268+
// Fast path for integration tests only.
269+
if integrationTestEnabled() {
206270
return nodes
207271
}
272+
// Strict gating: without a routing allowlist there are no eligible routing peers.
273+
if !s.routingAllowReady.Load() || s.routingAllowCount.Load() == 0 {
274+
return nil
275+
}
208276

209277
out := nodes[:0]
210278
for _, n := range nodes {
@@ -2105,6 +2173,9 @@ func (s *DHT) storeToAlphaNodes(ctx context.Context, nl *NodeList, data []byte,
21052173
launched := 0
21062174
for i := 0; i < Alpha && i < nl.Len(); i++ {
21072175
n := nl.Nodes[i]
2176+
if !s.eligibleForStore(n) {
2177+
continue
2178+
}
21082179
if s.ignorelist.Banned(n) {
21092180
continue
21102181
}
@@ -2146,6 +2217,9 @@ func (s *DHT) storeToAlphaNodes(ctx context.Context, nl *NodeList, data []byte,
21462217
finalStoreCount := atomic.LoadInt32(&storeCount)
21472218
for i := Alpha; i < nl.Len() && finalStoreCount < int32(Alpha); i++ {
21482219
n := nl.Nodes[i]
2220+
if !s.eligibleForStore(n) {
2221+
continue
2222+
}
21492223
if s.ignorelist.Banned(n) {
21502224
logtrace.Debug(ctx, "Ignore banned node during sequential store", logtrace.Fields{
21512225
logtrace.FieldModule: "p2p",
@@ -2282,11 +2356,17 @@ func (s *DHT) IterateBatchStore(ctx context.Context, values [][]byte, typ int, i
22822356
globalClosestContacts := make(map[string]*NodeList)
22832357
knownNodes := make(map[string]*Node)
22842358
hashes := make([][]byte, len(values))
2359+
routingNodeCount := len(s.ht.nodes())
2360+
candidateLimit := routingNodeCount
2361+
if candidateLimit < Alpha {
2362+
candidateLimit = Alpha
2363+
}
22852364
ignoreList := s.ignorelist.ToNodeList()
22862365
ignoredSet := hashedIDSetFromNodes(ignoreList)
2366+
keysWithoutCandidates := 0
22872367

22882368
{
2289-
f := logtrace.Fields{logtrace.FieldModule: "dht", "task_id": id, "keys": len(values), "len_nodes": len(s.ht.nodes()), logtrace.FieldRole: "client"}
2369+
f := logtrace.Fields{logtrace.FieldModule: "dht", "task_id": id, "keys": len(values), "len_nodes": routingNodeCount, logtrace.FieldRole: "client"}
22902370
if o := logtrace.OriginFromContext(ctx); o != "" {
22912371
f[logtrace.FieldOrigin] = o
22922372
}
@@ -2295,11 +2375,39 @@ func (s *DHT) IterateBatchStore(ctx context.Context, values [][]byte, typ int, i
22952375
for i := 0; i < len(values); i++ {
22962376
target, _ := utils.Blake3Hash(values[i])
22972377
hashes[i] = target
2298-
top6 := s.ht.closestContactsWithIncludingNodeWithIgnoredSet(Alpha, target, ignoredSet, nil)
2378+
candidates := s.ht.closestContactsWithIncludingNodeWithIgnoredSet(candidateLimit, target, ignoredSet, nil)
22992379

2300-
globalClosestContacts[base58.Encode(target)] = top6
2301-
// log.WithContext(ctx).WithField("top 6", top6).Info("iterate batch store begin")
2302-
s.addKnownNodes(ctx, top6.Nodes, knownNodes)
2380+
writeEligible := make([]*Node, 0, Alpha)
2381+
for _, n := range candidates.Nodes {
2382+
if s.eligibleForStore(n) {
2383+
writeEligible = append(writeEligible, n)
2384+
if len(writeEligible) >= Alpha {
2385+
break
2386+
}
2387+
}
2388+
}
2389+
if len(writeEligible) == 0 {
2390+
keysWithoutCandidates++
2391+
}
2392+
globalClosestContacts[base58.Encode(target)] = &NodeList{Nodes: writeEligible}
2393+
// log.WithContext(ctx).WithField("top 6", candidates).Info("iterate batch store begin")
2394+
s.addKnownNodes(ctx, writeEligible, knownNodes)
2395+
}
2396+
2397+
if keysWithoutCandidates > 0 {
2398+
logtrace.Error(ctx, "dht: batch store skipped (keys without eligible store nodes)", logtrace.Fields{
2399+
logtrace.FieldModule: "dht",
2400+
"task_id": id,
2401+
"keys": len(values),
2402+
"keys_without_nodes": keysWithoutCandidates,
2403+
"len_nodes": routingNodeCount,
2404+
"banned_nodes": len(ignoreList),
2405+
"routing_allow_ready": s.routingAllowReady.Load(),
2406+
"routing_allow_count": s.routingAllowCount.Load(),
2407+
"store_allow_ready": s.storeAllowReady.Load(),
2408+
"store_allow_count": s.storeAllowCount.Load(),
2409+
})
2410+
return fmt.Errorf("no eligible store peers for %d/%d keys", keysWithoutCandidates, len(values))
23032411
}
23042412

23052413
storageMap := make(map[string][]int) // This will store the index of the data in the values array that needs to be stored to the node
@@ -2325,10 +2433,12 @@ func (s *DHT) IterateBatchStore(ctx context.Context, values [][]byte, typ int, i
23252433
logtrace.FieldModule: "dht",
23262434
"task_id": id,
23272435
"keys": len(values),
2328-
"len_nodes": len(s.ht.nodes()),
2436+
"len_nodes": routingNodeCount,
23292437
"banned_nodes": len(ignoreList),
23302438
"routing_allow_ready": s.routingAllowReady.Load(),
23312439
"routing_allow_count": s.routingAllowCount.Load(),
2440+
"store_allow_ready": s.storeAllowReady.Load(),
2441+
"store_allow_count": s.storeAllowCount.Load(),
23322442
})
23332443
return fmt.Errorf("no candidate nodes for batch store")
23342444
}
@@ -2414,6 +2524,9 @@ func (s *DHT) batchStoreNetwork(ctx context.Context, values [][]byte, nodes map[
24142524

24152525
for key, node := range nodes {
24162526
logtrace.Debug(ctx, "Preparing batch store to node", logtrace.Fields{logtrace.FieldModule: "dht", "node": node.String()})
2527+
if !s.eligibleForStore(node) {
2528+
continue
2529+
}
24172530
if s.ignorelist.Banned(node) {
24182531
logtrace.Debug(ctx, "Ignoring banned node in batch store network call", logtrace.Fields{
24192532
logtrace.FieldModule: "dht",

0 commit comments

Comments (0)