Skip to content

Commit aed75a2

Browse files
committed
HIVE-29281: Make proactive cache eviction work with catalog
1 parent ef2c646 commit aed75a2

24 files changed

Lines changed: 441 additions & 165 deletions

File tree

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/orc/VectorizedReadUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ public static ByteBuffer getSerializedOrcTail(Path path, SyntheticFileId fileId,
7979
// Note: Since Hive doesn't know about partition information of Iceberg tables, partitionDesc is only used to
8080
// deduce the table (and DB) name here.
8181
CacheTag cacheTag = HiveConf.getBoolVar(job, HiveConf.ConfVars.LLAP_TRACK_CACHE_USAGE) ?
82-
LlapHiveUtils.getDbAndTableNameForMetrics(path, true, partitionDesc) : null;
82+
LlapHiveUtils.getCacheTag(path, true, partitionDesc) : null;
8383

8484
try {
8585
// Schema has to be serialized and deserialized as it is passed between different packages of TypeDescription:

llap-common/src/protobuf/LlapDaemonProtocol.proto

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,10 +233,11 @@ message SetCapacityRequestProto {
233233
message SetCapacityResponseProto {
234234
}
235235

236-
// Used for proactive eviction request. Must contain one DB name, and optionally table information.
236+
// Used for proactive eviction request. Must contain a DB name, and optionally table information and catalog name.
237237
message EvictEntityRequestProto {
238238
required string db_name = 1;
239239
repeated TableProto table = 2;
240+
optional string catalog_name = 3 [default = "hive"];
240241
}
241242

242243
// Used in EvictEntityRequestProto, can be used for non-partitioned and partitioned tables too.

llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapCacheMetadataSerializer.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.hadoop.hive.llap.io.encoded.LlapOrcCacheLoader;
3434
import org.apache.hadoop.hive.ql.io.SyntheticFileId;
3535
import org.apache.hadoop.hive.ql.io.orc.encoded.IoTrace;
36+
import org.apache.hadoop.hive.ql.metadata.HiveUtils;
3637
import org.apache.hive.common.util.FixedSizedObjectPool;
3738
import org.slf4j.Logger;
3839
import org.slf4j.LoggerFactory;
@@ -149,7 +150,7 @@ public void loadData(LlapDaemonProtocolProtos.CacheEntryList data) {
149150
}
150151

151152
private void loadData(LlapDaemonProtocolProtos.CacheEntry ce) throws IOException {
152-
CacheTag cacheTag = decodeCacheTag(ce.getCacheTag());
153+
CacheTag cacheTag = decodeCacheTag(ce.getCacheTag(), conf);
153154
DiskRangeList ranges = decodeRanges(ce.getRangesList());
154155
Object fileKey = decodeFileKey(ce.getFileKey());
155156
try (LlapOrcCacheLoader llr = new LlapOrcCacheLoader(new Path(ce.getFilePath()), fileKey, conf, cache,
@@ -167,9 +168,16 @@ private static DiskRangeList decodeRanges(List<LlapDaemonProtocolProtos.CacheEnt
167168
return helper.get();
168169
}
169170

170-
private static CacheTag decodeCacheTag(LlapDaemonProtocolProtos.CacheTag ct) {
171-
return ct.getPartitionDescCount() == 0 ? CacheTag.build(ct.getTableName()) : CacheTag
172-
.build(ct.getTableName(), ct.getPartitionDescList());
171+
private static CacheTag decodeCacheTag(LlapDaemonProtocolProtos.CacheTag ct, Configuration conf) {
172+
String tableName = ct.getTableName();
173+
String[] parts = tableName.split("\\.");
174+
if (parts.length == 2) {
175+
// db.table without catalog, prepend current or default catalog
176+
tableName = HiveUtils.getCurrentCatalogOrDefault(conf) + '.' + tableName;
177+
}
178+
return ct.getPartitionDescCount() == 0
179+
? CacheTag.build(tableName)
180+
: CacheTag.build(tableName, ct.getPartitionDescList());
173181
}
174182

175183
@VisibleForTesting

llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@
2121
import java.io.IOException;
2222
import java.util.ArrayList;
2323
import java.util.Arrays;
24+
import java.util.LinkedHashMap;
2425
import java.util.List;
26+
import java.util.Map;
2527
import java.util.concurrent.ExecutorService;
2628
import java.util.concurrent.LinkedBlockingQueue;
2729
import java.util.concurrent.TimeUnit;
@@ -324,9 +326,11 @@ public long evictEntity(LlapDaemonProtocolProtos.EvictEntityRequestProto protoRe
324326
if (LOG.isDebugEnabled()) {
325327
StringBuilder sb = new StringBuilder();
326328
sb.append(markedBytes).append(" bytes marked for eviction from LLAP cache buffers that belong to table(s): ");
327-
for (String table : request.getEntities().get(request.getSingleDbName()).keySet()) {
328-
sb.append(table).append(" ");
329-
}
329+
request.getEntities().forEach((catalog, dbs) -> {
330+
dbs.forEach((db, tables) -> {
331+
tables.forEach((table, partitions) -> sb.append(catalog + "." + db + "." + table).append(" "));
332+
});
333+
});
330334
sb.append(" Duration: ").append(time).append(" ms");
331335
LOG.debug(sb.toString());
332336
}

llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ public OrcEncodedDataReader(LowLevelCache lowLevelCache, BufferUsageManager buff
235235
// LlapInputFormat needs to know the file schema to decide if schema evolution is supported.
236236
PartitionDesc partitionDesc = LlapHiveUtils.partitionDescForPath(split.getPath(), parts);
237237
cacheTag = HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_TRACK_CACHE_USAGE)
238-
? LlapHiveUtils.getDbAndTableNameForMetrics(split.getPath(), true, partitionDesc) : null;
238+
? LlapHiveUtils.getCacheTag(split.getPath(), true, partitionDesc) : null;
239239
// 1. Get file metadata from cache, or create the reader and read it.
240240
// Don't cache the filesystem object for now; Tez closes it and FS cache will fix all that
241241
fsSupplier = getFsSupplier(split.getPath(), jobConf);

llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ public MemoryBuffer create() {
225225
PartitionDesc partitionDesc = LlapHiveUtils.partitionDescForPath(split.getPath(), parts);
226226
fileKey = determineCacheKey(fs, split, partitionDesc, daemonConf);
227227
cacheTag = HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_TRACK_CACHE_USAGE)
228-
? LlapHiveUtils.getDbAndTableNameForMetrics(split.getPath(), true, partitionDesc) : null;
228+
? LlapHiveUtils.getCacheTag(split.getPath(), true, partitionDesc) : null;
229229
this.sourceInputFormat = sourceInputFormat;
230230
this.sourceSerDe = sourceSerDe;
231231
this.reporter = reporter;

llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileEstimateErrors.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.hadoop.hive.common.io.DiskRangeList.MutateHelper;
2727
import org.apache.hadoop.hive.common.io.CacheTag;
2828
import org.apache.hadoop.hive.llap.cache.EvictionDispatcher;
29+
import org.apache.hadoop.hive.metastore.Warehouse;
2930
import org.apache.hadoop.hive.llap.cache.LlapCacheableBuffer;
3031
import org.apache.hadoop.hive.ql.io.SyntheticFileId;
3132
import org.apache.hadoop.hive.ql.io.orc.encoded.IncompleteCb;
@@ -140,7 +141,6 @@ public boolean isMarkedForEviction() {
140141

141142
@Override
142143
public CacheTag getTag() {
143-
// We don't care about these.
144-
return CacheTag.build("OrcEstimates");
144+
return CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME, "OrcEstimates");
145145
}
146146
}

llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestCacheContentsTracker.java

Lines changed: 31 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,10 @@
1717
*/
1818
package org.apache.hadoop.hive.llap.cache;
1919

20-
import java.util.HashMap;
2120
import java.util.LinkedHashMap;
22-
import java.util.Map;
2321

2422
import org.apache.hadoop.hive.common.io.CacheTag;
23+
import org.apache.hadoop.hive.metastore.Warehouse;
2524

2625
import org.junit.BeforeClass;
2726
import org.junit.Test;
@@ -127,7 +126,7 @@ public void testCacheTagComparison() {
127126
public void testEncodingDecoding() throws Exception {
128127
LinkedHashMap<String, String> partDescs = new LinkedHashMap<>();
129128
partDescs.put("pytha=goras", "a2+b2=c2");
130-
CacheTag tag = CacheTag.build("math.rules", partDescs);
129+
CacheTag tag = CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME + ".math.rules", partDescs);
131130
CacheTag.SinglePartitionCacheTag stag = ((CacheTag.SinglePartitionCacheTag)tag);
132131
assertEquals("pytha=goras=a2+b2=c2", stag.partitionDescToString());
133132
assertEquals(1, stag.getPartitionDescMap().size());
@@ -136,7 +135,7 @@ public void testEncodingDecoding() throws Exception {
136135
partDescs.clear();
137136
partDescs.put("mutli=one", "one=/1");
138137
partDescs.put("mutli=two/", "two=2");
139-
tag = CacheTag.build("math.rules", partDescs);
138+
tag = CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME + ".math.rules", partDescs);
140139
CacheTag.MultiPartitionCacheTag mtag = ((CacheTag.MultiPartitionCacheTag)tag);
141140
assertEquals("mutli=one=one=/1/mutli=two/=two=2", mtag.partitionDescToString());
142141
assertEquals(2, mtag.getPartitionDescMap().size());
@@ -168,6 +167,10 @@ private static LlapCacheableBuffer createMockBuffer(long size, CacheTag cacheTag
168167
}
169168

170169
public static CacheTag cacheTagBuilder(String dbAndTable, String... partitions) {
170+
String[] parts = dbAndTable.split("\\.");
171+
if(parts.length < 3) {
172+
dbAndTable = Warehouse.DEFAULT_CATALOG_NAME + "." + dbAndTable;
173+
}
171174
if (partitions != null && partitions.length > 0) {
172175
LinkedHashMap<String, String> partDescs = new LinkedHashMap<>();
173176
for (String partition : partitions) {
@@ -215,33 +218,33 @@ private static void evictSomeTestBuffers() {
215218
private static final String EXPECTED_CACHE_STATE_WHEN_FULL =
216219
"\n" +
217220
"Cache state: \n" +
218-
"default : 2/2, 2101248/2101248\n" +
219-
"default.testtable : 2/2, 2101248/2101248\n" +
220-
"otherdb : 7/7, 1611106304/1611106304\n" +
221-
"otherdb.testtable : 4/4, 231424/231424\n" +
222-
"otherdb.testtable/p=v1 : 3/3, 100352/100352\n" +
223-
"otherdb.testtable/p=v1/pp=vv1 : 2/2, 34816/34816\n" +
224-
"otherdb.testtable/p=v1/pp=vv2 : 1/1, 65536/65536\n" +
225-
"otherdb.testtable/p=v2 : 1/1, 131072/131072\n" +
226-
"otherdb.testtable/p=v2/pp=vv1 : 1/1, 131072/131072\n" +
227-
"otherdb.testtable2 : 2/2, 537133056/537133056\n" +
228-
"otherdb.testtable2/p=v3 : 2/2, 537133056/537133056\n" +
229-
"otherdb.testtable3 : 1/1, 1073741824/1073741824";
221+
"hive.default : 2/2, 2101248/2101248\n" +
222+
"hive.default.testtable : 2/2, 2101248/2101248\n" +
223+
"hive.otherdb : 7/7, 1611106304/1611106304\n" +
224+
"hive.otherdb.testtable : 4/4, 231424/231424\n" +
225+
"hive.otherdb.testtable/p=v1 : 3/3, 100352/100352\n" +
226+
"hive.otherdb.testtable/p=v1/pp=vv1 : 2/2, 34816/34816\n" +
227+
"hive.otherdb.testtable/p=v1/pp=vv2 : 1/1, 65536/65536\n" +
228+
"hive.otherdb.testtable/p=v2 : 1/1, 131072/131072\n" +
229+
"hive.otherdb.testtable/p=v2/pp=vv1 : 1/1, 131072/131072\n" +
230+
"hive.otherdb.testtable2 : 2/2, 537133056/537133056\n" +
231+
"hive.otherdb.testtable2/p=v3 : 2/2, 537133056/537133056\n" +
232+
"hive.otherdb.testtable3 : 1/1, 1073741824/1073741824";
230233

231234
private static final String EXPECTED_CACHE_STATE_AFTER_EVICTION =
232235
"\n" +
233236
"Cache state: \n" +
234-
"default : 0/2, 0/2101248\n" +
235-
"default.testtable : 0/2, 0/2101248\n" +
236-
"otherdb : 5/7, 1074202624/1611106304\n" +
237-
"otherdb.testtable : 3/4, 198656/231424\n" +
238-
"otherdb.testtable/p=v1 : 2/3, 67584/100352\n" +
239-
"otherdb.testtable/p=v1/pp=vv1 : 1/2, 2048/34816\n" +
240-
"otherdb.testtable/p=v1/pp=vv2 : 1/1, 65536/65536\n" +
241-
"otherdb.testtable/p=v2 : 1/1, 131072/131072\n" +
242-
"otherdb.testtable/p=v2/pp=vv1 : 1/1, 131072/131072\n" +
243-
"otherdb.testtable2 : 1/2, 262144/537133056\n" +
244-
"otherdb.testtable2/p=v3 : 1/2, 262144/537133056\n" +
245-
"otherdb.testtable3 : 1/1, 1073741824/1073741824";
237+
"hive.default : 0/2, 0/2101248\n" +
238+
"hive.default.testtable : 0/2, 0/2101248\n" +
239+
"hive.otherdb : 5/7, 1074202624/1611106304\n" +
240+
"hive.otherdb.testtable : 3/4, 198656/231424\n" +
241+
"hive.otherdb.testtable/p=v1 : 2/3, 67584/100352\n" +
242+
"hive.otherdb.testtable/p=v1/pp=vv1 : 1/2, 2048/34816\n" +
243+
"hive.otherdb.testtable/p=v1/pp=vv2 : 1/1, 65536/65536\n" +
244+
"hive.otherdb.testtable/p=v2 : 1/1, 131072/131072\n" +
245+
"hive.otherdb.testtable/p=v2/pp=vv1 : 1/1, 131072/131072\n" +
246+
"hive.otherdb.testtable2 : 1/2, 262144/537133056\n" +
247+
"hive.otherdb.testtable2/p=v3 : 1/2, 262144/537133056\n" +
248+
"hive.otherdb.testtable3 : 1/1, 1073741824/1073741824";
246249

247250
}

llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestFileCache.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import com.google.common.base.Function;
2121
import org.apache.hadoop.hive.common.io.CacheTag;
22+
import org.apache.hadoop.hive.metastore.Warehouse;
2223
import org.junit.Test;
2324

2425
import java.util.concurrent.ConcurrentHashMap;
@@ -32,7 +33,7 @@ public void testFileCacheMetadata() {
3233
ConcurrentHashMap<Object, FileCache<Object>> cache = new ConcurrentHashMap<>();
3334
Object fileKey = 1234L;
3435
Function<Void, Object> f = a -> new Object();
35-
CacheTag tag = CacheTag.build("test_table");
36+
CacheTag tag = CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME, "test_db.test_table");
3637

3738
FileCache<Object> result = FileCache.getOrAddFileSubCache(cache, fileKey, f, tag);
3839

llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.slf4j.LoggerFactory;
4040

4141
import org.apache.hadoop.hive.common.io.CacheTag;
42+
import org.apache.hadoop.hive.metastore.Warehouse;
4243
import org.apache.hadoop.hive.common.io.DiskRange;
4344
import org.apache.hadoop.hive.common.io.DiskRangeList;
4445
import org.apache.hadoop.hive.common.io.DataCache.DiskRangeListFactory;
@@ -309,13 +310,14 @@ private void _testProactiveEvictionMark(boolean isInstantDeallocation) {
309310

310311
LlapDataBuffer[] buffs1 = IntStream.range(0, 4).mapToObj(i -> fb()).toArray(LlapDataBuffer[]::new);
311312
DiskRange[] drs1 = drs(IntStream.range(1, 5).toArray());
312-
CacheTag tag1 = CacheTag.build("default.table1");
313+
CacheTag tag1 = CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME, "default.table1");
313314

314315
LlapDataBuffer[] buffs2 = IntStream.range(0, 41).mapToObj(i -> fb()).toArray(LlapDataBuffer[]::new);
315316
DiskRange[] drs2 = drs(IntStream.range(1, 42).toArray());
316-
CacheTag tag2 = CacheTag.build("default.table2");
317+
CacheTag tag2 = CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME, "default.table2");
317318

318-
Predicate<CacheTag> predicate = tag -> "default.table1".equals(tag.getTableName());
319+
Predicate<CacheTag> predicate = tag ->
320+
(Warehouse.DEFAULT_CATALOG_NAME + "." + "default.table1").equals(tag.getTableName());
319321

320322
cache.putFileData(fn1, drs1, buffs1, 0, Priority.NORMAL, null, tag1);
321323
cache.putFileData(fn2, drs2, buffs2, 0, Priority.NORMAL, null, tag2);

0 commit comments

Comments (0)