diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java index d6b7b015356..f94069d0885 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java @@ -339,9 +339,16 @@ private void apply(Action action) throws IOException { } protected void append(Record r) throws IOException { - apply(writer -> writer.append(r.tableName, r.commitId, r.mutation)); + final boolean[] blockSynced = { false }; + apply(writer -> { + blockSynced[0] = writer.append(r.tableName, r.commitId, r.mutation); + }); // Add to current batch only after we succeed at appending currentBatch.add(r); + if (blockSynced[0]) { + // The block-full sync included this record — all records up to here are durable + currentBatch.clear(); + } } protected void append(String tableName, long commitId, Mutation mutation) throws IOException { diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFile.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFile.java index b1f05f4af9c..a93ada0b488 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFile.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFile.java @@ -328,9 +328,10 @@ interface Writer extends Closeable { * @param tableName The HBase table name * @param commitId The commit identifier * @param mutation The mutation to append. + * @return true if an implicit sync happened (block full), false if buffered only * @throws IOException if an I/O error occurs during append. */ - void append(String tableName, long commitId, Mutation mutation) throws IOException; + boolean append(String tableName, long commitId, Mutation mutation) throws IOException; /** * Flushes any buffered data to the underlying storage and ensures it is durable (e.g., by diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatWriter.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatWriter.java index 11b427112a8..ead68e0ba84 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatWriter.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatWriter.java @@ -74,7 +74,7 @@ public long getBlocksStartOffset() { return blocksStartOffset; } - public void append(LogFile.Record record) throws IOException { + public boolean append(LogFile.Record record) throws IOException { if (blockDataStream == null) { startBlock(); // Start the block if needed } @@ -86,7 +86,9 @@ public void append(LogFile.Record record) throws IOException { // To close the block, we do a sync(), which not only closes the block and opens a // new one, it syncs the finalized block. sync(); + return true; } + return false; } // Should be called before writing the first record. diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileWriter.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileWriter.java index 2a0f4c5dd1a..7f5ba64084a 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileWriter.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileWriter.java @@ -78,11 +78,11 @@ public boolean isClosed() { } @Override - public void append(String tableName, long commitId, Mutation mutation) throws IOException { + public boolean append(String tableName, long commitId, Mutation mutation) throws IOException { if (isClosed()) { throw new IOException("Writer has been closed"); } - writer.append( + return writer.append( new LogFileRecord().setHBaseTableName(tableName).setCommitId(commitId).setMutation(mutation)); } diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java index 2bf723defd0..ddd0cea0ec3 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java @@ -56,6 +56,7 @@ import org.apache.phoenix.replication.log.LogFileReaderContext; import org.apache.phoenix.replication.log.LogFileTestUtil; import org.apache.phoenix.replication.log.LogFileWriter; +import org.apache.phoenix.replication.log.LogFileWriterContext; import org.junit.Test; import org.mockito.InOrder; import org.mockito.Mockito; @@ -1873,4 +1874,49 @@ public void testInitDegradesToSafWhenPeerInitTimesOut() throws Exception { group.close(); } } + + /** + * Tests that rotation after block-full syncs replays only the records from the last partial + * block, not the entire inter-sync window. + */ + @Test + public void testBlockFullSyncOnAppendReducesReplayOnRotation() throws Exception { + final String tableName = "TBLBFR"; + // Use a very small block size to trigger block-full sync quickly + conf.setLong(LogFileWriterContext.LOGFILE_BLOCK_SIZE, 200L); + // Use a short round duration so we can trigger rotation via time + conf.setInt(PHOENIX_REPLICATION_ROUND_DURATION_SECONDS_KEY, 1); + recreateLogGroup(); + + ReplicationLog activeLog = logGroup.getActiveLog(); + LogFileWriter oldWriter = activeLog.getWriter(); + + // Append 20 records without an explicit sync — block-full syncs will fire along the way, + // clearing currentBatch at each block boundary + long id; + for (id = 1; id <= 20; id++) { + Mutation put = LogFileTestUtil.newPut("row_" + id, id, 2); + logGroup.append(tableName, id, put); + } + + // Wait for the rotation tick to stage a new pendingWriter + waitForRotationTick(1); + + // Append one more record and sync — apply() drains pendingWriter triggering replay, + // sync() ensures the disruptor has fully processed replay + new append + id++; + logGroup.append(tableName, id, LogFileTestUtil.newPut("row_21", 21, 2)); + logGroup.sync(); + + LogFileWriter newWriter = activeLog.getWriter(); + assertNotEquals("Writer should have been rotated", oldWriter, newWriter); + + // Without block-full clearing, the new writer would receive a replay of all 20 records + // plus the new append (21 appends total). With block-full clearing, it receives only the + // partial-block tail plus the new append — strictly fewer than 21. + int newWriterAppendCount = Mockito.mockingDetails(newWriter).getInvocations().stream() + .filter(inv -> inv.getMethod().getName().equals("append")).mapToInt(inv -> 1).sum(); + assertTrue("Replay should be bounded by last partial block, not all records. Got: " + + newWriterAppendCount, newWriterAppendCount < id); + } }