Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
c583513
chore: ignore AI tool artifacts
gengdy1545 May 12, 2026
db0554e
fix: metadata mutation return values and barrier checks
gengdy1545 May 12, 2026
206c6fc
fix: make sqlite main index flushes retryable
gengdy1545 May 12, 2026
9931c83
fix: RowLocation fileId race in PixelsWriteBuffer
gengdy1545 May 13, 2026
179cdd5
fix: publish ingest files after index flush
gengdy1545 May 13, 2026
3267821
fix: restrict metadata getFiles to regular files
gengdy1545 May 13, 2026
8ed122b
fix: start Retina background GC after initialization
gengdy1545 May 13, 2026
9515a36
feat(metadata)!: track cleanup deadlines for retired files
gengdy1545 May 13, 2026
3ebfc67
feat(metadata)!: track cleanup deadlines for retired files
gengdy1545 May 14, 2026
36d957c
fix: retire old files during atomic swap
gengdy1545 May 14, 2026
faff52d
feat: order ingest file publish and centralize through write buffer
gengdy1545 May 22, 2026
e3228b4
feat: optimize Retina visibility replay delete modes
gengdy1545 May 23, 2026
08189d7
fix: propagate replay mode in Retina deletes
gengdy1545 May 23, 2026
e8f05a9
fix: stage Retina primary index updates
gengdy1545 May 24, 2026
c20b625
feat(retina)!: introduce recovery checkpoint subsystem
gengdy1545 May 25, 2026
8c86396
fix: storage gc index updates via localIndexService
gengdy1545 May 27, 2026
f3c571d
feat: use retina heartbeat for recovery readiness gate
gengdy1545 May 27, 2026
747b9d1
feat: add MainIndex range deletion API
gengdy1545 Jun 3, 2026
b15eab8
storage: add append stream API
gengdy1545 Jun 8, 2026
0698998
feat(retina): add recovery lifecycle state machine and durable storag…
gengdy1545 Jun 8, 2026
b38de0a
fix(retina): use Java 8-compatible Set construction in StorageGcWal test
gengdy1545 Jun 8, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,14 @@ resources/*.xml
*.o
.vscode
cpp/pixels-retina/third_party/

# AI tools
.codex
.claude/
.cursor/
.continue/
.aider*
.ai/
.notes/
CLAUDE.local.md
AGENTS.md.local
3 changes: 2 additions & 1 deletion cpp/pixels-retina/include/RGVisibility.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ class RGVisibility : public pixels::RetinaBase<RGVisibility<CAPACITY>> {
const std::vector<uint64_t>* initialBitmap = nullptr);
~RGVisibility() override;

void deleteRGRecord(uint32_t rowId, uint64_t timestamp);
void deleteRGRecord(uint32_t rowId, uint64_t timestamp,
ReplayMode replayMode = ReplayMode::NORMAL);
uint64_t* getRGVisibilityBitmap(uint64_t timestamp);

std::vector<uint64_t> collectRGGarbage(uint64_t timestamp);
Expand Down
4 changes: 2 additions & 2 deletions cpp/pixels-retina/include/RGVisibilityJni.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 25 additions & 1 deletion cpp/pixels-retina/include/TileVisibility.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,22 @@ inline uint64_t extractTimestamp(uint64_t raw) {
return (raw & 0x0000FFFFFFFFFFFFULL);
}

/**
* Controls how DELETE replay interacts with the compacted base bitmap.
*
* NORMAL is the live append path: the caller provides a current delete
* timestamp and the record is appended to the chain. VERSIONED is used when
* replay may race with READY readers; historical deletes publish a new
* VersionedData with a folded baseBitmap. EXCLUSIVE is used only while recovery
* blocks readers and GC; historical deletes may update baseBitmap in place, but
* concurrent recovery writers still need tile-level synchronization.
*/
enum class ReplayMode : uint8_t {
NORMAL = 0,
VERSIONED = 1,
EXCLUSIVE = 2
};

