Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -339,9 +339,16 @@ private void apply(Action action) throws IOException {
}

protected void append(Record r) throws IOException {
  // Single-element array so the lambda can report back whether the writer
  // performed an implicit block-full sync (locals captured by a lambda must
  // be effectively final, so a plain boolean cannot be assigned inside it).
  final boolean[] blockSynced = { false };
  apply(writer -> {
    blockSynced[0] = writer.append(r.tableName, r.commitId, r.mutation);
  });
  // Add to the current batch only after we succeed at appending, so a failed
  // append is never replayed from the batch.
  currentBatch.add(r);
  if (blockSynced[0]) {
    // The block-full sync included this record — all records appended up to
    // here are durable, so none of them need replaying on rotation.
    currentBatch.clear();
  }
}

protected void append(String tableName, long commitId, Mutation mutation) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,9 +328,10 @@ interface Writer extends Closeable {
* @param tableName The HBase table name
* @param commitId The commit identifier
* @param mutation The mutation to append.
* @return true if an implicit sync happened (block full), false if buffered only
* @throws IOException if an I/O error occurs during append.
*/
void append(String tableName, long commitId, Mutation mutation) throws IOException;
boolean append(String tableName, long commitId, Mutation mutation) throws IOException;

/**
* Flushes any buffered data to the underlying storage and ensures it is durable (e.g., by
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public long getBlocksStartOffset() {
return blocksStartOffset;
}

public void append(LogFile.Record record) throws IOException {
public boolean append(LogFile.Record record) throws IOException {
if (blockDataStream == null) {
startBlock(); // Start the block if needed
}
Expand All @@ -86,7 +86,9 @@ public void append(LogFile.Record record) throws IOException {
// To close the block, we do a sync(), which not only closes the block and opens a
// new one, it syncs the finalized block.
sync();
return true;
}
return false;
}

// Should be called before writing the first record.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,11 @@ public boolean isClosed() {
}

@Override
public void append(String tableName, long commitId, Mutation mutation) throws IOException {
public boolean append(String tableName, long commitId, Mutation mutation) throws IOException {
if (isClosed()) {
throw new IOException("Writer has been closed");
}
writer.append(
return writer.append(
new LogFileRecord().setHBaseTableName(tableName).setCommitId(commitId).setMutation(mutation));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.phoenix.replication.log.LogFileReaderContext;
import org.apache.phoenix.replication.log.LogFileTestUtil;
import org.apache.phoenix.replication.log.LogFileWriter;
import org.apache.phoenix.replication.log.LogFileWriterContext;
import org.junit.Test;
import org.mockito.InOrder;
import org.mockito.Mockito;
Expand Down Expand Up @@ -1873,4 +1874,49 @@ public void testInitDegradesToSafWhenPeerInitTimesOut() throws Exception {
group.close();
}
}

/**
 * Tests that rotation after block-full syncs replays only the records from the last partial
 * block, not the entire inter-sync window.
 */
@Test
public void testBlockFullSyncOnAppendReducesReplayOnRotation() throws Exception {
  final String tableName = "TBLBFR";
  // Use a very small block size to trigger block-full sync quickly
  conf.setLong(LogFileWriterContext.LOGFILE_BLOCK_SIZE, 200L);
  // Use a short round duration so we can trigger rotation via time
  conf.setInt(PHOENIX_REPLICATION_ROUND_DURATION_SECONDS_KEY, 1);
  recreateLogGroup();

  ReplicationLog activeLog = logGroup.getActiveLog();
  LogFileWriter oldWriter = activeLog.getWriter();

  // Append 20 records without an explicit sync — block-full syncs will fire along the way,
  // clearing currentBatch at each block boundary
  long id;
  for (id = 1; id <= 20; id++) {
    Mutation put = LogFileTestUtil.newPut("row_" + id, id, 2);
    logGroup.append(tableName, id, put);
  }
  // The loop exits with id == 21, which is exactly the commitId for the 21st record below.
  // (An extra increment here would desynchronize the commitId from the row name/timestamp
  // AND weaken the assertion bound from 21 to 22, letting the regression case of 21 appends
  // pass undetected.)

  // Wait for the rotation tick to stage a new pendingWriter
  waitForRotationTick(1);

  // Append one more record and sync — apply() drains pendingWriter triggering replay,
  // sync() ensures the disruptor has fully processed replay + new append
  logGroup.append(tableName, id, LogFileTestUtil.newPut("row_21", 21, 2));
  logGroup.sync();

  LogFileWriter newWriter = activeLog.getWriter();
  assertNotEquals("Writer should have been rotated", oldWriter, newWriter);

  // Without block-full clearing, the new writer would receive a replay of all 20 records
  // plus the new append (21 appends total). With block-full clearing, it receives only the
  // partial-block tail plus the new append — strictly fewer than 21.
  int newWriterAppendCount = Mockito.mockingDetails(newWriter).getInvocations().stream()
      .filter(inv -> inv.getMethod().getName().equals("append")).mapToInt(inv -> 1).sum();
  assertTrue("Replay should be bounded by last partial block, not all records. Got: "
      + newWriterAppendCount, newWriterAppendCount < id);
}
}