Skip to content

Commit fc39c38

Browse files
authored
RATIS-2173. Fix zero-copy bugs for non-gRPC cases. (#1167)
1 parent e96ed1a commit fc39c38

6 files changed

Lines changed: 66 additions & 102 deletions

File tree

ratis-examples/src/test/java/org/apache/ratis/TestMultiRaftGroup.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,25 +22,20 @@
2222
import org.apache.ratis.examples.arithmetic.ArithmeticStateMachine;
2323
import org.apache.ratis.examples.arithmetic.TestArithmetic;
2424
import org.apache.ratis.protocol.RaftGroup;
25-
import org.apache.ratis.server.RaftServer;
2625
import org.apache.ratis.server.impl.GroupManagementBaseTest;
2726
import org.apache.ratis.server.impl.MiniRaftCluster;
28-
import org.apache.ratis.util.Slf4jUtils;
2927
import org.apache.ratis.util.function.CheckedBiConsumer;
28+
import org.junit.jupiter.api.Timeout;
3029
import org.junit.jupiter.params.ParameterizedTest;
3130
import org.junit.jupiter.params.provider.MethodSource;
32-
import org.slf4j.event.Level;
3331

3432
import java.io.IOException;
3533
import java.util.Collection;
3634
import java.util.concurrent.atomic.AtomicInteger;
3735

36+
@Timeout(value = 300)
3837
public class TestMultiRaftGroup extends BaseTest {
39-
static {
40-
Slf4jUtils.setLogLevel(RaftServer.Division.LOG, Level.DEBUG);
41-
}
42-
43-
public static Collection<Object[]> data() throws IOException {
38+
public static Collection<Object[]> data() {
4439
return ParameterizedBaseTest.getMiniRaftClusters(ArithmeticStateMachine.class, 0);
4540
}
4641

ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java

Lines changed: 12 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -59,23 +59,15 @@ public Comparator<Long> getCallIdComparator() {
5959
/** Send an appendEntries RPC; retry indefinitely. */
6060
private AppendEntriesReplyProto sendAppendEntriesWithRetries(AtomicLong requestFirstIndex)
6161
throws InterruptedException, InterruptedIOException, RaftLogIOException {
62-
int retry = 0;
63-
64-
ReferenceCountedObject<AppendEntriesRequestProto> request = nextAppendEntriesRequest(
65-
CallId.getAndIncrement(), false);
66-
while (isRunning()) { // keep retrying for IOException
62+
for(int retry = 0; isRunning(); retry++) {
63+
final ReferenceCountedObject<AppendEntriesRequestProto> request = nextAppendEntriesRequest(
64+
CallId.getAndIncrement(), false);
65+
if (request == null) {
66+
LOG.trace("{} no entries to send now, wait ...", this);
67+
return null;
68+
}
6769
try {
68-
if (request == null || request.get().getEntriesCount() == 0) {
69-
if (request != null) {
70-
request.release();
71-
}
72-
request = nextAppendEntriesRequest(CallId.getAndIncrement(), false);
73-
}
74-
75-
if (request == null) {
76-
LOG.trace("{} no entries to send now, wait ...", this);
77-
return null;
78-
} else if (!isRunning()) {
70+
if (!isRunning()) {
7971
LOG.info("{} is stopped. Skip appendEntries.", this);
8072
return null;
8173
}
@@ -84,17 +76,19 @@ private AppendEntriesReplyProto sendAppendEntriesWithRetries(AtomicLong requestF
8476
final AppendEntriesReplyProto reply = sendAppendEntries(proto);
8577
final long first = proto.getEntriesCount() > 0 ? proto.getEntries(0).getIndex() : RaftLog.INVALID_LOG_INDEX;
8678
requestFirstIndex.set(first);
87-
request.release();
8879
return reply;
8980
} catch (InterruptedIOException | RaftLogIOException e) {
9081
throw e;
9182
} catch (IOException ioe) {
9283
// TODO should have more detailed retry policy here.
93-
if (retry++ % 10 == 0) { // to reduce the number of messages
84+
if (retry % 10 == 0) { // to reduce the number of messages
9485
LOG.warn("{}: Failed to appendEntries (retry={})", this, retry, ioe);
9586
}
9687
handleException(ioe);
88+
} finally {
89+
request.release();
9790
}
91+
9892
if (isRunning()) {
9993
getServer().properties().rpcSleepTime().sleep();
10094
}

ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java

Lines changed: 8 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -45,15 +45,10 @@
4545
*/
4646
public class MemoryRaftLog extends RaftLogBase {
4747
static class EntryList {
48-
private final List<ReferenceCountedObject<LogEntryProto>> entries = new ArrayList<>();
49-
50-
ReferenceCountedObject<LogEntryProto> getRef(int i) {
51-
return i >= 0 && i < entries.size() ? entries.get(i) : null;
52-
}
48+
private final List<LogEntryProto> entries = new ArrayList<>();
5349

5450
LogEntryProto get(int i) {
55-
final ReferenceCountedObject<LogEntryProto> ref = getRef(i);
56-
return ref != null ? ref.get() : null;
51+
return i >= 0 && i < entries.size() ? entries.get(i) : null;
5752
}
5853

5954
TermIndex getTermIndex(int i) {
@@ -81,13 +76,10 @@ void purge(int index) {
8176
}
8277

8378
void clear(int from, int to) {
84-
List<ReferenceCountedObject<LogEntryProto>> subList = entries.subList(from, to);
85-
subList.forEach(ReferenceCountedObject::release);
86-
subList.clear();
79+
entries.subList(from, to).clear();
8780
}
8881

89-
void add(ReferenceCountedObject<LogEntryProto> entryRef) {
90-
entryRef.retain();
82+
void add(LogEntryProto entryRef) {
9183
entries.add(entryRef);
9284
}
9385
}
@@ -128,7 +120,8 @@ public LogEntryProto get(long index) throws RaftLogIOException {
128120
public ReferenceCountedObject<LogEntryProto> retainLog(long index) {
129121
checkLogState();
130122
try (AutoCloseableLock readLock = readLock()) {
131-
ReferenceCountedObject<LogEntryProto> ref = entries.getRef(Math.toIntExact(index));
123+
final LogEntryProto entry = entries.get(Math.toIntExact(index));
124+
final ReferenceCountedObject<LogEntryProto> ref = ReferenceCountedObject.wrap(entry);
132125
ref.retain();
133126
return ref;
134127
}
@@ -205,7 +198,7 @@ protected CompletableFuture<Long> appendEntryImpl(ReferenceCountedObject<LogEntr
205198
LogEntryProto entry = entryRef.retain();
206199
try (AutoCloseableLock writeLock = writeLock()) {
207200
validateLogEntry(entry);
208-
entries.add(entryRef);
201+
entries.add(entry);
209202
} finally {
210203
entryRef.release();
211204
}
@@ -253,7 +246,7 @@ public List<CompletableFuture<Long>> appendImpl(ReferenceCountedObject<List<LogE
253246
}
254247
for (int i = index; i < logEntryProtos.size(); i++) {
255248
LogEntryProto logEntryProto = logEntryProtos.get(i);
256-
entries.add(entriesRef.delegate(logEntryProto));
249+
entries.add(LogProtoUtils.copy(logEntryProto));
257250
futures.add(CompletableFuture.completedFuture(logEntryProto.getIndex()));
258251
}
259252
return futures;

ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,21 +33,18 @@
3333
import org.apache.ratis.server.RaftServerConfigKeys;
3434
import org.apache.ratis.server.impl.BlockRequestHandlingInjection;
3535
import org.apache.ratis.server.impl.MiniRaftCluster;
36-
import org.apache.ratis.server.impl.RaftServerTestUtil;
3736
import org.apache.ratis.server.impl.RetryCacheTestUtil;
3837
import org.apache.ratis.server.metrics.ServerMetricsTestUtils;
3938
import org.apache.ratis.server.raftlog.RaftLog;
4039
import org.apache.ratis.util.ExitUtils;
4140
import org.apache.ratis.util.JavaUtils;
42-
import org.apache.ratis.util.Slf4jUtils;
4341
import org.apache.ratis.util.TimeDuration;
4442
import org.apache.ratis.util.Timestamp;
4543
import org.junit.jupiter.api.Assertions;
4644
import org.junit.jupiter.api.Assumptions;
4745
import org.junit.jupiter.api.Test;
4846
import org.junit.jupiter.api.Timeout;
4947
import org.slf4j.Logger;
50-
import org.slf4j.event.Level;
5148

5249
import java.io.IOException;
5350
import java.util.List;
@@ -75,9 +72,6 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
7572
extends BaseTest
7673
implements MiniRaftCluster.Factory.Get<CLUSTER> {
7774
{
78-
Slf4jUtils.setLogLevel(RaftServer.Division.LOG, Level.DEBUG);
79-
RaftServerTestUtil.setStateMachineUpdaterLogLevel(Level.DEBUG);
80-
8175
RaftServerConfigKeys.RetryCache.setExpiryTime(getProperties(), TimeDuration.valueOf(5, TimeUnit.SECONDS));
8276
}
8377

ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -187,8 +187,11 @@ static boolean logEntriesContains(RaftLog log, long startIndex, long endIndex, S
187187
log.get(termIndices[idxEntries].getIndex()).getStateMachineLogEntry().getLogData().toByteArray())) {
188188
++idxExpected;
189189
}
190-
} catch (IOException e) {
191-
throw new RuntimeException(e);
190+
} catch (Exception e) {
191+
throw new IllegalStateException("Failed logEntriesContains: startIndex=" + startIndex
192+
+ ", endIndex=" + endIndex
193+
+ ", #expectedMessages=" + expectedMessages.length
194+
+ ", log=" + log, e);
192195
}
193196
++idxEntries;
194197
}

0 commit comments

Comments
 (0)