Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
4c6e420
first try
kasakrisz Jun 2, 2025
c9a9ab5
add config and test
kasakrisz May 29, 2025
fba6f5c
create MV
kasakrisz Jun 18, 2025
f55e33d
create MV #2
kasakrisz Jun 19, 2025
6763401
checkstyle
kasakrisz Nov 12, 2025
97d43a7
checkstyle
kasakrisz Nov 12, 2025
f5b1d03
load Iceberg MV object
kasakrisz Nov 13, 2025
e29a5ce
commit
kasakrisz Nov 13, 2025
c88645f
add tests to mv_iceberg_orc9.q
kasakrisz Nov 14, 2025
67b60ed
Iceberg materialized view: Do not store view text info on HMS side
InvisibleProgrammer Dec 12, 2025
1f7934b
Added original text, fixed a failure at hive materialized views.
InvisibleProgrammer Dec 15, 2025
4ff990b
Fix issues after rebase to iceberg upgrade
InvisibleProgrammer Jan 9, 2026
5104b09
Add view property: max-staleness-ms
InvisibleProgrammer Jan 19, 2026
653a89c
Refactor storing view definition logic
InvisibleProgrammer Jan 19, 2026
110474d
Add source table metadata
InvisibleProgrammer Jan 23, 2026
a675076
Spotless
zsmiskolczi Jan 27, 2026
8925873
Drop materialized view
zsmiskolczi Jan 30, 2026
c0481dd
Implement show materialized views
zsmiskolczi Feb 2, 2026
071c8cc
Mask out random values from test output
zsmiskolczi Feb 2, 2026
ac971e9
Test out partitioned materialized views, fix upstream build
zsmiskolczi Feb 3, 2026
ab0d1d6
save storage table to a different location
kasakrisz Feb 9, 2026
079e493
Add materialized view related fields to metadata
zsmiskolczi Feb 17, 2026
18a89ff
Fix typo
zsmiskolczi Feb 17, 2026
5c898ab
Fix issues around drop materialized view
zsmiskolczi Feb 26, 2026
3755454
Introduce new table type: EXTERNAL_MATERIALIZED_VIEW
zsmiskolczi Mar 5, 2026
19194f8
Fix calculating statistics
zsmiskolczi Mar 12, 2026
a2960e8
Fix compile errors after rebase
zsmiskolczi Mar 17, 2026
a441fd3
WIP: handle outdated materialized views
zsmiskolczi Mar 27, 2026
cc9222d
Fix failing tests - I
zsmiskolczi Apr 4, 2026
c669a77
Attempt to fix upstream compile error
zsmiskolczi Apr 4, 2026
d3644f2
Fix test failures
zsmiskolczi Apr 16, 2026
6b8ebdd
Move mv test to llap tests
zsmiskolczi Apr 17, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
Original file line number Diff line number Diff line change
Expand Up @@ -2127,6 +2127,9 @@ public static enum ConfVars {
"If this is set to true the URI for auth will have the default location masked with DEFAULT_TABLE_LOCATION"),
HIVE_ICEBERG_ALLOW_DATAFILES_IN_TABLE_LOCATION_ONLY("hive.iceberg.allow.datafiles.in.table.location.only", false,
"If this is set to true, then all the data files being read should be withing the table location"),
HIVE_ICEBERG_MATERIALIZEDVIEW_METADATA_LOCATION("hive.iceberg.materializedview.metadata.location", "metastore",
new StringSet("metastore", "iceberg"),
"Location of materialized view metadata stored by iceberg"),
HIVE_USE_EXPLICIT_RCFILE_HEADER("hive.exec.rcfile.use.explicit.header", true,
"If this is set the header for RCFiles will simply be RCF. If this is not\n" +
"set the header will be that borrowed from sequence files, e.g. SEQ- followed\n" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HiveCatalog extends BaseMetastoreViewCatalog
implements SupportsNamespaces, Configurable {
public class HiveCatalog extends BaseMetastoreViewCatalog implements SupportsNamespaces, Configurable {

public static final String LIST_ALL_TABLES = "list-all-tables";
public static final String LIST_ALL_TABLES_DEFAULT = "false";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
import org.slf4j.LoggerFactory;

/** All the HMS operations like table,view,materialized_view should implement this. */
interface HiveOperationsBase {
public interface HiveOperationsBase {

Logger LOG = LoggerFactory.getLogger(HiveOperationsBase.class);
// The max size is based on HMS backend database. For Hive versions below 2.3, the max table
Expand Down Expand Up @@ -112,15 +112,47 @@ private static boolean isValidIcebergTable(Table table) {
}

static void validateTableIsIceberg(Table table, String fullName) {
String tableType = table.getParameters().get(BaseMetastoreTableOperations.TABLE_TYPE_PROP);
String tableTypeProp = table.getParameters().get(BaseMetastoreTableOperations.TABLE_TYPE_PROP);
NoSuchIcebergTableException.check(
isValidIcebergTable(table), "Not an iceberg table: %s (type=%s)", fullName, tableType);
(TableType.MANAGED_TABLE.name().equalsIgnoreCase(table.getTableType()) ||
TableType.EXTERNAL_TABLE.name().equalsIgnoreCase(table.getTableType())) &&
BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE.equalsIgnoreCase(tableTypeProp),
"Not an iceberg table: %s (type=%s) (tableType=%s)",
fullName,
tableTypeProp,
table.getTableType());
}

/**
 * Validates that the given HMS table is either an Iceberg table or an Iceberg
 * materialized view.
 *
 * <p>Accepted combinations:
 * <ul>
 *   <li>MANAGED/EXTERNAL table with the Iceberg table-type marker, or</li>
 *   <li>MATERIALIZED_VIEW/EXTERNAL_MATERIALIZED_VIEW with the Iceberg view-type marker.</li>
 * </ul>
 *
 * @param table    HMS table to validate
 * @param fullName fully qualified name, used in the error message
 * @throws NoSuchIcebergTableException if neither combination matches
 */
static void validateTableOrMVIsIceberg(Table table, String fullName) {
  String hmsTableType = table.getTableType();
  String tableTypeProp = table.getParameters().get(BaseMetastoreTableOperations.TABLE_TYPE_PROP);

  // MANAGED/EXTERNAL table carrying the Iceberg table marker
  boolean isIcebergTable =
      (TableType.MANAGED_TABLE.name().equalsIgnoreCase(hmsTableType) ||
          TableType.EXTERNAL_TABLE.name().equalsIgnoreCase(hmsTableType)) &&
          BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE.equalsIgnoreCase(tableTypeProp);

  // materialized view carrying the Iceberg view marker
  boolean isIcebergMaterializedView =
      (TableType.MATERIALIZED_VIEW.name().equalsIgnoreCase(hmsTableType) ||
          TableType.EXTERNAL_MATERIALIZED_VIEW.name().equalsIgnoreCase(hmsTableType)) &&
          ICEBERG_VIEW_TYPE_VALUE.equalsIgnoreCase(tableTypeProp);

  NoSuchIcebergTableException.check(
      isIcebergTable || isIcebergMaterializedView,
      "Not an iceberg table: %s (type=%s) (tableType=%s)",
      fullName,
      tableTypeProp,
      hmsTableType);
}

static void validateTableIsIcebergView(Table table, String fullName) {
String tableTypeProp = table.getParameters().get(BaseMetastoreTableOperations.TABLE_TYPE_PROP);

NoSuchIcebergViewException.check(
isValidIcebergView(table),
(TableType.VIRTUAL_VIEW.name().equalsIgnoreCase(table.getTableType()) ||
TableType.EXTERNAL_MATERIALIZED_VIEW.name().equalsIgnoreCase(table.getTableType())) &&
ICEBERG_VIEW_TYPE_VALUE.equalsIgnoreCase(tableTypeProp),
"Not an iceberg view: %s (type=%s) (tableType=%s)",
fullName,
tableTypeProp,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
Expand Down Expand Up @@ -118,8 +119,10 @@ protected void doRefresh() {

// Check if we are trying to load an Iceberg View as a Table
HiveOperationsBase.validateIcebergViewNotLoadedAsIcebergTable(table, fullName);
// Check if it is a valid Iceberg Table
HiveOperationsBase.validateTableIsIceberg(table, fullName);

if ("iceberg".equals(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_ICEBERG_MATERIALIZEDVIEW_METADATA_LOCATION))) {
HiveOperationsBase.validateTableOrMVIsIceberg(table, fullName);
}

metadataLocation = table.getParameters().get(METADATA_LOCATION_PROP);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,39 @@

package org.apache.iceberg.mr;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.CreationMetadata;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.catalog.ViewCatalog;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.hive.HMSTablePropertyHelper;
import org.apache.iceberg.hive.IcebergCatalogProperties;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.view.ImmutableRefreshState;
import org.apache.iceberg.view.ImmutableSourceState;
import org.apache.iceberg.view.RefreshState;
import org.apache.iceberg.view.SourceState;
import org.apache.iceberg.view.View;
import org.apache.iceberg.view.ViewVersion;

/**
* Class for catalog resolution and accessing the common functions for {@link Catalog} API.
Expand Down Expand Up @@ -76,6 +89,16 @@ public final class Catalogs {
ImmutableSet.of(InputFormatConfig.TABLE_SCHEMA, InputFormatConfig.PARTITION_SPEC, LOCATION, NAME,
InputFormatConfig.CATALOG_NAME);

// Marker property ("true") set on an Iceberg view that backs a Hive materialized view.
public static final String MATERIALIZED_VIEW_PROPERTY_KEY = "iceberg.materialized.view";
// View property holding the identifier of the storage table containing the materialized data.
public static final String MATERIALIZED_VIEW_STORAGE_TABLE_PROPERTY_KEY =
"iceberg.materialized.view.storage.table";
// NOTE(review): not used in this file — presumably a prefix for per-source-table base
// snapshot properties used for staleness checks; confirm against callers.
public static final String MATERIALIZED_VIEW_BASE_SNAPSHOT_PROPERTY_KEY_PREFIX =
"iceberg.base.snapshot.";
// NOTE(review): not used in this file — presumably stores the materialized view version; confirm.
public static final String MATERIALIZED_VIEW_VERSION_PROPERTY_KEY =
"iceberg.materialized.view.version";
// View property preserving the original (user-written) SQL text of the materialized view.
public static final String MATERIALIZED_VIEW_ORIGINAL_TEXT = "iceberg.materialized.view.original.text";
// Suffix appended to both the view identifier and the view location to derive the
// storage table's identifier and location, keeping them distinct from the view itself.
public static final String MATERIALIZED_VIEW_STORAGE_TABLE_IDENTIFIER_SUFFIX = "_storage_table";

private Catalogs() {
}

Expand Down Expand Up @@ -279,4 +302,136 @@ private static Map<String, String> filterIcebergTableProperties(Properties props
}
return map;
}

/**
 * Creates an Iceberg-backed materialized view: a storage table holding the materialized
 * data plus an Iceberg view carrying the query text, source-table refresh state and
 * materialized-view marker properties.
 *
 * @param conf             job/session configuration; also selects where MV metadata lives
 * @param props            table properties (schema, partition spec, location, name, catalog)
 * @param viewOriginalText the user-written SQL text, stored as a view property
 * @param viewExpandedText the expanded SQL text, stored as the view's "hive" query dialect
 * @param creationMetadata HMS creation metadata listing the MV's source tables
 * @return the created view together with its storage table
 */
public static MaterializedView createMaterializedView(
Configuration conf, Properties props, String viewOriginalText, String viewExpandedText,
CreationMetadata creationMetadata) {

// "iceberg" means MV metadata is stored on the Iceberg side; then the storage table gets
// a distinct identifier/location suffix so it does not collide with the view itself.
boolean isExternalMaterializedView = "iceberg".equals(
HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_ICEBERG_MATERIALIZEDVIEW_METADATA_LOCATION));

Schema schema = schema(props);
PartitionSpec spec = spec(props, schema);
String location = props.getProperty(LOCATION);
String storageTableLocation = location +
(isExternalMaterializedView ? MATERIALIZED_VIEW_STORAGE_TABLE_IDENTIFIER_SUFFIX : "");
String catalogName = props.getProperty(InputFormatConfig.CATALOG_NAME);

Optional<Catalog> catalog = loadCatalog(conf, catalogName);
SortOrder sortOrder = HMSTablePropertyHelper.getSortOrder(props, schema);
if (catalog.isEmpty()) {
throw new IllegalStateException("Unable to load catalog: " + catalogName);
}
String name = props.getProperty(NAME);
Preconditions.checkNotNull(name, "Table identifier not set");

ViewCatalog viewCatalog = (ViewCatalog) catalog.get();

Map<String, String> map = filterIcebergTableProperties(props);
String storageTableIdentifier = name +
(isExternalMaterializedView ? MATERIALIZED_VIEW_STORAGE_TABLE_IDENTIFIER_SUFFIX : "");
// Create the backing storage table first; it will hold the materialized rows.
Table storageTable = catalog.get().buildTable(TableIdentifier.parse(storageTableIdentifier), schema)
.withPartitionSpec(spec).withLocation(storageTableLocation).withProperties(map).withSortOrder(sortOrder)
.create();

// NOTE(review): three entries are put below but the map is sized for 2 — harmless, but
// the expected size should be 3.
Map<String, String> viewProperties = Maps.newHashMapWithExpectedSize(2);
viewProperties.put(MATERIALIZED_VIEW_PROPERTY_KEY, "true");
viewProperties.put(MATERIALIZED_VIEW_STORAGE_TABLE_PROPERTY_KEY, storageTableIdentifier);
viewProperties.put(MATERIALIZED_VIEW_ORIGINAL_TEXT, viewOriginalText);

long createTime = System.currentTimeMillis();

// One SourceState per source table/view, recording the snapshot/version the MV was built on.
List<SourceState> sourceStates = Lists.newArrayList();

for (var sourceTable : creationMetadata.getSourceTables()) {
// A null expanded text means the source is a plain table; otherwise it is a view.
SourceState.SourceStateType type = sourceTable.getTable().getViewExpandedText() == null ?
SourceState.SourceStateType.TABLE :
SourceState.SourceStateType.VIEW;

String dbName = sourceTable.getTable().getDbName();
String sourceTableName = sourceTable.getTable().getTableName();
// NOTE(review): namespace is hard-coded to "default" while dbName is available above —
// confirm whether the source's actual database should be recorded here instead.
String sourceTableNamespace = "default";
String sourceTableCatalog = sourceTable.getTable().isSetCatName() ? sourceTable.getTable().getCatName() : null;
// Fall back to the MV's own catalog when the source has no catalog of its own.
Catalog tableCatalog = loadCatalog(conf, sourceTableCatalog).orElse(catalog.get());
UUID uuid = null;
Snapshot snapshot = null;
ViewVersion version = null;

switch (type) {
case TABLE -> {
Table icebergTable = tableCatalog.loadTable(TableIdentifier.parse(dbName + "." + sourceTableName));
uuid = icebergTable.uuid();
snapshot = icebergTable.currentSnapshot();

// NOTE(review): catalogName (the MV's catalog) is recorded rather than
// sourceTableCatalog — confirm this is intentional.
SourceState sourcestate = ImmutableSourceState.of(type, sourceTableName, sourceTableNamespace, catalogName,
uuid, snapshot == null ? null : snapshot.snapshotId(), null, null);
sourceStates.add(sourcestate);
}
case VIEW -> {
// NOTE(review): unlike the TABLE branch, the identifier is parsed without the
// dbName prefix — confirm views outside the default namespace resolve correctly.
var icebergView = ((ViewCatalog) tableCatalog).loadView(TableIdentifier.parse(sourceTableName));
uuid = icebergView.uuid();
version = icebergView.currentVersion();

SourceState sourcestate = ImmutableSourceState.of(type, sourceTableName, sourceTableNamespace, catalogName,
uuid, null, null, version.versionId());
sourceStates.add(sourcestate);
}
}
}

// Initial refresh state: version 1, built at createTime from the captured source states.
RefreshState refreshState = ImmutableRefreshState.of(1, sourceStates, createTime);

TableIdentifier viewIdentifier = TableIdentifier.parse(name);
View mv = viewCatalog.buildView(viewIdentifier)
.withLocation(location)
.withDefaultNamespace(viewIdentifier.namespace())
.withQuery("hive", viewExpandedText)
.withSchema(schema)
.withProperties(viewProperties)
.withStorageTable(storageTableIdentifier)
.withRefreshState(refreshState)
.withCreateTime(createTime)
.create();

return new MaterializedView(mv, storageTable);
}

/**
 * Immutable pair of an Iceberg {@link View} and the {@link Table} that stores its
 * materialized data.
 */
public static class MaterializedView {
  // final: this is a pure value holder, neither reference changes after construction
  private final View view;
  private final Table storageTable;

  public MaterializedView(View view, Table storageTable) {
    this.view = view;
    this.storageTable = storageTable;
  }

  /** Returns the Iceberg view carrying the materialized view's definition and metadata. */
  public View getView() {
    return view;
  }

  /** Returns the Iceberg table holding the materialized rows. */
  public Table getStorageTable() {
    return storageTable;
  }
}

/**
 * Loads a materialized view, taking the view identifier, location and catalog name
 * from the given table properties.
 *
 * @param conf  job/session configuration
 * @param props table properties providing NAME, LOCATION and the catalog name
 * @return the view together with its storage table
 */
public static MaterializedView loadMaterializedView(Configuration conf, Properties props) {
String viewName = props.getProperty(NAME);
String viewLocation = props.getProperty(LOCATION);
String catalogName = props.getProperty(InputFormatConfig.CATALOG_NAME);
return loadMaterializedView(conf, viewName, viewLocation, catalogName);
}

/**
 * Loads an Iceberg materialized view and its backing storage table from the given catalog.
 *
 * @param conf            job/session configuration used to resolve the catalog
 * @param tableIdentifier fully qualified view identifier; must not be null
 * @param tableLocation   currently unused; kept for interface compatibility
 * @param catalogName     name of the catalog to load from
 * @return the view together with its storage table
 * @throws UnsupportedOperationException if the catalog cannot be resolved
 * @throws IllegalArgumentException      if the identifier is null or the view lacks
 *                                       the storage-table property
 */
public static MaterializedView loadMaterializedView(
Configuration conf, String tableIdentifier, String tableLocation, String catalogName) {
Optional<Catalog> catalog = loadCatalog(conf, catalogName);
// Guard clause: fail fast when the catalog cannot be resolved.
if (catalog.isEmpty()) {
throw new UnsupportedOperationException("Catalog " + catalogName + " not found!");
}

Preconditions.checkArgument(tableIdentifier != null, "View identifier not set");
ViewCatalog viewCatalog = (ViewCatalog) catalog.get();
View view = viewCatalog.loadView(TableIdentifier.parse(tableIdentifier));
String storageTableIdentifier = view.properties().get(MATERIALIZED_VIEW_STORAGE_TABLE_PROPERTY_KEY);
// Fail with a clear message instead of an opaque failure inside TableIdentifier.parse
// when the view is missing the storage-table property (i.e. not a materialized view).
Preconditions.checkArgument(storageTableIdentifier != null,
"View %s has no %s property", tableIdentifier, MATERIALIZED_VIEW_STORAGE_TABLE_PROPERTY_KEY);
Table storageTable = catalog.get().loadTable(TableIdentifier.parse(storageTableIdentifier));
return new MaterializedView(view, storageTable);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ private InputFormatConfig() {
public static final String TABLE_LOCATION = "iceberg.mr.table.location";
public static final String TABLE_METADATA_LOCATION = "iceberg.mr.table.metadata.location";
public static final String TABLE_SCHEMA = "iceberg.mr.table.schema";
public static final String TABLE_TYPE = "iceberg.mr.table.type";
public static final String PARTITION_SPEC = "iceberg.mr.table.partition.spec";
public static final String SERIALIZED_TABLE_PREFIX = "iceberg.mr.serialized.table.";
public static final String TABLE_CATALOG_PREFIX = "iceberg.mr.table.catalog.";
Expand Down
Loading
Loading