Commit 1cdbc00

Merge pull request #50 from hyp3rd/feat/distributed-backend
Feat/distributed backend
2 parents 9243487 + af59f5c commit 1cdbc00

16 files changed

Lines changed: 1743 additions & 59 deletions
Lines changed: 16 additions & 0 deletions
@@ -0,0 +1,16 @@
+---
+applyTo: '**'
+---
+
+# Basic instructions
+
+Always run `golangci-lint` and staticcheck after completing a Golang task.
+Verify the quality of the code you provide, including repetitions, flaws, and ways to modernise the approach to ensure consistency. Adopt a development method and stick consistently to it.
+Parse the Makefile in the project, and you will find the commands you need to lint, polish, and test the code.
+
+Always document the solutions we find, and where applicable, use the ./docs folder for extensive documentation.
+
+## Toolset
+
+- [Makefile](../../Makefile).
+- Always run: `make lint`

README.md

Lines changed: 38 additions & 1 deletion
@@ -10,7 +10,7 @@ It is optimized for performance and flexibility:

 - Tunable expiration and eviction intervals (or fully proactive eviction when the eviction interval is set to `0`).
 - Debounced & coalesced expiration trigger channel to avoid thrashing.
-- Non-blocking manual `TriggerEviction()` signal.
+- Non-blocking manual `TriggerEviction(context.Context)` signal.
 - Serializer‑aware memory accounting (item size reflects the backend serialization format when available).
 - Multiple eviction algorithms with the ability to register custom ones.
 - Multiple stats collectors (default histogram) and middleware hooks.
@@ -61,6 +61,8 @@ Endpoints (subject to change):
 - GET /config – sanitized runtime config (now includes replication + virtual node settings when using DistMemory)
 - GET /dist/metrics – distributed backend forwarding / replication counters (DistMemory only)
 - GET /dist/owners?key=K – current ring owners (IDs) for key K (DistMemory only, debug)
+- GET /internal/merkle – Merkle tree snapshot (DistMemory experimental anti-entropy)
+- GET /internal/keys – Full key enumeration (debug / anti-entropy fallback; expensive)
 - GET /cluster/members – membership snapshot (id, address, state, incarnation, replication factor, virtual nodes)
 - GET /cluster/ring – ring vnode hashes (debug / diagnostics)
 - POST /evict – trigger eviction cycle
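
As an illustration of how a peer might consume the `/internal/keys` fallback listed above, here is a minimal client sketch. Only the path and the `{"keys": [...]}` response shape come from this commit; `fetchKeys`, `decodeKeys`, `newFakePeer`, and the client-side `limit` (loosely modelled on the idea behind `WithDistListKeysCap`) are hypothetical names, and the real backend may apply its cap differently.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
)

// keysResponse mirrors the {"keys": [...]} JSON body returned by /internal/keys.
type keysResponse struct {
	Keys []string `json:"keys"`
}

// decodeKeys parses the response body and truncates it to limit keys.
// A limit <= 0 means "no cap" (an assumption made for this sketch).
func decodeKeys(r io.Reader, limit int) ([]string, error) {
	var resp keysResponse
	if err := json.NewDecoder(r).Decode(&resp); err != nil {
		return nil, fmt.Errorf("decode keys: %w", err)
	}

	if limit > 0 && len(resp.Keys) > limit {
		resp.Keys = resp.Keys[:limit]
	}

	return resp.Keys, nil
}

// fetchKeys issues GET <baseURL>/internal/keys and returns at most limit keys.
func fetchKeys(client *http.Client, baseURL string, limit int) ([]string, error) {
	res, err := client.Get(baseURL + "/internal/keys")
	if err != nil {
		return nil, fmt.Errorf("fetch keys: %w", err)
	}
	defer res.Body.Close()

	if res.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("fetch keys: unexpected status %d", res.StatusCode)
	}

	return decodeKeys(res.Body, limit)
}

// newFakePeer starts a stand-in HTTP server that answers like the endpoint above.
func newFakePeer(keys []string) *httptest.Server {
	mux := http.NewServeMux()
	mux.HandleFunc("/internal/keys", func(w http.ResponseWriter, _ *http.Request) {
		_ = json.NewEncoder(w).Encode(keysResponse{Keys: keys})
	})

	return httptest.NewServer(mux)
}

func main() {
	peer := newFakePeer([]string{"a", "b", "c"})
	defer peer.Close()

	keys, err := fetchKeys(peer.Client(), peer.URL, 2)
	if err != nil {
		fmt.Println("error:", err)
		return
	}

	fmt.Println(keys) // prints [a b]
}
```

Because the endpoint enumerates every key, a caller would normally reach for it only when a Merkle comparison is not enough, and keep the cap small.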
@@ -181,8 +183,14 @@ if err != nil {
 | `WithManagementHTTP` | Start optional management HTTP server. |
 | `WithDistReplication` | (DistMemory) Set replication factor (owners per key). |
 | `WithDistVirtualNodes` | (DistMemory) Virtual nodes per physical node for consistent hashing. |
+| `WithDistMerkleChunkSize` | (DistMemory) Keys per Merkle leaf chunk (power-of-two recommended). |
+| `WithDistMerkleAutoSync` | (DistMemory) Interval for background Merkle sync (<=0 disables). |
+| `WithDistMerkleAutoSyncPeers` | (DistMemory) Limit peers synced per auto-sync tick (0=all). |
+| `WithDistListKeysCap` | (DistMemory) Cap number of keys fetched via fallback enumeration. |
 | `WithDistNode` | (DistMemory) Explicit node identity (id/address). |
 | `WithDistSeeds` | (DistMemory) Static seed addresses to pre-populate membership. |
+| `WithDistTombstoneTTL` | (DistMemory) Retain delete tombstones for this duration before compaction (<=0 = infinite). |
+| `WithDistTombstoneSweep` | (DistMemory) Interval to run tombstone compaction (<=0 disables). |

 *ARC is experimental (not registered by default).

@@ -204,10 +212,39 @@ Current capabilities:
 - Ownership enforcement (non‑owners forward to primary).
 - Replica fan‑out on writes (best‑effort) & replica removals.
 - Read‑repair when a local owner misses but another replica has the key.
+- Basic delete semantics with tombstones: deletions propagate as versioned tombstones preventing
+  resurrection during anti-entropy (tombstone retention is in‑memory, no persistence yet).
+- Tombstone versioning uses a per-process monotonic counter when no prior item version exists (avoids time-based unsigned casts).
+- Remote pull sync will infer a tombstone when a key present locally is absent remotely and no local tomb exists (anti-resurrection guard).
+- DebugInject intentionally clears any existing tombstone for that key (test helper / simulating authoritative resurrection with higher version).
+- Tombstone TTL + periodic compaction: configure with `WithDistTombstoneTTL` / `WithDistTombstoneSweep`; metrics track active & purged counts.
 - Metrics exposed via management endpoints (`/dist/metrics`, `/dist/owners`, `/cluster/members`, `/cluster/ring`).
+  - Includes Merkle phase timings (fetch/build/diff nanos) and counters for keys pulled during anti-entropy.
+  - Tombstone metrics: `TombstonesActive`, `TombstonesPurged`.
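
The anti-resurrection idea behind versioned tombstones can be sketched in a few lines. This is a simplified model, not the DistMemory implementation: `entry`, `store`, `apply`, and `get` are hypothetical names, and the rule "an incoming record wins only with a strictly higher version" is an assumption chosen to keep the example small.

```go
package main

import "fmt"

// entry is a hypothetical versioned record; Deleted marks it as a tombstone.
type entry struct {
	Value   string
	Version uint64
	Deleted bool
}

// store is a hypothetical single-node key space (not the DistMemory type).
type store map[string]entry

// apply merges an incoming record and reports whether it was accepted.
// A record is accepted only if its version is strictly higher than the one
// already held, so a stale value cannot overwrite a newer tombstone.
func (s store) apply(key string, in entry) bool {
	cur, ok := s[key]
	if ok && in.Version <= cur.Version {
		return false
	}

	s[key] = in

	return true
}

// get hides tombstoned keys from readers.
func (s store) get(key string) (string, bool) {
	e, ok := s[key]
	if !ok || e.Deleted {
		return "", false
	}

	return e.Value, true
}

func main() {
	s := store{}
	s.apply("k", entry{Value: "v1", Version: 1})
	s.apply("k", entry{Version: 2, Deleted: true}) // the delete is stored as a tombstone

	// A lagging replica replays the old value during anti-entropy.
	accepted := s.apply("k", entry{Value: "v1", Version: 1})
	_, found := s.get("k")

	fmt.Println(accepted, found) // prints false false
}
```

Retaining the tombstone is what makes the stale replay detectable; once it is compacted away (after the TTL), the same replay would look like a fresh write, which is why the retention window matters.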

 Planned next steps (roadmap excerpts): network transport abstraction, quorum reads/writes, versioning (vector clocks or lamport), failure detection / node states, rebalancing & anti‑entropy sync.

+### Roadmap / PRD Progress Snapshot
+
+| Area | Status |
+|------|--------|
+| Core in-process sharding | Complete (static ring) |
+| Replication fan-out | Implemented (best-effort) |
+| Read-repair | Implemented |
+| Merkle anti-entropy | Implemented (pull-based) |
+| Merkle performance metrics | Implemented (fetch/build/diff nanos) |
+| Remote-only key enumeration fallback | Implemented with optional cap (`WithDistListKeysCap`) |
+| Delete semantics (tombstones) | Implemented (no compaction yet) |
+| Tombstone compaction / TTL | Planned |
+| Quorum read/write consistency | Partially scaffolded (consistency levels enum) |
+| Failure detection / heartbeat | Experimental heartbeat present |
+| Membership changes / dynamic rebalancing | Not yet |
+| Network transport (HTTP partial) | Basic HTTP management + fetch merkle/keys; full RPC TBD |
+| Tracing spans (distributed ops) | Planned |
+| Metrics exposure | Basic + Merkle phase metrics |
+| Persistence | Not in scope yet |
+| Benchmarks & tests | Extensive unit + benchmark coverage |
+
 Example minimal setup:

 ```go

cspell.config.yaml

Lines changed: 4 additions & 0 deletions
@@ -33,6 +33,7 @@ words:
 - Fprintln
 - freqs
 - funlen
+- gerr
 - gitversion
 - GITVERSION
 - goccy
@@ -49,18 +50,21 @@ words:
 - ints
 - ireturn
 - Itemm
+- keyf
 - lamport
 - LFUDA
 - localmodule
 - logrus
 - memprofile
+- Merkle
 - Mgmt
 - msgpack
 - mvdan
 - nestif
 - Newf
 - nolint
 - nonamedreturns
+- nosec
 - NOVENDOR
 - paralleltest
 - Pipeliner

pkg/backend/dist_http_server.go

Lines changed: 30 additions & 0 deletions
@@ -40,6 +40,7 @@ func (s *distHTTPServer) start(ctx context.Context, dm *DistMemory) error { //no
 	s.registerGet(ctx, dm)
 	s.registerRemove(ctx, dm)
 	s.registerHealth()
+	s.registerMerkle(ctx, dm)

 	return s.listen(ctx)
 }
@@ -112,6 +113,35 @@ func (s *distHTTPServer) registerHealth() { //nolint:ireturn
 	s.app.Get("/health", func(fctx fiber.Ctx) error { return fctx.SendString("ok") })
 }

+func (s *distHTTPServer) registerMerkle(_ context.Context, dm *DistMemory) { //nolint:ireturn
+	s.app.Get("/internal/merkle", func(fctx fiber.Ctx) error {
+		tree := dm.BuildMerkleTree()
+
+		return fctx.JSON(fiber.Map{
+			"root": tree.Root,
+			"leaf_hashes": tree.LeafHashes,
+			"chunk_size": tree.ChunkSize,
+		})
+	})
+
+	// naive keys listing for anti-entropy (testing only). Not efficient for large datasets.
+	s.app.Get("/internal/keys", func(fctx fiber.Ctx) error {
+		var keys []string
+		for _, shard := range dm.shards {
+			if shard == nil {
+				continue
+			}
+
+			ch := shard.items.IterBuffered()
+			for t := range ch {
+				keys = append(keys, t.Key)
+			}
+		}
+
+		return fctx.JSON(fiber.Map{"keys": keys})
+	})
+}
+
 func (s *distHTTPServer) listen(ctx context.Context) error { //nolint:ireturn
 	lc := net.ListenConfig{}

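
The `/internal/merkle` handler above returns a root, a list of leaf hashes, and a chunk size. To show how those three pieces can relate, here is a self-contained sketch of a chunked Merkle comparison. It does not reproduce `BuildMerkleTree`: the hash function (SHA-256), the key/value encoding, and the helper names `leafHashes`, `merkleRoot`, and `diffChunks` are all assumptions made for illustration.

```go
package main

import (
	"crypto/sha256"
	"fmt"
	"sort"
)

// leafHashes sorts the keys, groups them into chunks of chunkSize,
// and hashes each chunk's key/value pairs into one leaf.
func leafHashes(items map[string]string, chunkSize int) [][32]byte {
	if chunkSize <= 0 {
		chunkSize = 1
	}

	keys := make([]string, 0, len(items))
	for k := range items {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	var leaves [][32]byte
	for start := 0; start < len(keys); start += chunkSize {
		end := start + chunkSize
		if end > len(keys) {
			end = len(keys)
		}

		h := sha256.New()
		for _, k := range keys[start:end] {
			// A zero byte separates fields so "ab"+"c" and "a"+"bc" hash differently.
			h.Write([]byte(k))
			h.Write([]byte{0})
			h.Write([]byte(items[k]))
			h.Write([]byte{0})
		}

		var sum [32]byte
		copy(sum[:], h.Sum(nil))
		leaves = append(leaves, sum)
	}

	return leaves
}

// merkleRoot folds the leaves pairwise until a single hash remains.
// An unpaired node is promoted to the next level unchanged.
func merkleRoot(leaves [][32]byte) [32]byte {
	if len(leaves) == 0 {
		return sha256.Sum256(nil)
	}

	level := leaves
	for len(level) > 1 {
		next := make([][32]byte, 0, (len(level)+1)/2)
		for i := 0; i < len(level); i += 2 {
			if i+1 == len(level) {
				next = append(next, level[i])
				continue
			}

			buf := make([]byte, 0, 64)
			buf = append(buf, level[i][:]...)
			buf = append(buf, level[i+1][:]...)
			next = append(next, sha256.Sum256(buf))
		}
		level = next
	}

	return level[0]
}

// diffChunks returns the indexes of leaf chunks that differ between two nodes.
func diffChunks(a, b [][32]byte) []int {
	n := len(a)
	if len(b) > n {
		n = len(b)
	}

	var out []int
	for i := 0; i < n; i++ {
		if i >= len(a) || i >= len(b) || a[i] != b[i] {
			out = append(out, i)
		}
	}

	return out
}

func main() {
	local := map[string]string{"a": "1", "b": "2", "c": "3", "d": "4"}
	remote := map[string]string{"a": "1", "b": "2", "c": "X", "d": "4"}

	l, r := leafHashes(local, 2), leafHashes(remote, 2)

	fmt.Println(merkleRoot(l) == merkleRoot(r)) // prints false
	fmt.Println(diffChunks(l, r))               // prints [1]
}
```

Comparing roots first keeps the common case (no divergence) cheap; only when they differ does a node need to compare leaves and pull the keys in the mismatching chunks. In this simple positional scheme, adding or removing a key shifts later chunk boundaries, so more chunks than strictly necessary may be reported as different.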