Skip to content

Commit da63109

Browse files
committed
address's Yang Wang's comments
1 parent c0255f5 commit da63109

11 files changed

Lines changed: 69 additions & 65 deletions

File tree

fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -452,12 +452,12 @@ CompletableFuture<Void> releaseKvSnapshotLease(
452452
String leaseId, Set<TableBucket> bucketsToRelease);
453453

454454
/**
455-
* Drops the entire lease asynchronously.
455+
* Releases the entire lease asynchronously.
456456
*
457457
* <p>All snapshots locked under this {@code leaseId} will be released immediately. This is
458458
* equivalent to calling {@link #releaseKvSnapshotLease} with all held buckets.
459459
*
460-
* @param leaseId The lease id to drop.
460+
* @param leaseId The lease id to release.
461461
*/
462462
CompletableFuture<Void> dropKvSnapshotLease(String leaseId);
463463

fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/KvSnapshotBatchScannerITCase.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
import org.apache.fluss.row.encode.KeyEncoder;
3838
import org.apache.fluss.server.zk.ZooKeeperClient;
3939
import org.apache.fluss.server.zk.data.lease.KvSnapshotLease;
40-
import org.apache.fluss.server.zk.data.lease.KvSnapshotLeaseMetadataHelper;
40+
import org.apache.fluss.server.zk.data.lease.KvSnapshotLeaseMetadataManager;
4141
import org.apache.fluss.server.zk.data.lease.KvSnapshotTableLease;
4242
import org.apache.fluss.types.DataTypes;
4343

