
Commit 220bddf

HIVE-29281: Make proactive cache eviction work with catalog
1 parent d55885e commit 220bddf

35 files changed

Lines changed: 480 additions & 208 deletions


hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java

Lines changed: 2 additions & 2 deletions
@@ -423,7 +423,7 @@ public static Pair<String, String> getDbAndTableName(String tableName) throws IO
     Properties props = inputJobInfo.getTableInfo().getStorerInfo().getProperties();
     props.put(serdeConstants.SERIALIZATION_LIB,storageHandler.getSerDeClass().getName());
     TableDesc tableDesc = new TableDesc(storageHandler.getInputFormatClass(),
-        storageHandler.getOutputFormatClass(),props);
+        storageHandler.getOutputFormatClass(), props, null);
     if (tableDesc.getJobProperties() == null) {
       tableDesc.setJobProperties(new HashMap<>());
     }
@@ -464,7 +464,7 @@ public static Pair<String, String> getDbAndTableName(String tableName) throws IO
     Properties props = outputJobInfo.getTableInfo().getStorerInfo().getProperties();
     props.put(serdeConstants.SERIALIZATION_LIB,storageHandler.getSerDeClass().getName());
     TableDesc tableDesc = new TableDesc(storageHandler.getInputFormatClass(),
-        IgnoreKeyTextOutputFormat.class,props);
+        IgnoreKeyTextOutputFormat.class, props, null);
     if (tableDesc.getJobProperties() == null)
       tableDesc.setJobProperties(new HashMap<>());
     for (Map.Entry<String, String> el : conf) {
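
For illustration, a minimal sketch of the widened constructor these call sites now use, assuming its new fourth argument is the catalog name as a String (the HCatalog call sites above pass null, presumably falling back to the default catalog). The classes are real Hive classes, but the helper itself is hypothetical:

import java.util.Properties;

import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
import org.apache.hadoop.hive.ql.plan.TableDesc;

public class TableDescCatalogSketch {
  // Hypothetical helper: pins the descriptor to the default catalog ("hive") instead of passing null.
  public static TableDesc orcTableDesc(Properties tblProps) {
    return new TableDesc(OrcInputFormat.class, OrcOutputFormat.class, tblProps,
        Warehouse.DEFAULT_CATALOG_NAME);
  }
}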

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/orc/VectorizedReadUtils.java

Lines changed: 1 addition & 1 deletion
@@ -79,7 +79,7 @@ public static ByteBuffer getSerializedOrcTail(Path path, SyntheticFileId fileId,
     // Note: Since Hive doesn't know about partition information of Iceberg tables, partitionDesc is only used to
     // deduct the table (and DB) name here.
     CacheTag cacheTag = HiveConf.getBoolVar(job, HiveConf.ConfVars.LLAP_TRACK_CACHE_USAGE) ?
-        LlapHiveUtils.getDbAndTableNameForMetrics(path, true, partitionDesc) : null;
+        LlapHiveUtils.getDbAndTableNameForMetrics(partitionDesc.getCatalogName(), path, true, partitionDesc) : null;
 
     try {
       // Schema has to be serialized and deserialized as it is passed between different packages of TypeDescription:
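
The same signature change recurs in OrcEncodedDataReader and SerDeEncodedDataReader below. A minimal sketch of the updated pattern, assuming getDbAndTableNameForMetrics now takes the catalog name as its leading argument and yields a catalog-qualified CacheTag (variable names such as conf, path and parts are stand-ins for whatever the caller has in scope):

PartitionDesc partitionDesc = LlapHiveUtils.partitionDescForPath(path, parts);
CacheTag cacheTag = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_TRACK_CACHE_USAGE)
    ? LlapHiveUtils.getDbAndTableNameForMetrics(
        partitionDesc.getCatalogName(),   // new leading argument: catalog of the backing table
        path, true, partitionDesc)
    : null;                               // cache-usage tracking disabled, no tag attached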

itests/hive-jmh/src/main/java/org/apache/hive/benchmark/ql/exec/KryoBench.java

Lines changed: 1 addition & 1 deletion
@@ -123,7 +123,7 @@ public static MapWork mockMapWork(String tableName, int partitions,
     tblProps.put("serialization.lib", OrcSerde.class.getName());
     tblProps.put("columns", columnNames.toString());
     tblProps.put("columns.types", columnTypes.toString());
-    TableDesc tbl = new TableDesc(OrcInputFormat.class, OrcOutputFormat.class, tblProps);
+    TableDesc tbl = new TableDesc(OrcInputFormat.class, OrcOutputFormat.class, tblProps, null);
 
     MapWork mapWork = new MapWork();
     mapWork.setVectorMode(true);

kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaDagCredentialSupplierTest.java

Lines changed: 1 addition & 1 deletion
@@ -142,7 +142,7 @@ private static MapWork createFileSinkWork(TableDesc tableDesc) {
 
   private static TableDesc createKafkaDesc(Properties props) {
     props.setProperty("name", "kafka_table_fake");
-    return new TableDesc(KafkaInputFormat.class, KafkaOutputFormat.class, props);
+    return new TableDesc(KafkaInputFormat.class, KafkaOutputFormat.class, props, null);
   }
 
   /**

llap-common/src/protobuf/LlapDaemonProtocol.proto

Lines changed: 2 additions & 1 deletion
@@ -233,10 +233,11 @@ message SetCapacityRequestProto {
 message SetCapacityResponseProto {
 }
 
-// Used for proactive eviction request. Must contain one DB name, and optionally table information.
+// Used for proactive eviction request. Must contain a DB name, catalog name, and optionally table information.
 message EvictEntityRequestProto {
   required string db_name = 1;
   repeated TableProto table = 2;
+  required string catalog_name = 3;
 }
 
 // Used in EvictEntityRequestProto, can be used for non-partitioned and partitioned tables too.
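
A minimal sketch of how a client might populate the extended request through the generated Java builder in LlapDaemonProtocolProtos; the catalog and DB names are hypothetical, and the repeated TableProto field is omitted, which requests eviction for the whole database:

LlapDaemonProtocolProtos.EvictEntityRequestProto request =
    LlapDaemonProtocolProtos.EvictEntityRequestProto.newBuilder()
        .setCatalogName("hive")     // new required field introduced by this change
        .setDbName("sales_db")      // hypothetical database whose cached buffers should be evicted
        .build();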

llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapCacheMetadataSerializer.java

Lines changed: 9 additions & 2 deletions
@@ -31,6 +31,7 @@
 import org.apache.hadoop.hive.llap.cache.PathCache;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
 import org.apache.hadoop.hive.llap.io.encoded.LlapOrcCacheLoader;
+import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.ql.io.SyntheticFileId;
 import org.apache.hadoop.hive.ql.io.orc.encoded.IoTrace;
 import org.apache.hive.common.util.FixedSizedObjectPool;
@@ -168,8 +169,14 @@ private static DiskRangeList decodeRanges(List<LlapDaemonProtocolProtos.CacheEnt
   }
 
   private static CacheTag decodeCacheTag(LlapDaemonProtocolProtos.CacheTag ct) {
-    return ct.getPartitionDescCount() == 0 ? CacheTag.build(ct.getTableName()) : CacheTag
-        .build(ct.getTableName(), ct.getPartitionDescList());
+    String tableName = ct.getTableName();
+    if (tableName.indexOf('.') == tableName.lastIndexOf('.')) {
+      // In case the CacheTag's table name does not contain catalog name
+      tableName = Warehouse.DEFAULT_CATALOG_NAME + '.' + tableName;
+    }
+    return ct.getPartitionDescCount() == 0
+        ? CacheTag.build(tableName)
+        : CacheTag.build(tableName, ct.getPartitionDescList());
   }
 
   @VisibleForTesting
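
For clarity, a standalone sketch of the backward-compatibility rule decodeCacheTag now applies: a tag name serialized before this change carries only "db.table" (zero or one dot), so it is prefixed with the default catalog (Warehouse.DEFAULT_CATALOG_NAME, "hive" in stock Hive), while an already qualified "catalog.db.table" name passes through unchanged. The helper name below is hypothetical:

import org.apache.hadoop.hive.metastore.Warehouse;

final class CacheTagNames {
  // Mirrors the check above: zero or one '.' means the catalog part is missing.
  static String qualifyWithCatalog(String tableName) {
    if (tableName.indexOf('.') == tableName.lastIndexOf('.')) {
      return Warehouse.DEFAULT_CATALOG_NAME + '.' + tableName;
    }
    return tableName;   // already catalog-qualified
  }
}
// qualifyWithCatalog("default.web_logs")      -> "hive.default.web_logs"
// qualifyWithCatalog("hive.default.web_logs") -> "hive.default.web_logs"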

llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java

Lines changed: 6 additions & 2 deletions
@@ -324,8 +324,12 @@ public long evictEntity(LlapDaemonProtocolProtos.EvictEntityRequestProto protoRe
     if (LOG.isDebugEnabled()) {
       StringBuilder sb = new StringBuilder();
       sb.append(markedBytes).append(" bytes marked for eviction from LLAP cache buffers that belong to table(s): ");
-      for (String table : request.getEntities().get(request.getSingleDbName()).keySet()) {
-        sb.append(table).append(" ");
+      String catalog = request.getSingleCatalogName();
+      String db = request.getSingleDbName();
+      if (catalog != null && db != null) {
+        for (String table : request.getEntities().get(catalog).get(db).keySet()) {
+          sb.append(table).append(" ");
+        }
       }
       sb.append(" Duration: ").append(time).append(" ms");
       LOG.debug(sb.toString());
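
A rough sketch of the nested shape this lookup implies: getEntities() now appears to be keyed by catalog name first, then DB name, then table name (all names below are hypothetical, and the innermost value type is not visible in this hunk):

// getEntities(): { "hive" : { "sales_db" : { "web_logs" : ..., "orders" : ... } } }
String catalog = request.getSingleCatalogName();   // "hive"
String db = request.getSingleDbName();             // "sales_db"
for (String table : request.getEntities().get(catalog).get(db).keySet()) {
  // iterates "web_logs", "orders"
}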

llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java

Lines changed: 1 addition & 1 deletion
@@ -235,7 +235,7 @@ public OrcEncodedDataReader(LowLevelCache lowLevelCache, BufferUsageManager buff
     // LlapInputFormat needs to know the file schema to decide if schema evolution is supported.
     PartitionDesc partitionDesc = LlapHiveUtils.partitionDescForPath(split.getPath(), parts);
     cacheTag = HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_TRACK_CACHE_USAGE)
-        ? LlapHiveUtils.getDbAndTableNameForMetrics(split.getPath(), true, partitionDesc) : null;
+        ? LlapHiveUtils.getDbAndTableNameForMetrics(partitionDesc.getCatalogName(), split.getPath(), true, partitionDesc) : null;
     // 1. Get file metadata from cache, or create the reader and read it.
     // Don't cache the filesystem object for now; Tez closes it and FS cache will fix all that
     fsSupplier = getFsSupplier(split.getPath(), jobConf);

llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java

Lines changed: 1 addition & 1 deletion
@@ -225,7 +225,7 @@ public MemoryBuffer create() {
     PartitionDesc partitionDesc = LlapHiveUtils.partitionDescForPath(split.getPath(), parts);
     fileKey = determineCacheKey(fs, split, partitionDesc, daemonConf);
     cacheTag = HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_TRACK_CACHE_USAGE)
-        ? LlapHiveUtils.getDbAndTableNameForMetrics(split.getPath(), true, partitionDesc) : null;
+        ? LlapHiveUtils.getDbAndTableNameForMetrics(partitionDesc.getCatalogName(), split.getPath(), true, partitionDesc) : null;
     this.sourceInputFormat = sourceInputFormat;
     this.sourceSerDe = sourceSerDe;
     this.reporter = reporter;

llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileEstimateErrors.java

Lines changed: 2 additions & 1 deletion
@@ -26,6 +26,7 @@
 import org.apache.hadoop.hive.common.io.DiskRangeList.MutateHelper;
 import org.apache.hadoop.hive.common.io.CacheTag;
 import org.apache.hadoop.hive.llap.cache.EvictionDispatcher;
+import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.llap.cache.LlapCacheableBuffer;
 import org.apache.hadoop.hive.ql.io.SyntheticFileId;
 import org.apache.hadoop.hive.ql.io.orc.encoded.IncompleteCb;
@@ -141,6 +142,6 @@ public boolean isMarkedForEviction() {
   @Override
   public CacheTag getTag() {
     // We don't care about these.
-    return CacheTag.build("OrcEstimates");
+    return CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME + ".OrcEstimates");
   }
 }