struct DeleteIndexBlock : public pixels::RetinaBase<DeleteIndexBlock> {
static constexpr size_t BLOCK_CAPACITY = 8;
uint64_t items[BLOCK_CAPACITY] = {0};
Expand Down Expand Up @@ -96,7 +112,7 @@ class TileVisibility : public pixels::RetinaBase<TileVisibility<CAPACITY>> {
// timestamp defaults to 0; bitmap defaults to all-zeros.
explicit TileVisibility(uint64_t timestamp = 0, const uint64_t* bitmap = nullptr);
~TileVisibility() override;
void deleteTileRecord(uint16_t rowId, uint64_t ts);
void deleteTileRecord(uint16_t rowId, uint64_t ts, ReplayMode replayMode = ReplayMode::NORMAL);
void getTileVisibilityBitmap(uint64_t ts, uint64_t* outBitmap) const;
void collectTileGarbage(uint64_t ts, uint64_t* gcSnapshotBitmap);
void exportChainItemsAfter(uint32_t tileId, uint64_t safeGcTs,
Expand All @@ -109,6 +125,14 @@ class TileVisibility : public pixels::RetinaBase<TileVisibility<CAPACITY>> {

void reclaimRetiredVersions();

void appendDeleteChain(uint16_t rowId, uint64_t ts);

// VERSIONED: replay with possible readers; historical deletes use COW fold.
void deleteTileRecordVersioned(uint16_t rowId, uint64_t ts);

// EXCLUSIVE: recovery replay without readers; historical deletes fold in place.
void deleteTileRecordExclusive(uint16_t rowId, uint64_t ts);

std::atomic<VersionedData<CAPACITY>*> currentVersion;
std::atomic<DeleteIndexBlock *> tail;
std::atomic<size_t> tailUsed;
Expand Down
5 changes: 3 additions & 2 deletions cpp/pixels-retina/lib/RGVisibility.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,10 @@ TileVisibility<CAPACITY>* RGVisibility<CAPACITY>::getTileVisibility(uint32_t row
}

template<size_t CAPACITY>
void RGVisibility<CAPACITY>::deleteRGRecord(uint32_t rowId, uint64_t timestamp) {
void RGVisibility<CAPACITY>::deleteRGRecord(uint32_t rowId, uint64_t timestamp,
ReplayMode replayMode) {
TileVisibility<CAPACITY>* tileVisibility = getTileVisibility(rowId);
tileVisibility->deleteTileRecord(rowId % VISIBILITY_RECORD_CAPACITY, timestamp);
tileVisibility->deleteTileRecord(rowId % VISIBILITY_RECORD_CAPACITY, timestamp, replayMode);
}

template<size_t CAPACITY>
Expand Down
17 changes: 14 additions & 3 deletions cpp/pixels-retina/lib/RGVisibilityJni.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,17 @@
#include "RGVisibility.h"
#include <stdexcept>

namespace {
ReplayMode toReplayMode(jint mode) {
switch (mode) {
case 0: return ReplayMode::NORMAL;
case 1: return ReplayMode::VERSIONED;
case 2: return ReplayMode::EXCLUSIVE;
default: throw std::invalid_argument("unknown ReplayMode");
}
}
}

/*
* Class: io_pixelsdb_pixels_retina_RGVisibility
* Method: createNativeObject
Expand Down Expand Up @@ -72,13 +83,13 @@ JNIEXPORT void JNICALL Java_io_pixelsdb_pixels_retina_RGVisibility_destroyNative
/*
* Class: io_pixelsdb_pixels_retina_RGVisibility
* Method: deleteRecord
* Signature: (JJJ)V
* Signature: (IJJI)V
*/
JNIEXPORT void JNICALL Java_io_pixelsdb_pixels_retina_RGVisibility_deleteRecord
(JNIEnv* env, jobject, jint rowId, jlong timestamp, jlong handle) {
(JNIEnv* env, jobject, jint rowId, jlong timestamp, jlong handle, jint replayMode) {
try {
auto* rgVisibility = reinterpret_cast<RGVisibilityInstance*>(handle);
rgVisibility->deleteRGRecord(rowId, timestamp);
rgVisibility->deleteRGRecord(rowId, timestamp, toReplayMode(replayMode));
} catch (const std::exception& e) {
env->ThrowNew(env->FindClass("java/lang/RuntimeException"), e.what());
}
Expand Down
66 changes: 65 additions & 1 deletion cpp/pixels-retina/lib/TileVisibility.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,71 @@ TileVisibility<CAPACITY>::~TileVisibility() {
}

template<size_t CAPACITY>
void TileVisibility<CAPACITY>::deleteTileRecord(uint16_t rowId, uint64_t ts) {
void TileVisibility<CAPACITY>::deleteTileRecord(uint16_t rowId, uint64_t ts,
ReplayMode replayMode) {
switch (replayMode) {
case ReplayMode::NORMAL:
appendDeleteChain(rowId, ts);
return;
case ReplayMode::VERSIONED:
deleteTileRecordVersioned(rowId, ts);
return;
case ReplayMode::EXCLUSIVE:
deleteTileRecordExclusive(rowId, ts);
return;
default:
throw std::invalid_argument("unknown ReplayMode");
}
}

template<size_t CAPACITY>
void TileVisibility<CAPACITY>::deleteTileRecordVersioned(uint16_t rowId, uint64_t ts) {
// READY backlog replay can race with getTileVisibilityBitmap readers. Fold
// historical deletes by publishing a new VersionedData instead of mutating
// baseBitmap observed by an existing reader.
// Keep ts=0 out of this path because item=0 is the chain-slot sentinel.
while (ts > 0) {
VersionedData<CAPACITY>* cur = currentVersion.load(std::memory_order_acquire);
if (ts > cur->baseTimestamp) {
break;
}
if ((cur->baseBitmap[rowId / 64] & (1ULL << (rowId % 64))) != 0) {
return;
}
uint64_t newBaseBitmap[NUM_WORDS];
std::memcpy(newBaseBitmap, cur->baseBitmap, NUM_WORDS * sizeof(uint64_t));
SET_BITMAP_BIT(newBaseBitmap, rowId);
VersionedData<CAPACITY>* newVer =
new VersionedData<CAPACITY>(cur->baseTimestamp, newBaseBitmap, cur->head);
if (currentVersion.compare_exchange_strong(cur, newVer, std::memory_order_acq_rel)) {
pendingRetire.store(cur, std::memory_order_release);
return;
}
delete newVer;
}

appendDeleteChain(rowId, ts);
}

template<size_t CAPACITY>
void TileVisibility<CAPACITY>::deleteTileRecordExclusive(uint16_t rowId, uint64_t ts) {
// RECOVERING replay blocks readers and GC, so historical deletes can fold
// into baseBitmap in place. Atomic OR prevents lost updates when concurrent
// recovery writers touch the same bitmap word.
VersionedData<CAPACITY>* cur = currentVersion.load(std::memory_order_acquire);
if (ts > 0 && ts <= cur->baseTimestamp) {
uint64_t mask = 1ULL << (rowId % 64);
__atomic_fetch_or(&cur->baseBitmap[rowId / 64], mask, __ATOMIC_RELAXED);
return;
}

appendDeleteChain(rowId, ts);
}

template<size_t CAPACITY>
void TileVisibility<CAPACITY>::appendDeleteChain(uint16_t rowId, uint64_t ts) {
// Normal live apply assumes a current timestamp and records the delete in
// the append-only chain, leaving baseBitmap untouched for the hot path.
uint64_t item = makeDeleteIndex(rowId, ts);
while (true) {
DeleteIndexBlock *curTail = tail.load(std::memory_order_acquire);
Expand Down
72 changes: 72 additions & 0 deletions cpp/pixels-retina/test/RGVisibilityTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,50 @@ class RGVisibilityTest : public ::testing::Test {
RGVisibilityInstance* rgVisibility;
};

static bool rgBitSet(const uint64_t* bitmap, uint32_t rowId) {
return ((bitmap[rowId / 64] >> (rowId % 64)) & 1ULL) != 0;
}

static void runConcurrentRGDeletes(RGVisibilityInstance* visibility,
ReplayMode mode,
uint64_t ts,
int rowCount = 64,
int threadCount = 8) {
ASSERT_EQ(rowCount % threadCount, 0);
std::atomic<bool> start{false};
std::vector<std::thread> threads;
int rowsPerThread = rowCount / threadCount;

for (int t = 0; t < threadCount; t++) {
threads.emplace_back([&, t]() {
while (!start.load(std::memory_order_acquire)) {
std::this_thread::yield();
}
for (int i = 0; i < rowsPerThread; i++) {
uint32_t rowId = static_cast<uint32_t>(t * rowsPerThread + i);
visibility->deleteRGRecord(rowId, ts, mode);
}
});
}

start.store(true, std::memory_order_release);
for (auto& thread : threads) {
thread.join();
}
}

static void expectRGRows(RGVisibilityInstance* visibility,
uint64_t queryTs,
int rowCount,
bool expectedSet) {
uint64_t* bitmap = visibility->getRGVisibilityBitmap(queryTs);
for (int row = 0; row < rowCount; row++) {
EXPECT_EQ(expectedSet, rgBitSet(bitmap, static_cast<uint32_t>(row)))
<< "row=" << row << " queryTs=" << queryTs;
}
delete[] bitmap;
}

TEST_F(RGVisibilityTest, BasicDeleteAndVisibility) {
uint64_t timestamp1 = 100;
uint64_t timestamp2 = 200;
Expand All @@ -67,6 +111,34 @@ TEST_F(RGVisibilityTest, BasicDeleteAndVisibility) {
delete[] bitmap2;
}

TEST_F(RGVisibilityTest, ConcurrentNormalModeAppendsDeleteChain) {
constexpr uint64_t baseTs = 100;
RGVisibilityInstance visibility(ROW_COUNT, baseTs, nullptr);

runConcurrentRGDeletes(&visibility, ReplayMode::NORMAL, baseTs + 1);

expectRGRows(&visibility, baseTs, 64, false);
expectRGRows(&visibility, baseTs + 1, 64, true);
}

TEST_F(RGVisibilityTest, ConcurrentVersionedModeFoldsWithCow) {
constexpr uint64_t baseTs = 100;
RGVisibilityInstance visibility(ROW_COUNT, baseTs, nullptr);

runConcurrentRGDeletes(&visibility, ReplayMode::VERSIONED, baseTs - 1);

expectRGRows(&visibility, baseTs, 64, true);
}

TEST_F(RGVisibilityTest, ConcurrentExclusiveModeFoldsWithAtomicOr) {
constexpr uint64_t baseTs = 100;
RGVisibilityInstance visibility(ROW_COUNT, baseTs, nullptr);

runConcurrentRGDeletes(&visibility, ReplayMode::EXCLUSIVE, baseTs - 1);

expectRGRows(&visibility, baseTs, 64, true);
}

TEST_F(RGVisibilityTest, MultiThread) {
struct DeleteRecord {
uint64_t timestamp;
Expand Down
Loading
Loading