@@ -205,8 +205,8 @@ public void testKvSnapshotLease() throws Exception {
205205

206206
ZooKeeperClient zkClient = FLUSS_CLUSTER_EXTENSION.getZooKeeperClient();
207207
String remoteDataDir = FLUSS_CLUSTER_EXTENSION.getRemoteDataDir();
208-
KvSnapshotLeaseMetadataHelper metadataHelper =
209-
new KvSnapshotLeaseMetadataHelper(zkClient, remoteDataDir);
208+
KvSnapshotLeaseMetadataManager metadataHelper =
209+
new KvSnapshotLeaseMetadataManager(zkClient, remoteDataDir);
210210

211211
assertThat(zkClient.getKvSnapshotLeasesList()).isEmpty();
212212

@@ -346,7 +346,7 @@ private void waitUntilAllSnapshotFinished(Set<TableBucket> tableBuckets, long sn
346346
}
347347

348348
private void checkKvSnapshotLeaseEquals(
349-
KvSnapshotLeaseMetadataHelper metadataHelper,
349+
KvSnapshotLeaseMetadataManager metadataHelper,
350350
String leaseId,
351351
long tableId,
352352
Long[] expectedBucketIndex)

fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ public class MetricNames {
5454
public static final String SERVER_PHYSICAL_STORAGE_REMOTE_KV_SIZE = "remoteKvSize";
5555

5656
// for kv snapshot lease.
57+
// TODO implemented it at the table level. Trace by: https://github.com/apache/fluss/issues/2297
5758
public static final String KV_SNAPSHOT_LEASE_COUNT = "kvSnapshotLeaseCount";
5859
public static final String LEASED_KV_SNAPSHOT_COUNT = "leasedKvSnapshotCount";
5960

fluss-common/src/test/java/org/apache/fluss/record/TestData.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,10 @@ public final class TestData {
9797
currentMillis);
9898

9999
// for log table / partition table
100+
public static final TablePath PARTITION_TABLE_PATH =
101+
new TablePath("test_db_1", "test_partition_table");
102+
public static final long PARTITION_TABLE_ID = 150008L;
103+
100104
public static final TableDescriptor DATA1_PARTITIONED_TABLE_DESCRIPTOR =
101105
TableDescriptor.builder()
102106
.schema(DATA1_SCHEMA)
@@ -107,6 +111,16 @@ public final class TestData {
107111
ConfigOptions.TABLE_AUTO_PARTITION_TIME_UNIT,
108112
AutoPartitionTimeUnit.YEAR)
109113
.build();
114+
115+
public static final TableInfo PARTITION_TABLE_INFO =
116+
TableInfo.of(
117+
PARTITION_TABLE_PATH,
118+
PARTITION_TABLE_ID,
119+
1,
120+
DATA1_PARTITIONED_TABLE_DESCRIPTOR,
121+
System.currentTimeMillis(),
122+
System.currentTimeMillis());
123+
110124
public static final PhysicalTablePath DATA1_PHYSICAL_TABLE_PATH_PA_2024 =
111125
PhysicalTablePath.of(DATA1_TABLE_PATH, "2024");
112126

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/DropKvSnapshotLeaseProcedure.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import org.apache.flink.table.annotation.ProcedureHint;
2323
import org.apache.flink.table.procedure.ProcedureContext;
2424

25-
/** Procedure to drop kv snapshot lease. */
25+
/** Procedure to release kv snapshot lease. */
2626
public class DropKvSnapshotLeaseProcedure extends ProcedureBase {
2727

2828
@ProcedureHint(

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@
111111
import org.apache.fluss.server.zk.data.lake.LakeTable;
112112
import org.apache.fluss.server.zk.data.lake.LakeTableHelper;
113113
import org.apache.fluss.server.zk.data.lake.LakeTableSnapshot;
114-
import org.apache.fluss.server.zk.data.lease.KvSnapshotLeaseMetadataHelper;
114+
import org.apache.fluss.server.zk.data.lease.KvSnapshotLeaseMetadataManager;
115115
import org.apache.fluss.utils.clock.Clock;
116116
import org.apache.fluss.utils.types.Tuple2;
117117

@@ -225,7 +225,7 @@ public CoordinatorEventProcessor(
225225
this.kvSnapshotLeaseManager =
226226
new KvSnapshotLeaseManager(
227227
conf,
228-
new KvSnapshotLeaseMetadataHelper(zooKeeperClient, remoteDataDir),
228+
new KvSnapshotLeaseMetadataManager(zooKeeperClient, remoteDataDir),
229229
coordinatorContext,
230230
clock,
231231
coordinatorMetricGroup);

fluss-server/src/main/java/org/apache/fluss/server/coordinator/KvSnapshotLeaseManager.java

Lines changed: 20 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import org.apache.fluss.metrics.MetricNames;
2727
import org.apache.fluss.server.metrics.group.CoordinatorMetricGroup;
2828
import org.apache.fluss.server.zk.data.lease.KvSnapshotLease;
29-
import org.apache.fluss.server.zk.data.lease.KvSnapshotLeaseMetadataHelper;
29+
import org.apache.fluss.server.zk.data.lease.KvSnapshotLeaseMetadataManager;
3030
import org.apache.fluss.server.zk.data.lease.KvSnapshotTableLease;
3131
import org.apache.fluss.utils.MapUtils;
3232
import org.apache.fluss.utils.clock.Clock;
@@ -48,7 +48,6 @@
4848
import java.util.concurrent.atomic.AtomicInteger;
4949
import java.util.concurrent.locks.ReadWriteLock;
5050
import java.util.concurrent.locks.ReentrantReadWriteLock;
51-
import java.util.stream.Collectors;
5251

5352
import static org.apache.fluss.utils.concurrent.LockUtils.inReadLock;
5453
import static org.apache.fluss.utils.concurrent.LockUtils.inWriteLock;
@@ -57,8 +56,9 @@
5756
@ThreadSafe
5857
public class KvSnapshotLeaseManager {
5958
private static final Logger LOG = LoggerFactory.getLogger(KvSnapshotLeaseManager.class);
59+
private static final AtomicInteger ZERO = new AtomicInteger(0);
6060

61-
private final KvSnapshotLeaseMetadataHelper metadataHelper;
61+
private final KvSnapshotLeaseMetadataManager metadataHelper;
6262
private final CoordinatorContext coordinatorContext;
6363
private final Clock clock;
6464
private final ScheduledExecutorService scheduledExecutor;
@@ -75,7 +75,6 @@ public class KvSnapshotLeaseManager {
7575
* KvSnapshotLeaseForBucket to the ref count, which means this table bucket + snapshotId has
7676
* been leased by how many lease id.
7777
*/
78-
@GuardedBy("refCountLock")
7978
private final Map<KvSnapshotLeaseForBucket, AtomicInteger> refCount =
8079
MapUtils.newConcurrentHashMap();
8180

@@ -84,7 +83,7 @@ public class KvSnapshotLeaseManager {
8483

8584
public KvSnapshotLeaseManager(
8685
Configuration conf,
87-
KvSnapshotLeaseMetadataHelper metadataHelper,
86+
KvSnapshotLeaseMetadataManager metadataHelper,
8887
CoordinatorContext coordinatorContext,
8988
Clock clock,
9089
CoordinatorMetricGroup coordinatorMetricGroup) {
@@ -101,7 +100,7 @@ public KvSnapshotLeaseManager(
101100
@VisibleForTesting
102101
public KvSnapshotLeaseManager(
103102
Configuration conf,
104-
KvSnapshotLeaseMetadataHelper metadataHelper,
103+
KvSnapshotLeaseMetadataManager metadataHelper,
105104
CoordinatorContext coordinatorContext,
106105
ScheduledExecutorService scheduledExecutor,
107106
Clock clock,
@@ -126,8 +125,7 @@ public void start() {
126125
}
127126

128127
public void initialize() throws Exception {
129-
List<String> leaseIds = metadataHelper.getLeasesList();
130-
for (String leaseId : leaseIds) {
128+
for (String leaseId : metadataHelper.getLeasesList()) {
131129
Optional<KvSnapshotLease> kvSnapshotLeaseOpt = metadataHelper.getLease(leaseId);
132130
if (kvSnapshotLeaseOpt.isPresent()) {
133131
KvSnapshotLease kvSnapshotLease = kvSnapshotLeaseOpt.get();
@@ -145,8 +143,8 @@ public boolean snapshotLeaseNotExist(KvSnapshotLeaseForBucket kvSnapshotLeaseFor
145143
return inReadLock(
146144
refCountLock,
147145
() ->
148-
!refCount.containsKey(kvSnapshotLeaseForBucket)
149-
|| refCount.get(kvSnapshotLeaseForBucket).get() <= 0);
146+
refCount.getOrDefault(kvSnapshotLeaseForBucket, new AtomicInteger(0)).get()
147+
<= 0);
150148
}
151149

152150
/**
@@ -402,18 +400,17 @@ private void decrementRefCount(KvSnapshotLeaseForBucket kvSnapshotLeaseForBucket
402400

403401
private void expireLeases() {
404402
long currentTime = clock.milliseconds();
405-
List<String> expiredLeases =
406-
kvSnapshotLeaseMap.entrySet().stream()
407-
.filter(entry -> entry.getValue().getExpirationTime() < currentTime)
408-
.map(Map.Entry::getKey)
409-
.collect(Collectors.toList());
410-
for (String leaseId : expiredLeases) {
411-
try {
412-
drop(leaseId);
413-
} catch (Exception e) {
414-
LOG.error("Failed to clear kv snapshot lease {}", leaseId, e);
415-
}
416-
}
403+
kvSnapshotLeaseMap.entrySet().stream()
404+
.filter(entry -> entry.getValue().getExpirationTime() < currentTime)
405+
.map(Map.Entry::getKey)
406+
.forEach(
407+
leaseId -> {
408+
try {
409+
drop(leaseId);
410+
} catch (Exception e) {
411+
LOG.error("Failed to clear kv snapshot lease {}", leaseId, e);
412+
}
413+
});
417414
}
418415

419416
private void registerMetrics(CoordinatorMetricGroup coordinatorMetricGroup) {
@@ -436,14 +433,7 @@ int getLeasedBucketCount() {
436433
@VisibleForTesting
437434
int getRefCount(KvSnapshotLeaseForBucket kvSnapshotLeaseForBucket) {
438435
return inReadLock(
439-
refCountLock,
440-
() -> {
441-
if (!refCount.containsKey(kvSnapshotLeaseForBucket)) {
442-
return 0;
443-
} else {
444-
return refCount.get(kvSnapshotLeaseForBucket).get();
445-
}
446-
});
436+
refCountLock, () -> refCount.getOrDefault(kvSnapshotLeaseForBucket, ZERO).get());
447437
}
448438

449439
@VisibleForTesting

fluss-server/src/main/java/org/apache/fluss/server/zk/data/lease/KvSnapshotLeaseMetadataHelper.java renamed to fluss-server/src/main/java/org/apache/fluss/server/zk/data/lease/KvSnapshotLeaseMetadataManager.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,16 +41,16 @@
4141
import static org.apache.fluss.utils.Preconditions.checkNotNull;
4242

4343
/**
44-
* The helper to handle {@link KvSnapshotLease} to register/update/delete metadata from zk and
44+
* The manager to handle {@link KvSnapshotLease} to register/update/delete metadata from zk and
4545
* remote fs.
4646
*/
47-
public class KvSnapshotLeaseMetadataHelper {
48-
private static final Logger LOG = LoggerFactory.getLogger(KvSnapshotLeaseMetadataHelper.class);
47+
public class KvSnapshotLeaseMetadataManager {
48+
private static final Logger LOG = LoggerFactory.getLogger(KvSnapshotLeaseMetadataManager.class);
4949

5050
private final ZooKeeperClient zkClient;
5151
private final String remoteDataDir;
5252

53-
public KvSnapshotLeaseMetadataHelper(ZooKeeperClient zkClient, String remoteDataDir) {
53+
public KvSnapshotLeaseMetadataManager(ZooKeeperClient zkClient, String remoteDataDir) {
5454
this.zkClient = zkClient;
5555
this.remoteDataDir = remoteDataDir;
5656
}

fluss-server/src/test/java/org/apache/fluss/server/coordinator/KvSnapshotLeaseManagerTest.java

Lines changed: 6 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,13 @@
2222
import org.apache.fluss.metadata.KvSnapshotLeaseForBucket;
2323
import org.apache.fluss.metadata.PhysicalTablePath;
2424
import org.apache.fluss.metadata.TableBucket;
25-
import org.apache.fluss.metadata.TableInfo;
26-
import org.apache.fluss.metadata.TablePath;
2725
import org.apache.fluss.server.metrics.group.TestingMetricGroups;
2826
import org.apache.fluss.server.zk.NOPErrorHandler;
2927
import org.apache.fluss.server.zk.ZooKeeperClient;
3028
import org.apache.fluss.server.zk.ZooKeeperExtension;
3129
import org.apache.fluss.server.zk.data.BucketSnapshot;
3230
import org.apache.fluss.server.zk.data.lease.KvSnapshotLease;
33-
import org.apache.fluss.server.zk.data.lease.KvSnapshotLeaseMetadataHelper;
31+
import org.apache.fluss.server.zk.data.lease.KvSnapshotLeaseMetadataManager;
3432
import org.apache.fluss.server.zk.data.lease.KvSnapshotTableLease;
3533
import org.apache.fluss.testutils.common.AllCallbackWrapper;
3634
import org.apache.fluss.testutils.common.ManuallyTriggeredScheduledExecutorService;
@@ -52,10 +50,12 @@
5250
import java.util.Map;
5351
import java.util.concurrent.TimeUnit;
5452

55-
import static org.apache.fluss.record.TestData.DATA1_PARTITIONED_TABLE_DESCRIPTOR;
5653
import static org.apache.fluss.record.TestData.DATA1_TABLE_ID_PK;
5754
import static org.apache.fluss.record.TestData.DATA1_TABLE_INFO_PK;
5855
import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH_PK;
56+
import static org.apache.fluss.record.TestData.PARTITION_TABLE_ID;
57+
import static org.apache.fluss.record.TestData.PARTITION_TABLE_INFO;
58+
import static org.apache.fluss.record.TestData.PARTITION_TABLE_PATH;
5959
import static org.apache.fluss.server.coordinator.CoordinatorTestUtils.createServers;
6060
import static org.assertj.core.api.Assertions.assertThat;
6161

@@ -66,17 +66,6 @@ public class KvSnapshotLeaseManagerTest {
6666
public static final AllCallbackWrapper<ZooKeeperExtension> ZOO_KEEPER_EXTENSION_WRAPPER =
6767
new AllCallbackWrapper<>(new ZooKeeperExtension());
6868

69-
private static final TablePath PARTITION_TABLE_PATH =
70-
new TablePath("test_db_1", "test_partition_table");
71-
private static final long PARTITION_TABLE_ID = 150008L;
72-
private static final TableInfo PARTITION_TABLE_INFO =
73-
TableInfo.of(
74-
PARTITION_TABLE_PATH,
75-
PARTITION_TABLE_ID,
76-
1,
77-
DATA1_PARTITIONED_TABLE_DESCRIPTOR,
78-
System.currentTimeMillis(),
79-
System.currentTimeMillis());
8069
private static final long PARTITION_ID_1 = 19001L;
8170
private static final PhysicalTablePath PARTITION_TABLE_PATH_1 =
8271
PhysicalTablePath.of(PARTITION_TABLE_PATH, "2024");
@@ -101,7 +90,7 @@ public class KvSnapshotLeaseManagerTest {
10190
private ManualClock manualClock;
10291
private ManuallyTriggeredScheduledExecutorService clearLeaseScheduler;
10392
private KvSnapshotLeaseManager kvSnapshotLeaseManager;
104-
private KvSnapshotLeaseMetadataHelper metadataHelper;
93+
private KvSnapshotLeaseMetadataManager metadataHelper;
10594

10695
private @TempDir Path tempDir;
10796

@@ -121,7 +110,7 @@ void beforeEach() throws Exception {
121110
conf.set(ConfigOptions.KV_SNAPSHOT_LEASE_EXPIRATION_CHECK_INTERVAL, Duration.ofDays(7));
122111
manualClock = new ManualClock(System.currentTimeMillis());
123112
clearLeaseScheduler = new ManuallyTriggeredScheduledExecutorService();
124-
metadataHelper = new KvSnapshotLeaseMetadataHelper(zookeeperClient, tempDir.toString());
113+
metadataHelper = new KvSnapshotLeaseMetadataManager(zookeeperClient, tempDir.toString());
125114
kvSnapshotLeaseManager =
126115
new KvSnapshotLeaseManager(
127116
conf,

fluss-server/src/test/java/org/apache/fluss/server/zk/data/lease/KvSnapshotLeaseMetadataHelperTest.java renamed to fluss-server/src/test/java/org/apache/fluss/server/zk/data/lease/KvSnapshotLeaseMetadataManagerTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,16 +41,16 @@
4141

4242
import static org.assertj.core.api.Assertions.assertThat;
4343

44-
/** Test for {@link KvSnapshotLeaseMetadataHelper}. */
45-
public class KvSnapshotLeaseMetadataHelperTest {
44+
/** Test for {@link KvSnapshotLeaseMetadataManager}. */
45+
public class KvSnapshotLeaseMetadataManagerTest {
4646

4747
@RegisterExtension
4848
public static final AllCallbackWrapper<ZooKeeperExtension> ZOO_KEEPER_EXTENSION_WRAPPER =
4949
new AllCallbackWrapper<>(new ZooKeeperExtension());
5050

5151
protected static ZooKeeperClient zookeeperClient;
5252
private @TempDir Path tempDir;
53-
private KvSnapshotLeaseMetadataHelper metadataHelper;
53+
private KvSnapshotLeaseMetadataManager metadataHelper;
5454

5555
@BeforeAll
5656
static void beforeAll() {
@@ -62,7 +62,7 @@ static void beforeAll() {
6262

6363
@BeforeEach
6464
void beforeEach() {
65-
metadataHelper = new KvSnapshotLeaseMetadataHelper(zookeeperClient, tempDir.toString());
65+
metadataHelper = new KvSnapshotLeaseMetadataManager(zookeeperClient, tempDir.toString());
6666
}
6767

6868
@AfterEach

0 commit comments

Comments
 (0)