30 commits
4a8a402
RATIS-2403. Support leader batch write to improve linearizable follow…
ivandika3 Feb 26, 2026
534240e
Move start daemon from constructor to start method to prevent race
ivandika3 Feb 26, 2026
5898f39
Try to adapt test to the new REPLIED_INDEX guarantee.
ivandika3 Feb 27, 2026
7f58060
Revert AtomicReference since it's not atomic and use synchronized lis…
ivandika3 Feb 27, 2026
36f8903
Introduce ReplyFlusher and update log
ivandika3 Mar 1, 2026
bfc5182
Remove unnecessary blank line
ivandika3 Mar 1, 2026
589117b
Fix findbugs
ivandika3 Mar 1, 2026
f1a2bae
Use appliedIndex during flush instead
ivandika3 Mar 2, 2026
e595eab
Fix replyFlusher never run issue
ivandika3 Mar 2, 2026
60380ee
Update documentation
ivandika3 Mar 2, 2026
5128716
Try to use read index as effective commit index
ivandika3 Mar 12, 2026
19321e3
Revert "Use appliedIndex during flush instead"
ivandika3 Mar 12, 2026
511e7df
Retain the original test
ivandika3 Mar 12, 2026
302b53e
Remove unnecessary parameter
ivandika3 Mar 12, 2026
c69b2b5
Assert read index type sync with the config
ivandika3 Mar 23, 2026
fb11a13
Address comments
ivandika3 Mar 27, 2026
57a7a64
Remove unused imports and fix tests
ivandika3 Mar 27, 2026
17d20ec
Update based on patch diff
ivandika3 Mar 31, 2026
676c52b
Add latch assertions to prevent test NPE
ivandika3 Mar 31, 2026
cea0549
Update repliedIndex to startupLogEntry if needed
ivandika3 Mar 31, 2026
2cf81b8
Update replyFlusher first to not blocked by notifyLeaderReady impleme…
ivandika3 Mar 31, 2026
85dd858
Initialize startupLogEntry once at ReplyFlusher start
ivandika3 Mar 31, 2026
bcec68e
Add some assertion logs
ivandika3 Apr 1, 2026
e9fe28b
Disable dummy request to not cause follower client to failover to leader
ivandika3 Apr 1, 2026
ece8ab6
Fix NoSuchElementException when dummy request is disabled
ivandika3 Apr 1, 2026
632e455
Implement Reply toString
ivandika3 Apr 1, 2026
78db698
Add logic to warm up clients to send first request
ivandika3 Apr 1, 2026
d48bcc9
Fix test
ivandika3 Apr 1, 2026
a078432
Update comments
ivandika3 Apr 1, 2026
45c1903
Merge branch 'master' into RATIS-2403
ivandika3 Apr 1, 2026
Original file line number Diff line number Diff line change
@@ -198,7 +198,7 @@ private void sendRequestWithRetry(PendingOrderedRequest pending) {
return;
}

if (getSlidingWindow((RaftPeerId) null).isFirst(pending.getSeqNum())) {
if (getSlidingWindow(request).isFirst(pending.getSeqNum())) {
pending.setFirstRequest();
}
LOG.debug("{}: send* {}", client.getId(), request);
37 changes: 32 additions & 5 deletions ratis-docs/src/site/markdown/configurations.md
Original file line number Diff line number Diff line change
@@ -220,11 +220,38 @@ if it fails to receive any RPC responses from this peer within this specified ti

### Read Index - Configurations related to ReadIndex used in linearizable read

| **Property** | `raft.server.read.read-index.applied-index.enabled` |
|:----------------|:----------------------------------------------------------------------|
| **Description** | whether applied index (instead of commit index) is used for ReadIndex |
| **Type** | boolean |
| **Default** | false |
| **Property** | `raft.server.read.read-index.type` |
|:----------------|:-----------------------------------------------------------------------------|
| **Description** | type of read index returned |
| **Type** | enum `Read.ReadIndex.Type` [`COMMIT_INDEX`, `APPLIED_INDEX`, `REPLIED_INDEX`] |
| **Default** | `Read.ReadIndex.Type.COMMIT_INDEX` |

* `Read.ReadIndex.Type.COMMIT_INDEX` - Use leader's CommitIndex (see Raft Paper section 6.4)
* The safest type as it is specified in the Raft dissertation
* This ReadIndex type can be chosen if the baseline performance of linearizable reads from followers already meets expectations.

* `Read.ReadIndex.Type.APPLIED_INDEX` - Use leader's AppliedIndex
* Allow leader to return AppliedIndex (instead of CommitIndex) as the ReadIndex
* This reduces the time a follower spends applying logs up to the ReadIndex, since AppliedIndex ≤ CommitIndex
* This ReadIndex type can be chosen if the `Read.ReadIndex.Type.COMMIT_INDEX` read latency is too high.

* `Read.ReadIndex.Type.REPLIED_INDEX` - Use leader's RepliedIndex
* RepliedIndex is defined as the last AppliedIndex of the leader when returning the last batch.
* The leader delays replying to write requests and replies only at each write batch boundary, configurable via `raft.server.read.read-index.replied-index.batch-interval`.
* This allows the ReadIndex to advance in coarser, less frequent steps, so followers are more likely to have already applied past the ReadIndex when a read arrives.
* This is most effective on read-heavy, follower-read workloads that prioritize overall read throughput without sacrificing consistency.
* The trade-off is increased write latency: each write may be delayed by up to one `raft.server.read.read-index.replied-index.batch-interval`.
* RepliedIndex still guarantees linearizability (no stale read) since by definition each ReadIndex returns the index of the last replied request.
* If the RepliedIndex is set to 0, the behavior is identical to `Read.ReadIndex.Type.APPLIED_INDEX`

Note that theoretically all the ReadIndex types still guarantee linearizability,
but there are tradeoffs (e.g. Write and Read performance) between different types.
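
The `REPLIED_INDEX` behavior described above can be pictured with a small stand-alone model. This is only a sketch, not the `ReplyFlusher` introduced by this change (its source is not part of this diff): the class name `ReplyBatchSketch` and its methods are hypothetical, and the periodic timer is replaced by an explicit `flush()` call so that the effect is deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

/** Hypothetical model of batched write replies; not the actual Ratis ReplyFlusher. */
final class ReplyBatchSketch {
  private final List<LongSupplier> held = new ArrayList<>();
  private long repliedIndex;

  ReplyBatchSketch(long initialRepliedIndex) {
    this.repliedIndex = initialRepliedIndex;
  }

  /** Hold a reply; when run, the supplier sends the reply and returns its log index. */
  synchronized void hold(LongSupplier replyMethod) {
    held.add(replyMethod);
  }

  /** Send every held reply, then advance repliedIndex to the highest index replied. */
  synchronized void flush() {
    long max = repliedIndex;
    for (LongSupplier reply : held) {
      max = Math.max(max, reply.getAsLong());
    }
    held.clear();
    repliedIndex = max;
  }

  /** The value a leader would report as ReadIndex under REPLIED_INDEX in this model. */
  synchronized long getRepliedIndex() {
    return repliedIndex;
  }
}
```

In this model, holding replies for indices 5, 6, and 7 leaves `getRepliedIndex()` at its initial value until `flush()` runs, after which it returns 7. The ReadIndex therefore moves once per batch rather than once per write, at the cost of delaying each held reply until the next flush.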

| **Property** | `raft.server.read.read-index.replied-index.batch-interval` |
|:----------------|:--------------------------------------------------------------------------------------------------------------------------------------------|
| **Description** | if `Read.ReadIndex.Type` is `REPLIED_INDEX`, the interval at which held write replies are flushed to clients and `repliedIndex` is advanced |
| **Type** | TimeDuration |
| **Default** | 10ms |
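
To show how the two properties and their documented defaults fit together, here is a minimal sketch using plain `java.util.Properties`. It is an assumption-laden stand-in: Ratis reads these keys from `RaftProperties` (this change adds `setType` and `setRepliedIndexBatchInterval` to `RaftServerConfigKeys.Read.ReadIndex` for that purpose), while `ReadIndexConfigSketch` and its simplified `"10ms"`-style parsing are hypothetical.

```java
import java.util.Properties;

/** Hypothetical stand-alone model of the two settings; Ratis itself uses RaftProperties. */
final class ReadIndexConfigSketch {
  enum Type { COMMIT_INDEX, APPLIED_INDEX, REPLIED_INDEX }

  static final String TYPE_KEY = "raft.server.read.read-index.type";
  static final String BATCH_INTERVAL_KEY =
      "raft.server.read.read-index.replied-index.batch-interval";

  /** Returns the configured type, or COMMIT_INDEX (the documented default) when unset. */
  static Type type(Properties p) {
    return Type.valueOf(p.getProperty(TYPE_KEY, Type.COMMIT_INDEX.name()));
  }

  /** Returns the batch interval in milliseconds; this sketch only parses values such as "10ms". */
  static long batchIntervalMillis(Properties p) {
    final String value = p.getProperty(BATCH_INTERVAL_KEY, "10ms").trim();
    if (!value.endsWith("ms")) {
      throw new IllegalArgumentException("this sketch only parses millisecond values: " + value);
    }
    return Long.parseLong(value.substring(0, value.length() - 2).trim());
  }
}
```

With no properties set, the sketch yields `COMMIT_INDEX` and 10 ms, matching the defaults in the tables above. The batch interval is only meaningful once the type is set to `REPLIED_INDEX`.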

| **Property** | `raft.server.read.leader.heartbeat-check.enabled` |
|:----------------|:--------------------------------------------------|
Original file line number Diff line number Diff line change
@@ -280,15 +280,34 @@ static void setWriteIndexCacheExpiryTime(RaftProperties properties, TimeDuration
interface ReadIndex {
String PREFIX = Read.PREFIX + ".read-index";

String APPLIED_INDEX_ENABLED_KEY = PREFIX + ".applied-index.enabled";
boolean APPLIED_INDEX_ENABLED_DEFAULT = false;
static boolean appliedIndexEnabled(RaftProperties properties) {
return getBoolean(properties::getBoolean, APPLIED_INDEX_ENABLED_KEY,
APPLIED_INDEX_ENABLED_DEFAULT, getDefaultLog());
enum Type {
/** ReadIndex returns leader's commitIndex (see Raft Paper section 6.4). */
COMMIT_INDEX,

/** ReadIndex returns leader's appliedIndex to reduce the ReadIndex latency. */
APPLIED_INDEX,

/** ReadIndex returns leader's repliedIndex, the index of the last replied request. */
REPLIED_INDEX
}

String TYPE_KEY = PREFIX + ".type";
Type TYPE_DEFAULT = Type.COMMIT_INDEX;
static Type type(RaftProperties properties) {
return get(properties::getEnum, TYPE_KEY, TYPE_DEFAULT, getDefaultLog());
}
static void setType(RaftProperties properties, Type type) {
set(properties::setEnum, TYPE_KEY, type);
}

static void setAppliedIndexEnabled(RaftProperties properties, boolean enabled) {
setBoolean(properties::setBoolean, APPLIED_INDEX_ENABLED_KEY, enabled);
String REPLIED_INDEX_BATCH_INTERVAL_KEY = PREFIX + ".replied-index.batch-interval";
TimeDuration REPLIED_INDEX_BATCH_INTERVAL_DEFAULT = TimeDuration.valueOf(10, TimeUnit.MILLISECONDS);
static TimeDuration repliedIndexBatchInterval(RaftProperties properties) {
return getTimeDuration(properties.getTimeDuration(REPLIED_INDEX_BATCH_INTERVAL_DEFAULT.getUnit()),
REPLIED_INDEX_BATCH_INTERVAL_KEY, REPLIED_INDEX_BATCH_INTERVAL_DEFAULT, getDefaultLog());
}
static void setRepliedIndexBatchInterval(RaftProperties properties, TimeDuration interval) {
setTimeDuration(properties::setTimeDuration, REPLIED_INDEX_BATCH_INTERVAL_KEY, interval);
}
}
}
Original file line number Diff line number Diff line change
@@ -39,6 +39,7 @@
import org.apache.ratis.protocol.exceptions.ReadIndexException;
import org.apache.ratis.protocol.exceptions.ReconfigurationTimeoutException;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.RaftServerConfigKeys.Read.ReadIndex.Type;
import org.apache.ratis.server.impl.ReadIndexHeartbeats.AppendEntriesListener;
import org.apache.ratis.server.leader.FollowerInfo;
import org.apache.ratis.server.leader.LeaderState;
@@ -82,6 +83,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongSupplier;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.function.ToLongFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -353,10 +355,13 @@ boolean isApplied() {
private final PendingStepDown pendingStepDown;

private final ReadIndexHeartbeats readIndexHeartbeats;
private final boolean readIndexAppliedIndexEnabled;
private final RaftServerConfigKeys.Read.ReadIndex.Type readIndexType;
private final Supplier<Long> readIndexSupplier;
private final boolean leaderHeartbeatCheckEnabled;
private final LeaderLease lease;

private ReplyFlusher replyFlusher;

LeaderStateImpl(RaftServerImpl server) {
this.name = ServerStringUtils.generateUnifiedName(server.getMemberId(), getClass());
this.server = server;
@@ -391,8 +396,21 @@ boolean isApplied() {
} else {
this.followerMaxGapThreshold = (long) (followerGapRatioMax * maxPendingRequests);
}
this.readIndexAppliedIndexEnabled = RaftServerConfigKeys.Read.ReadIndex
.appliedIndexEnabled(properties);

this.readIndexType = RaftServerConfigKeys.Read.ReadIndex.type(properties);
switch (readIndexType) {
case REPLIED_INDEX:
this.replyFlusher = new ReplyFlusher(server.getId(), state.getLastAppliedIndex(),
RaftServerConfigKeys.Read.ReadIndex.repliedIndexBatchInterval(properties));
readIndexSupplier = replyFlusher::getRepliedIndex;
break;
case APPLIED_INDEX:
readIndexSupplier = () -> server.getState().getLastAppliedIndex();
break;
case COMMIT_INDEX:
default:
readIndexSupplier = () -> server.getRaftLog().getLastCommittedIndex();
}
this.leaderHeartbeatCheckEnabled = RaftServerConfigKeys.Read
.leaderHeartbeatCheckEnabled(properties);

@@ -418,6 +436,11 @@ void start() {
// Initialize startup log entry and append it to the RaftLog
startupLogEntry.get();
processor.start();

if (replyFlusher != null) {
replyFlusher.start(startupLogEntry.get().startIndex);
}

senders.forEach(LogAppender::start);
}

@@ -453,6 +476,9 @@ CompletableFuture<Void> stop() {
startupLogEntry.get().getAppliedIndexFuture().completeExceptionally(
new ReadIndexException("failed to obtain read index since: ", nle));
server.getServerRpc().notifyNotLeader(server.getMemberId().getGroupId());
if (replyFlusher != null) {
replyFlusher.stop();
}
logAppenderMetrics.unregister();
raftServerMetrics.unregister();
pendingRequests.close();
@@ -620,7 +646,7 @@ public AppendEntriesRequestProto newAppendEntriesRequestProto(FollowerInfo follo
final boolean initializing = !isCaughtUp(follower);
final RaftPeerId targetId = follower.getId();
return ServerProtoUtils.toAppendEntriesRequestProto(server.getMemberId(), targetId, getCurrentTerm(), entries,
ServerImplUtils.effectiveCommitIndex(raftLog.getLastCommittedIndex(), previous, entries.size()),
ServerImplUtils.effectiveCommitIndex(readIndexSupplier.get(), previous, entries.size()),
initializing, previous, server.getCommitInfos(), callId);
}

@@ -1140,23 +1166,21 @@ public boolean checkLeadership() {
/**
* Obtain the current readIndex for read only requests. See Raft paper section 6.4.
* 1. Leader makes sure at least one log from current term is committed.
* 2. Leader record last committed index or applied index (depending on configuration) as readIndex.
* 2. Leader records the last committed index, applied index, or replied index (depending on configuration) as readIndex.
* 3. Leader broadcast heartbeats to followers and waits for acknowledgements.
* 4. If majority respond success, returns readIndex.
* @return current readIndex.
*/
CompletableFuture<Long> getReadIndex(Long readAfterWriteConsistentIndex) {
final long index = readIndexAppliedIndexEnabled ?
server.getState().getLastAppliedIndex() : server.getRaftLog().getLastCommittedIndex();
final long index = readIndexSupplier.get();
final long readIndex;
if (readAfterWriteConsistentIndex != null && readAfterWriteConsistentIndex > index) {
readIndex = readAfterWriteConsistentIndex;
} else {
readIndex = index;
}
LOG.debug("readIndex={} ({}Index={}, readAfterWriteConsistentIndex={})",
readIndex, readIndexAppliedIndexEnabled ? "applied" : "commit",
index, readAfterWriteConsistentIndex);
LOG.debug("readIndex={} ({}={}, readAfterWriteConsistentIndex={})",
readIndex, readIndexType, index, readAfterWriteConsistentIndex);

// if group contains only one member, fast path
if (server.getRaftConf().isSingleton()) {
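
The index selection at the top of `getReadIndex` in the hunk above can be isolated as a small pure function. This is a sketch for illustration only: `ReadIndexChoiceSketch` and `chooseReadIndex` are hypothetical names, and the supplier stands in for whichever of the commit, applied, or replied index the configured type selects.

```java
import java.util.function.LongSupplier;

/** Hypothetical helper mirroring the readIndex choice; not part of the Ratis code base. */
final class ReadIndexChoiceSketch {
  /**
   * Returns the larger of the configured index and the client's
   * read-after-write consistent index (which may be null).
   */
  static long chooseReadIndex(LongSupplier readIndexSupplier, Long readAfterWriteConsistentIndex) {
    final long index = readIndexSupplier.getAsLong();
    if (readAfterWriteConsistentIndex != null && readAfterWriteConsistentIndex > index) {
      return readAfterWriteConsistentIndex;
    }
    return index;
  }
}
```

A null or smaller `readAfterWriteConsistentIndex` leaves the supplied index unchanged, while a larger one raises the returned ReadIndex to that value.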
@@ -1217,8 +1241,23 @@ private boolean checkLeaderLease() {
&& (server.getRaftConf().isSingleton() || lease.isValid());
}

void replyPendingRequest(TermIndex termIndex, RaftClientReply reply) {
pendingRequests.replyPendingRequest(termIndex, reply);
void replyPendingRequest(TermIndex termIndex, RaftClientReply reply, RetryCacheImpl.CacheEntry cacheEntry) {
final PendingRequest pending = pendingRequests.remove(termIndex);
if (pending == null) {
return;
}

final LongSupplier replyMethod = () -> {
cacheEntry.updateResult(reply);
pending.setReply(reply);
return termIndex.getIndex();
};

if (readIndexType == Type.REPLIED_INDEX) {
replyFlusher.hold(replyMethod);
} else {
replyMethod.getAsLong();
}
}

TransactionContext getTransactionContext(TermIndex termIndex) {
Original file line number Diff line number Diff line change
@@ -264,12 +264,13 @@ TransactionContext getTransactionContext(TermIndex termIndex) {
return pendingRequest != null ? pendingRequest.getEntry() : null;
}

void replyPendingRequest(TermIndex termIndex, RaftClientReply reply) {
/** @return the removed {@link PendingRequest} for the given {@link TermIndex}. */
PendingRequest remove(TermIndex termIndex) {
final PendingRequest pending = pendingRequests.remove(termIndex);
if (pending != null) {
Preconditions.assertEquals(termIndex, pending.getTermIndex(), "termIndex");
pending.setReply(reply);
}
return pending;
}

/**
Original file line number Diff line number Diff line change
@@ -1837,8 +1837,12 @@ private CompletableFuture<Message> replyPendingRequest(
}

// update pending request
role.getLeaderState().ifPresent(leader -> leader.replyPendingRequest(termIndex, r));
cacheEntry.updateResult(r);
final LeaderStateImpl leader = role.getLeaderState().orElse(null);
if (leader != null) {
leader.replyPendingRequest(termIndex, r, cacheEntry);
} else {
cacheEntry.updateResult(r);
}
});
}
