Skip to content

Commit e46973b

Browse files
committed
[kv] PrimaryKey table support standby replica to reduce recovery time
1 parent b432b14 commit e46973b

53 files changed

Lines changed: 1715 additions & 733 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ public class MetricNames {
7373
public static final String SERVER_LOGICAL_STORAGE_KV_SIZE = "kvSize";
7474
public static final String SERVER_PHYSICAL_STORAGE_LOCAL_SIZE = "localSize";
7575
public static final String SERVER_PHYSICAL_STORAGE_REMOTE_LOG_SIZE = "remoteLogSize";
76+
public static final String SERVER_PHYSICAL_STORAGE_STANDBY_SIZE = "standbySize";
7677

7778
// --------------------------------------------------------------------------------------------
7879
// metrics for table

fluss-rpc/src/main/proto/FlussApi.proto

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -332,6 +332,7 @@ message NotifyKvSnapshotOffsetRequest {
332332
required int32 bucket_id = 3;
333333
required int32 coordinator_epoch = 4;
334334
required int64 min_retain_offset = 5;
335+
optional int64 snapshot_id = 6;
335336
}
336337

337338
message NotifyKvSnapshotOffsetResponse {
@@ -745,6 +746,7 @@ message PbAdjustIsrReqForBucket {
745746
repeated int32 new_isr = 4 [packed = true];
746747
required int32 coordinator_epoch = 5;
747748
required int32 bucket_epoch = 6;
749+
repeated int32 standby_replicas = 7 [packed = true];
748750
}
749751

750752
message PbAdjustIsrRespForTable {
@@ -762,6 +764,7 @@ message PbAdjustIsrRespForBucket {
762764
repeated int32 isr = 7 [packed = true];
763765
optional int32 bucket_epoch = 8;
764766
optional int32 coordinator_epoch = 9;
767+
repeated int32 standby_replicas = 10 [packed = true];
765768
}
766769

767770
message PbListOffsetsRespForBucket {
@@ -782,6 +785,7 @@ message PbNotifyLeaderAndIsrReqForBucket {
782785
repeated int32 replicas = 5 [packed = true];
783786
repeated int32 isr = 6 [packed = true];
784787
required int32 bucket_epoch = 7;
788+
repeated int32 standby_replicas = 8 [packed = true];
785789
}
786790

787791
message PbNotifyLeaderAndIsrRespForBucket {

fluss-server/src/main/java/org/apache/fluss/server/TabletManagerBase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ protected int runInThreadPool(Runnable[] runnableJobs, String poolName) throws T
173173
* @param tableBucket the table bucket
174174
* @return the tablet directory
175175
*/
176-
protected File getOrCreateTabletDir(PhysicalTablePath tablePath, TableBucket tableBucket) {
176+
public File getOrCreateTabletDir(PhysicalTablePath tablePath, TableBucket tableBucket) {
177177
File tabletDir = getTabletDir(tablePath, tableBucket);
178178
if (tabletDir.exists()) {
179179
return tabletDir;

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -959,6 +959,7 @@ private List<AdjustIsrResultForBucket> tryProcessAdjustIsr(
959959
// TODO: reject the request if any replica in the ISR is not online,
960960
// see KIP-841.
961961
tryAdjustLeaderAndIsr.isr(),
962+
tryAdjustLeaderAndIsr.standbyReplicas(),
962963
coordinatorContext.getCoordinatorEpoch(),
963964
currentLeaderAndIsr.bucketEpoch() + 1);
964965
newLeaderAndIsrList.put(tableBucket, newLeaderAndIsr);
@@ -1071,7 +1072,9 @@ private void tryProcessCommitKvSnapshot(
10711072
completedSnapshotStore.add(completedSnapshot);
10721073
coordinatorEventManager.put(
10731074
new NotifyKvSnapshotOffsetEvent(
1074-
tb, completedSnapshot.getLogOffset()));
1075+
tb,
1076+
completedSnapshot.getLogOffset(),
1077+
completedSnapshot.getSnapshotID()));
10751078
callback.complete(new CommitKvSnapshotResponse());
10761079
} catch (Exception e) {
10771080
callback.completeExceptionally(e);
@@ -1081,7 +1084,6 @@ private void tryProcessCommitKvSnapshot(
10811084

10821085
private void processNotifyKvSnapshotOffsetEvent(NotifyKvSnapshotOffsetEvent event) {
10831086
TableBucket tb = event.getTableBucket();
1084-
long logOffset = event.getLogOffset();
10851087
coordinatorRequestBatch.newBatch();
10861088
coordinatorContext
10871089
.getBucketLeaderAndIsr(tb)
@@ -1092,7 +1094,8 @@ private void processNotifyKvSnapshotOffsetEvent(NotifyKvSnapshotOffsetEvent even
10921094
coordinatorContext.getFollowers(
10931095
tb, leaderAndIsr.leader()),
10941096
tb,
1095-
logOffset));
1097+
event.getLogOffset(),
1098+
event.getSnapshotId()));
10961099
coordinatorRequestBatch.sendNotifyKvSnapshotOffsetRequest(
10971100
coordinatorContext.getCoordinatorEpoch());
10981101
}

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -361,15 +361,18 @@ public void addNotifyRemoteLogOffsetsRequestForTabletServers(
361361
}
362362

363363
public void addNotifyKvSnapshotOffsetRequestForTabletServers(
364-
List<Integer> tabletServers, TableBucket tableBucket, long minRetainOffset) {
364+
List<Integer> tabletServers,
365+
TableBucket tableBucket,
366+
long minRetainOffset,
367+
long snapshotId) {
365368
tabletServers.stream()
366369
.filter(s -> s >= 0)
367370
.forEach(
368371
id ->
369372
notifyKvSnapshotOffsetRequestMap.put(
370373
id,
371374
makeNotifyKvSnapshotOffsetRequest(
372-
tableBucket, minRetainOffset)));
375+
tableBucket, minRetainOffset, snapshotId)));
373376
}
374377

375378
public void addNotifyLakeTableOffsetRequestForTableServers(

fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/NotifyKvSnapshotOffsetEvent.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,12 @@ public class NotifyKvSnapshotOffsetEvent implements CoordinatorEvent {
2525

2626
private final TableBucket tableBucket;
2727
private final long logOffset;
28+
private final long snapshotId;
2829

29-
public NotifyKvSnapshotOffsetEvent(TableBucket tableBucket, long logOffset) {
30+
/**
 * Creates the event.
 *
 * @param tableBucket the table bucket stored in this event
 * @param logOffset the log offset stored in this event
 * @param snapshotId the snapshot id stored in this event
 */
public NotifyKvSnapshotOffsetEvent(
        TableBucket tableBucket, long logOffset, long snapshotId) {
    this.snapshotId = snapshotId;
    this.logOffset = logOffset;
    this.tableBucket = tableBucket;
}
3335

3436
public TableBucket getTableBucket() {
@@ -38,4 +40,8 @@ public TableBucket getTableBucket() {
3840
public long getLogOffset() {
3941
return logOffset;
4042
}
43+
44+
public long getSnapshotId() {
45+
return snapshotId;
46+
}
4147
}

fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaLeaderElectionAlgorithms.java

Lines changed: 163 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.fluss.server.zk.data.LeaderAndIsr;
2222

2323
import java.util.ArrayList;
24+
import java.util.Collections;
2425
import java.util.HashSet;
2526
import java.util.List;
2627
import java.util.Optional;
@@ -36,23 +37,39 @@ public class ReplicaLeaderElectionAlgorithms {
3637
* @param assignments the assignments
3738
* @param aliveReplicas the alive replicas
3839
* @param coordinatorEpoch the coordinator epoch
40+
* @param isPrimaryKeyTable whether this table bucket is primary key table
3941
* @return the election result
4042
*/
4143
public static Optional<ElectionResult> initReplicaLeaderElection(
42-
List<Integer> assignments, List<Integer> aliveReplicas, int coordinatorEpoch) {
43-
// currently, we always use the first replica in assignment, which also in aliveReplicas and
44-
// isr as the leader replica.
45-
for (int assignment : assignments) {
46-
if (aliveReplicas.contains(assignment)) {
47-
return Optional.of(
48-
new ElectionResult(
49-
aliveReplicas,
50-
new LeaderAndIsr(
51-
assignment, 0, aliveReplicas, coordinatorEpoch, 0)));
52-
}
44+
List<Integer> assignments,
45+
List<Integer> aliveReplicas,
46+
int coordinatorEpoch,
47+
boolean isPrimaryKeyTable) {
48+
// First we will filter out the assignment list to only contain the alive replicas.
49+
List<Integer> availableReplicas =
50+
assignments.stream().filter(aliveReplicas::contains).collect(Collectors.toList());
51+
52+
// If the assignment list is empty, we return empty.
53+
if (availableReplicas.isEmpty()) {
54+
return Optional.empty();
5355
}
5456

55-
return Optional.empty();
57+
// Then we will use the first replica in assignment as the leader replica.
58+
int leader = availableReplicas.get(0);
59+
60+
// If this table is primaryKey table, we will use the second replica in assignment as the
61+
// standby if exists.
62+
List<Integer> standbyReplica = new ArrayList<>();
63+
if (isPrimaryKeyTable) {
64+
if (availableReplicas.size() > 1) {
65+
standbyReplica.add(availableReplicas.get(1));
66+
}
67+
}
68+
return Optional.of(
69+
new ElectionResult(
70+
aliveReplicas,
71+
new LeaderAndIsr(
72+
leader, 0, aliveReplicas, standbyReplica, coordinatorEpoch, 0)));
5673
}
5774

5875
/**
@@ -61,22 +78,77 @@ public static Optional<ElectionResult> initReplicaLeaderElection(
6178
* @param assignments the assignments
6279
* @param aliveReplicas the alive replicas
6380
* @param leaderAndIsr the original leaderAndIsr
81+
* @param isPrimaryKeyTable whether this table bucket is primary key table
6482
* @return the election result
6583
*/
6684
public static Optional<ElectionResult> defaultReplicaLeaderElection(
67-
List<Integer> assignments, List<Integer> aliveReplicas, LeaderAndIsr leaderAndIsr) {
68-
// currently, we always use the first replica in assignment, which also in aliveReplicas and
69-
// isr as the leader replica.
85+
List<Integer> assignments,
86+
List<Integer> aliveReplicas,
87+
LeaderAndIsr leaderAndIsr,
88+
boolean isPrimaryKeyTable) {
7089
List<Integer> isr = leaderAndIsr.isr();
71-
for (int assignment : assignments) {
72-
if (aliveReplicas.contains(assignment) && isr.contains(assignment)) {
73-
return Optional.of(
74-
new ElectionResult(
75-
aliveReplicas, leaderAndIsr.newLeaderAndIsr(assignment, isr)));
90+
// First we will filter out the assignment list to only contain the alive replicas and isr.
91+
List<Integer> availableReplicas =
92+
assignments.stream()
93+
.filter(replica -> aliveReplicas.contains(replica) && isr.contains(replica))
94+
.collect(Collectors.toList());
95+
96+
// If the assignment list is empty, we return empty.
97+
if (availableReplicas.isEmpty()) {
98+
return Optional.empty();
99+
}
100+
101+
// For log table, we will use the first replica in assignment as the leader replica.
102+
if (!isPrimaryKeyTable) {
103+
int leader = availableReplicas.get(0);
104+
return Optional.of(
105+
new ElectionResult(
106+
aliveReplicas,
107+
leaderAndIsr.newLeaderAndIsr(leader, isr, Collections.emptyList())));
108+
}
109+
110+
int currentStandby =
111+
leaderAndIsr.standbyReplicas().isEmpty()
112+
? -1
113+
: leaderAndIsr.standbyReplicas().get(0);
114+
int newLeader;
115+
int newStandby = -1;
116+
if (currentStandby != -1 && availableReplicas.contains(currentStandby)) {
117+
// Standby exists and is available: promote it to leader.
118+
newLeader = currentStandby;
119+
// Find a new standby for the candidate replica (not leader).
120+
List<Integer> candidatesForStandby =
121+
availableReplicas.stream()
122+
.filter(replica -> replica != newLeader)
123+
.collect(Collectors.toList());
124+
if (!candidatesForStandby.isEmpty()) {
125+
newStandby = candidatesForStandby.get(0);
126+
}
127+
} else {
128+
// Standby does not exist or is not available: promote the first replica in assignment
129+
// to leader.
130+
newLeader = availableReplicas.get(0);
131+
// Find a new standby for the candidate replica (not leader).
132+
List<Integer> candidatesForStandby =
133+
availableReplicas.stream()
134+
.filter(replica -> replica != newLeader)
135+
.collect(Collectors.toList());
136+
if (!candidatesForStandby.isEmpty()) {
137+
newStandby = candidatesForStandby.get(0);
76138
}
77139
}
78140

79-
return Optional.empty();
141+
if (newStandby == -1) {
142+
newStandby =
143+
availableReplicas.stream()
144+
.filter(replica -> replica != newLeader)
145+
.collect(Collectors.toList())
146+
.get(0);
147+
}
148+
149+
LeaderAndIsr newLeaderAndIsr =
150+
leaderAndIsr.newLeaderAndIsr(newLeader, isr, Collections.singletonList(newStandby));
151+
return Optional.of(new ElectionResult(aliveReplicas, newLeaderAndIsr));
80152
}
81153

82154
/**
@@ -92,25 +164,79 @@ public static Optional<ElectionResult> controlledShutdownReplicaLeaderElection(
92164
List<Integer> assignments,
93165
List<Integer> aliveReplicas,
94166
LeaderAndIsr leaderAndIsr,
95-
Set<Integer> shutdownTabletServers) {
167+
Set<Integer> shutdownTabletServers,
168+
boolean isPrimaryKeyTable) {
96169
List<Integer> originIsr = leaderAndIsr.isr();
97170
Set<Integer> isrSet = new HashSet<>(originIsr);
98-
for (Integer id : assignments) {
99-
if (aliveReplicas.contains(id)
100-
&& isrSet.contains(id)
101-
&& !shutdownTabletServers.contains(id)) {
102-
Set<Integer> newAliveReplicas = new HashSet<>(aliveReplicas);
103-
newAliveReplicas.removeAll(shutdownTabletServers);
104-
List<Integer> newIsr =
105-
originIsr.stream()
106-
.filter(replica -> !shutdownTabletServers.contains(replica))
107-
.collect(Collectors.toList());
108-
return Optional.of(
109-
new ElectionResult(
110-
new ArrayList<>(newAliveReplicas),
111-
leaderAndIsr.newLeaderAndIsr(id, newIsr)));
171+
// First we will filter out the assignment list to only contain the alive replicas, isr, and
172+
// not is shutdownTabletServers set.
173+
List<Integer> availableReplicas =
174+
assignments.stream()
175+
.filter(
176+
replica ->
177+
aliveReplicas.contains(replica)
178+
&& isrSet.contains(replica)
179+
&& !shutdownTabletServers.contains(replica))
180+
.collect(Collectors.toList());
181+
// If the assignment list is empty, we return empty.
182+
if (availableReplicas.isEmpty()) {
183+
return Optional.empty();
184+
}
185+
186+
int currentStandby =
187+
leaderAndIsr.standbyReplicas().isEmpty()
188+
? -1
189+
: leaderAndIsr.standbyReplicas().get(0);
190+
int newLeader;
191+
int newStandby = -1;
192+
if (!isPrimaryKeyTable) {
193+
// For log table, we will use the first replica in availableReplicas as the leader
194+
// replica.
195+
newLeader = availableReplicas.get(0);
196+
} else if (currentStandby != -1 && availableReplicas.contains(currentStandby)) {
197+
// For pk table, Standby exists and is available: promote it to leader.
198+
newLeader = currentStandby;
199+
// Find a new standby for the candidate replica (not leader).
200+
List<Integer> candidatesForStandby =
201+
availableReplicas.stream()
202+
.filter(replica -> replica != newLeader)
203+
.collect(Collectors.toList());
204+
if (!candidatesForStandby.isEmpty()) {
205+
newStandby = candidatesForStandby.get(0);
206+
}
207+
} else {
208+
// Standby does not exist or is not available: promote the first replica in assignment
209+
// to leader.
210+
newLeader = availableReplicas.get(0);
211+
// Find a new standby for the candidate replica (not leader).
212+
List<Integer> candidatesForStandby =
213+
availableReplicas.stream()
214+
.filter(replica -> replica != newLeader)
215+
.collect(Collectors.toList());
216+
if (!candidatesForStandby.isEmpty()) {
217+
newStandby = candidatesForStandby.get(0);
112218
}
113219
}
114-
return Optional.empty();
220+
221+
Set<Integer> newAliveReplicas = new HashSet<>(aliveReplicas);
222+
newAliveReplicas.removeAll(shutdownTabletServers);
223+
List<Integer> newIsr =
224+
originIsr.stream()
225+
.filter(replica -> !shutdownTabletServers.contains(replica))
226+
.collect(Collectors.toList());
227+
228+
if (newStandby == -1) {
229+
newStandby =
230+
availableReplicas.stream()
231+
.filter(replica -> replica != newLeader)
232+
.collect(Collectors.toList())
233+
.get(0);
234+
}
235+
236+
return Optional.of(
237+
new ElectionResult(
238+
new ArrayList<>(newAliveReplicas),
239+
leaderAndIsr.newLeaderAndIsr(
240+
newLeader, newIsr, Collections.singletonList(newStandby))));
115241
}
116242
}

fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachine.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -453,7 +453,8 @@ private Map<TableBucketReplica, LeaderAndIsr> doRemoveReplicaFromIsr(
453453
.collect(Collectors.toList());
454454
LeaderAndIsr adjustLeaderAndIsr =
455455
newLeader == LeaderAndIsr.NO_LEADER
456-
? leaderAndIsr.newLeaderAndIsr(newLeader, newIsr)
456+
? leaderAndIsr.newLeaderAndIsr(
457+
newLeader, newIsr, leaderAndIsr.standbyReplicas())
457458
: leaderAndIsr.newLeaderAndIsr(newIsr);
458459
adjustedLeaderAndIsr.put(tableBucketReplica, adjustLeaderAndIsr);
459460
toUpdateLeaderAndIsrList.put(tableBucket, adjustLeaderAndIsr);

0 commit comments

Comments
 (0)