|
18 | 18 |
|
19 | 19 | package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage; |
20 | 20 |
|
21 | | -import static org.assertj.core.api.Assertions.assertThat; |
22 | | - |
23 | 21 | import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageDataIdentifier; |
| 22 | + |
24 | 23 | import org.junit.jupiter.api.AfterEach; |
25 | 24 | import org.junit.jupiter.api.BeforeEach; |
26 | 25 | import org.junit.jupiter.api.RepeatedTest; |
|
35 | 34 | import java.util.concurrent.TimeUnit; |
36 | 35 | import java.util.concurrent.atomic.AtomicInteger; |
37 | 36 |
|
| 37 | +import static org.assertj.core.api.Assertions.assertThat; |
| 38 | + |
38 | 39 | /** Tests for {@link TieredStorageResourceRegistry}. */ |
39 | 40 | class TieredStorageResourceRegistryTest { |
40 | 41 |
|
@@ -62,6 +63,30 @@ void tearDown() throws Exception { |
62 | 63 | executor.awaitTermination(10, TimeUnit.SECONDS); |
63 | 64 | } |
64 | 65 |
|
| 66 | + @RepeatedTest(10) |
| 67 | + void testConcurrentRegisterResource() throws Exception { |
| 68 | + AtomicInteger releaseCount = new AtomicInteger(0); |
| 69 | + TestingDataIdentifier sharedOwner = new TestingDataIdentifier(0); |
| 70 | + |
| 71 | + runConcurrentTask( |
| 72 | + threadId -> { |
| 73 | + for (int i = 0; i < NUM_OPERATIONS_PER_THREAD; i++) { |
| 74 | + registry.registerResource( |
| 75 | + sharedOwner, () -> releaseCount.incrementAndGet()); |
| 76 | + } |
| 77 | + }); |
| 78 | + |
| 79 | + assertNoExceptions("Concurrent registerResource() calls"); |
| 80 | + |
| 81 | + // Clear resources and verify all were registered |
| 82 | + registry.clearResourceFor(sharedOwner); |
| 83 | + |
| 84 | + // Verify that all registered resources were successfully release |
| 85 | + assertThat(releaseCount.get()) |
| 86 | + .as("All registered resources should be released.") |
| 87 | + .isEqualTo(NUM_THREADS * NUM_OPERATIONS_PER_THREAD); |
| 88 | + } |
| 89 | + |
65 | 90 | @RepeatedTest(10) |
66 | 91 | void testConcurrentRegisterResourceWithDifferentOwners() throws Exception { |
67 | 92 | AtomicInteger successfulRegistrations = new AtomicInteger(0); |
@@ -112,30 +137,6 @@ void testConcurrentRegisterAndClear() throws Exception { |
112 | 137 | assertNoExceptions("Concurrent registration/clearing calls"); |
113 | 138 | } |
114 | 139 |
|
115 | | - @RepeatedTest(10) |
116 | | - void testConcurrentRegisterResourceWithSameOwner() throws Exception { |
117 | | - AtomicInteger releaseCount = new AtomicInteger(0); |
118 | | - TestingDataIdentifier sharedOwner = new TestingDataIdentifier(0); |
119 | | - |
120 | | - runConcurrentTask( |
121 | | - threadId -> { |
122 | | - for (int i = 0; i < NUM_OPERATIONS_PER_THREAD; i++) { |
123 | | - registry.registerResource( |
124 | | - sharedOwner, () -> releaseCount.incrementAndGet()); |
125 | | - } |
126 | | - }); |
127 | | - |
128 | | - assertNoExceptions("concurrent registerResource() calls with same owner"); |
129 | | - |
130 | | - // Clear resources and verify all were registered |
131 | | - registry.clearResourceFor(sharedOwner); |
132 | | - |
133 | | - // Verify that all registered resources were successfully release |
134 | | - assertThat(releaseCount.get()) |
135 | | - .as("All registered resources should be released.") |
136 | | - .isEqualTo(NUM_THREADS * NUM_OPERATIONS_PER_THREAD); |
137 | | - } |
138 | | - |
139 | 140 | private void runConcurrentTask(ThrowingIntConsumer task) throws Exception { |
140 | 141 | for (int t = 0; t < NUM_THREADS; t++) { |
141 | 142 | final int threadId = t; |
|
0 commit comments