|
25 | 25 | import org.apache.fluss.client.initializer.OffsetsInitializer; |
26 | 26 | import org.apache.fluss.client.initializer.OffsetsInitializer.BucketOffsetsRetriever; |
27 | 27 | import org.apache.fluss.client.initializer.SnapshotOffsetsInitializer; |
28 | | -import org.apache.fluss.flink.source.event.FinishedKvSnapshotConsumeEvent; |
29 | 28 | import org.apache.fluss.client.metadata.KvSnapshots; |
30 | 29 | import org.apache.fluss.config.ConfigOptions; |
31 | 30 | import org.apache.fluss.config.Configuration; |
32 | 31 | import org.apache.fluss.flink.lake.LakeSplitGenerator; |
33 | 32 | import org.apache.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit; |
34 | 33 | import org.apache.fluss.flink.lake.split.LakeSnapshotSplit; |
| 34 | +import org.apache.fluss.flink.source.event.FinishedKvSnapshotConsumeEvent; |
35 | 35 | import org.apache.fluss.flink.source.event.PartitionBucketsUnsubscribedEvent; |
36 | 36 | import org.apache.fluss.flink.source.event.PartitionsRemovedEvent; |
37 | 37 | import org.apache.fluss.flink.source.reader.LeaseContext; |
|
62 | 62 | import org.slf4j.LoggerFactory; |
63 | 63 |
|
64 | 64 | import javax.annotation.Nullable; |
65 | | -import javax.annotation.concurrent.GuardedBy; |
66 | 65 |
|
67 | 66 | import java.io.IOException; |
68 | 67 | import java.util.ArrayList; |
|
79 | 78 | import java.util.OptionalLong; |
80 | 79 | import java.util.Set; |
81 | 80 | import java.util.TreeMap; |
82 | | -import java.util.concurrent.locks.ReadWriteLock; |
83 | | -import java.util.concurrent.locks.ReentrantReadWriteLock; |
84 | 81 | import java.util.stream.Collectors; |
85 | 82 |
|
86 | 83 | import static org.apache.fluss.utils.Preconditions.checkNotNull; |
87 | 84 | import static org.apache.fluss.utils.Preconditions.checkState; |
88 | | -import static org.apache.fluss.utils.concurrent.LockUtils.inWriteLock; |
89 | 85 |
|
90 | 86 | /** |
91 | 87 | * An implementation of {@link SplitEnumerator} for the data of Fluss. |
@@ -142,9 +138,7 @@ public class FlinkSourceEnumerator |
142 | 138 |
|
143 | 139 | private final LeaseContext leaseContext; |
144 | 140 |
|
145 | | - private final ReadWriteLock lock = new ReentrantReadWriteLock(); |
146 | 141 | /** checkpointId -> tableBuckets who finished consume kv snapshots. */ |
147 | | - @GuardedBy("lock") |
148 | 142 | private final TreeMap<Long, Set<TableBucket>> consumedKvSnapshotMap = new TreeMap<>(); |
149 | 143 |
|
150 | 144 | // Lazily instantiated or mutable fields. |
@@ -556,66 +550,58 @@ private KvSnapshots getLatestKvSnapshotsAndRegister(@Nullable String partitionNa |
556 | 550 | Map<Integer, Long> snapshotIds = new HashMap<>(); |
557 | 551 | Map<Integer, Long> logOffsets = new HashMap<>(); |
558 | 552 |
|
559 | | - // retry to get the latest kv snapshots and acquire kvSnapshot lease util all buckets |
560 | | - // acquire success. The reason is that getLatestKvSnapshots and acquireKvSnapshotLease |
561 | | - // are not atomic operations, the latest kv snapshot obtained via get may become outdated by |
562 | | - // the time it is passed to acquire. Therefore, this logic must implement a retry |
563 | | - // mechanism: the unavailable tableBuckets in the AcquiredKvSnapshotLeaseResult returned by |
564 | | - // acquireKvSnapshotLease must be retried repeatedly until all buckets are successfully |
565 | | - // acquired. |
 | 553 | +        // get the latest kv snapshots and acquire kvSnapshot lease for the buckets with snapshots. |
566 | 554 | try { |
567 | | - Set<TableBucket> remainingTableBuckets; |
568 | | - do { |
569 | | - KvSnapshots kvSnapshots = getLatestKvSnapshots(partitionName); |
570 | | - remainingTableBuckets = new HashSet<>(kvSnapshots.getTableBuckets()); |
571 | | - |
572 | | - tableId = kvSnapshots.getTableId(); |
573 | | - partitionId = kvSnapshots.getPartitionId(); |
574 | | - |
575 | | - Set<TableBucket> ignoreBuckets = new HashSet<>(); |
576 | | - Map<TableBucket, Long> bucketsToLease = new HashMap<>(); |
577 | | - for (TableBucket tb : remainingTableBuckets) { |
578 | | - int bucket = tb.getBucket(); |
579 | | - OptionalLong snapshotIdOpt = kvSnapshots.getSnapshotId(bucket); |
580 | | - OptionalLong logOffsetOpt = kvSnapshots.getLogOffset(bucket); |
581 | | - if (snapshotIdOpt.isPresent() && !ignoreTableBucket(tb)) { |
582 | | - bucketsToLease.put(tb, snapshotIdOpt.getAsLong()); |
583 | | - } else { |
584 | | - ignoreBuckets.add(tb); |
585 | | - } |
586 | | - |
587 | | - snapshotIds.put( |
588 | | - bucket, snapshotIdOpt.isPresent() ? snapshotIdOpt.getAsLong() : null); |
589 | | - logOffsets.put( |
590 | | - bucket, logOffsetOpt.isPresent() ? logOffsetOpt.getAsLong() : null); |
| 555 | + KvSnapshots kvSnapshots = getLatestKvSnapshots(partitionName); |
| 556 | + Set<TableBucket> remainingTableBuckets = new HashSet<>(kvSnapshots.getTableBuckets()); |
| 557 | + |
| 558 | + tableId = kvSnapshots.getTableId(); |
| 559 | + partitionId = kvSnapshots.getPartitionId(); |
| 560 | + |
| 561 | + Set<TableBucket> ignoreBuckets = new HashSet<>(); |
| 562 | + Map<TableBucket, Long> bucketsToLease = new HashMap<>(); |
| 563 | + for (TableBucket tb : remainingTableBuckets) { |
| 564 | + int bucket = tb.getBucket(); |
| 565 | + OptionalLong snapshotIdOpt = kvSnapshots.getSnapshotId(bucket); |
| 566 | + OptionalLong logOffsetOpt = kvSnapshots.getLogOffset(bucket); |
| 567 | + if (snapshotIdOpt.isPresent() && !ignoreTableBucket(tb)) { |
| 568 | + bucketsToLease.put(tb, snapshotIdOpt.getAsLong()); |
| 569 | + } else { |
| 570 | + ignoreBuckets.add(tb); |
591 | 571 | } |
592 | 572 |
|
593 | | - if (!ignoreBuckets.isEmpty()) { |
594 | | - remainingTableBuckets.removeAll(ignoreBuckets); |
595 | | - } |
| 573 | + snapshotIds.put( |
| 574 | + bucket, snapshotIdOpt.isPresent() ? snapshotIdOpt.getAsLong() : null); |
| 575 | + logOffsets.put(bucket, logOffsetOpt.isPresent() ? logOffsetOpt.getAsLong() : null); |
| 576 | + } |
596 | 577 |
|
597 | | - if (!bucketsToLease.isEmpty()) { |
598 | | - String kvSnapshotLeaseId = leaseContext.getKvSnapshotLeaseId(); |
| 578 | + if (!ignoreBuckets.isEmpty()) { |
| 579 | + remainingTableBuckets.removeAll(ignoreBuckets); |
| 580 | + } |
| 581 | + |
| 582 | + if (!bucketsToLease.isEmpty()) { |
| 583 | + String kvSnapshotLeaseId = leaseContext.getKvSnapshotLeaseId(); |
| 584 | + LOG.info( |
| 585 | + "Try to acquire kv snapshot lease {} for table {}", |
| 586 | + kvSnapshotLeaseId, |
| 587 | + tablePath); |
| 588 | + Long kvSnapshotLeaseDurationMs = leaseContext.getKvSnapshotLeaseDurationMs(); |
| 589 | + checkNotNull(kvSnapshotLeaseDurationMs, "kv snapshot lease duration is null."); |
| 590 | + remainingTableBuckets = |
| 591 | + flussAdmin |
| 592 | + .acquireKvSnapshotLease( |
| 593 | + kvSnapshotLeaseId, |
| 594 | + bucketsToLease, |
| 595 | + kvSnapshotLeaseDurationMs) |
| 596 | + .get() |
| 597 | + .getUnavailableTableBucketSet(); |
| 598 | + if (!remainingTableBuckets.isEmpty()) { |
599 | 599 | LOG.info( |
600 | | - "Try to acquire kv snapshot lease {} for table {}", |
601 | | - kvSnapshotLeaseId, |
602 | | - tablePath); |
603 | | - remainingTableBuckets = |
604 | | - flussAdmin |
605 | | - .acquireKvSnapshotLease( |
606 | | - kvSnapshotLeaseId, |
607 | | - bucketsToLease, |
608 | | - leaseContext.getKvSnapshotLeaseDurationMs()) |
609 | | - .get() |
610 | | - .getUnavailableTableBucketSet(); |
611 | | - if (!remainingTableBuckets.isEmpty()) { |
612 | | - LOG.info( |
613 | | - "Failed to acquire kv snapshot lease for table {}: {}. Retry to re-acquire", |
614 | | - tablePath, |
615 | | - remainingTableBuckets); |
616 | | - } |
| 600 | + "Failed to acquire kv snapshot lease for table {}: {}. Retry to re-acquire", |
| 601 | + tablePath, |
| 602 | + remainingTableBuckets); |
617 | 603 | } |
618 | | - } while (!remainingTableBuckets.isEmpty()); |
| 604 | + } |
619 | 605 | } catch (Exception e) { |
620 | 606 | throw new FlinkRuntimeException( |
621 | 607 | String.format("Failed to get table snapshot for %s", tablePath), |
@@ -1055,29 +1041,19 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception { |
1055 | 1041 |
|
1056 | 1042 | /** Add bucket who has been consumed kv snapshot to the consumedKvSnapshotMap. */ |
1057 | 1043 | public void addConsumedBucket(long checkpointId, TableBucket tableBucket) { |
1058 | | - inWriteLock( |
1059 | | - lock, |
1060 | | - () -> { |
1061 | | - consumedKvSnapshotMap |
1062 | | - .computeIfAbsent(checkpointId, k -> new HashSet<>()) |
1063 | | - .add(tableBucket); |
1064 | | - }); |
| 1044 | + consumedKvSnapshotMap.computeIfAbsent(checkpointId, k -> new HashSet<>()).add(tableBucket); |
1065 | 1045 | } |
1066 | 1046 |
|
1067 | 1047 | /** Get and remove the buckets who have been consumed kv snapshot up to the checkpoint id. */ |
1068 | 1048 | public Set<TableBucket> getAndRemoveConsumedBucketsUpTo(long checkpointId) { |
1069 | | - return inWriteLock( |
1070 | | - lock, |
1071 | | - () -> { |
1072 | | - NavigableMap<Long, Set<TableBucket>> toRemove = |
1073 | | - consumedKvSnapshotMap.headMap(checkpointId, false); |
1074 | | - Set<TableBucket> result = new HashSet<>(); |
1075 | | - for (Set<TableBucket> snapshots : toRemove.values()) { |
1076 | | - result.addAll(snapshots); |
1077 | | - } |
1078 | | - toRemove.clear(); |
1079 | | - return result; |
1080 | | - }); |
| 1049 | + NavigableMap<Long, Set<TableBucket>> toRemove = |
| 1050 | + consumedKvSnapshotMap.headMap(checkpointId, false); |
| 1051 | + Set<TableBucket> result = new HashSet<>(); |
| 1052 | + for (Set<TableBucket> snapshots : toRemove.values()) { |
| 1053 | + result.addAll(snapshots); |
| 1054 | + } |
| 1055 | + toRemove.clear(); |
| 1056 | + return result; |
1081 | 1057 | } |
1082 | 1058 |
|
1083 | 1059 | @Override |
|
0 commit comments