Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public ZonePartitionRaftListener(
.addHandler(
PartitionReplicationMessageGroup.GROUP_TYPE,
Commands.WRITE_INTENT_SWITCH_V2,
new WriteIntentSwitchCommandHandler(tableProcessors::get, txManager))
new WriteIntentSwitchCommandHandler(tableProcessors::get, txManager, txStatePartitionStorage))
.addHandler(
TxMessageGroup.GROUP_TYPE,
VACUUM_TX_STATE_COMMAND,
Expand Down Expand Up @@ -206,7 +206,19 @@ private void processWriteCommand(CommandClosure<WriteCommand> clo) {
safeTimestamp
);
} else if (command instanceof UpdateMinimumActiveTxBeginTimeCommand) {
result = processCrossTableProcessorsCommand(command, commandIndex, commandTerm, safeTimestamp);
CrossTableCommandResult crossTableResult = processCrossTableProcessorsCommand(
command,
commandIndex,
commandTerm,
safeTimestamp
);
result = crossTableResult.result;

if (!crossTableResult.hadAnyTableProcessor) {
// We MUST bump information about last updated index+term at least in one storage.
// See a comment in #onWrite() for explanation.
updateTxStateStorageLastAppliedIfNotStale(commandIndex, commandTerm);
}
} else if (command instanceof SafeTimeSyncCommand) {
result = handleSafeTimeSyncCommand(commandIndex, commandTerm);
} else if (command instanceof PrimaryReplicaChangeCommand) {
Expand All @@ -216,7 +228,13 @@ private void processWriteCommand(CommandClosure<WriteCommand> clo) {
partitionKey.toReplicationGroupId(), commandIndex, commandTerm,
cmd.leaseStartTime(), cmd.primaryReplicaNodeId(), cmd.primaryReplicaNodeName());

result = processCrossTableProcessorsCommand(command, commandIndex, commandTerm, safeTimestamp);
CrossTableCommandResult crossTableResult = processCrossTableProcessorsCommand(
command,
commandIndex,
commandTerm,
safeTimestamp
);
result = crossTableResult.result;

if (updateLeaseInfoInTxStorage(cmd, commandIndex, commandTerm)) {
LOG.debug("Updated lease info in tx storage [groupId={}, commandIndex={}, leaseStartTime={}]",
Expand All @@ -231,6 +249,8 @@ private void processWriteCommand(CommandClosure<WriteCommand> clo) {
if (commandHandler == null) {
LOG.info("Message type {} is not supported by the zone partition RAFT listener yet", command.getClass());

updateTxStateStorageLastAppliedIfNotStale(commandIndex, commandTerm);

result = EMPTY_APPLIED_RESULT;
} else {
result = commandHandler.handle(command, commandIndex, commandTerm, safeTimestamp);
Expand Down Expand Up @@ -275,14 +295,14 @@ private void processWriteCommand(CommandClosure<WriteCommand> clo) {
* @param safeTimestamp Safe timestamp.
* @return Tuple with the result of the command processing and a flag indicating whether the command was applied.
*/
private CommandResult processCrossTableProcessorsCommand(
private CrossTableCommandResult processCrossTableProcessorsCommand(
WriteCommand command,
long commandIndex,
long commandTerm,
@Nullable HybridTimestamp safeTimestamp
) {
if (tableProcessors.isEmpty()) {
return new CommandResult(null, commandIndex > lastAppliedIndex);
return new CrossTableCommandResult(false, new CommandResult(null, commandIndex > lastAppliedIndex));
}

boolean wasApplied = false;
Expand All @@ -293,7 +313,7 @@ private CommandResult processCrossTableProcessorsCommand(
wasApplied = wasApplied || r.wasApplied();
}

return new CommandResult(null, wasApplied);
return new CrossTableCommandResult(true, new CommandResult(null, wasApplied));
}

/**
Expand Down Expand Up @@ -322,12 +342,22 @@ private CommandResult processTableAwareCommand(
LOG.warn("Table processor for table ID {} not found. Command will be ignored: {}",
tableId, command.toStringForLightLogging());

// We MUST bump information about last updated index+term.
// See a comment in #onWrite() for explanation.
updateTxStateStorageLastAppliedIfNotStale(commandIndex, commandTerm);

return EMPTY_APPLIED_RESULT;
}

return tableProcessor.processCommand(command, commandIndex, commandTerm, safeTimestamp);
}

private void updateTxStateStorageLastAppliedIfNotStale(long commandIndex, long commandTerm) {
if (commandIndex > txStateStorage.lastAppliedIndex()) {
txStateStorage.lastApplied(commandIndex, commandTerm);
}
}

private boolean updateLeaseInfoInTxStorage(PrimaryReplicaChangeCommand command, long commandIndex, long commandTerm) {
if (commandIndex <= txStateStorage.lastAppliedIndex()) {
return false;
Expand Down Expand Up @@ -518,7 +548,6 @@ private PartitionSnapshots partitionSnapshots() {
/**
* Handler for the {@link SafeTimeSyncCommand}.
*
* @param cmd Command.
* @param commandIndex RAFT index of the command.
* @param commandTerm RAFT term of the command.
*/
Expand All @@ -530,7 +559,7 @@ private CommandResult handleSafeTimeSyncCommand(long commandIndex, long commandT

// We MUST bump information about last updated index+term.
// See a comment in #onWrite() for explanation.
txStateStorage.lastApplied(commandIndex, commandTerm);
updateTxStateStorageLastAppliedIfNotStale(commandIndex, commandTerm);

return EMPTY_APPLIED_RESULT;
}
Expand All @@ -539,4 +568,14 @@ private CommandResult handleSafeTimeSyncCommand(long commandIndex, long commandT
public HybridTimestamp currentSafeTime() {
return safeTimeTracker.current();
}

private static class CrossTableCommandResult {
private final boolean hadAnyTableProcessor;
private final CommandResult result;

private CrossTableCommandResult(boolean hadAnyTableProcessor, CommandResult result) {
this.hadAnyTableProcessor = hadAnyTableProcessor;
this.result = result;
}
Comment thread
rpuch marked this conversation as resolved.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.ignite.internal.partition.replicator.raft.RaftTableProcessor;
import org.apache.ignite.internal.partition.replicator.raft.RaftTxFinishMarker;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;
import org.jetbrains.annotations.Nullable;

/**
Expand All @@ -39,9 +40,16 @@ public class WriteIntentSwitchCommandHandler extends AbstractCommandHandler<Writ

private final RaftTxFinishMarker txFinishMarker;

private final TxStatePartitionStorage txStatePartitionStorage;

/** Constructor. */
public WriteIntentSwitchCommandHandler(IntFunction<RaftTableProcessor> tableProcessorByTableId, TxManager txManager) {
public WriteIntentSwitchCommandHandler(
IntFunction<RaftTableProcessor> tableProcessorByTableId,
TxManager txManager,
TxStatePartitionStorage txStatePartitionStorage
) {
this.tableProcessorByTableId = tableProcessorByTableId;
this.txStatePartitionStorage = txStatePartitionStorage;

txFinishMarker = new RaftTxFinishMarker(txManager);
}
Expand All @@ -58,6 +66,7 @@ protected CommandResult handleInternally(
txFinishMarker.markFinished(switchCommand.txId(), switchCommand.commit(), switchCommand.commitTimestamp(), null);

boolean applied = false;
boolean handledByAnyTable = false;
for (int tableId : ((WriteIntentSwitchCommandV2) switchCommand).tableIds()) {
RaftTableProcessor tableProcessor = raftTableProcessor(tableId);

Expand All @@ -76,6 +85,13 @@ protected CommandResult handleInternally(
.processCommand(switchCommand, commandIndex, commandTerm, safeTimestamp);

applied = applied || singleResult.wasApplied();
handledByAnyTable = true;
}

// We MUST bump information about last updated index+term at least in one storage.
// See a comment in ZonePartitionRaftListener#onWrite() for explanation.
if (!handledByAnyTable && commandIndex > txStatePartitionStorage.lastAppliedIndex()) {
txStatePartitionStorage.lastApplied(commandIndex, commandTerm);
}

return new CommandResult(null, applied);
Expand Down
Loading