3 changes: 3 additions & 0 deletions .gitignore
@@ -9,3 +9,6 @@ target
/.envrc
*.credentials
.DS_Store
/examples/docker-compose/data
devbox.lock
/var/cache
@@ -0,0 +1,44 @@
#!/bin/bash
set -e

echo "Running delete partition test..."

# Create namespace
{{ICE_CLI}} --config {{CLI_CONFIG}} create-namespace ${NAMESPACE_NAME}
echo "OK Created namespace: ${NAMESPACE_NAME}"

# Get the full path to the input file
SCENARIO_DIR="{{SCENARIO_DIR}}"
INPUT_PATH="${SCENARIO_DIR}/${INPUT_FILE}"

# Create table with partitioning and insert data
{{ICE_CLI}} --config {{CLI_CONFIG}} insert --create-table ${TABLE_NAME} ${INPUT_PATH} --partition="${PARTITION_SPEC}"
echo "OK Inserted data with partitioning into table ${TABLE_NAME}"

# Delete partition for a specific day
output=$({{ICE_CLI}} --config {{CLI_CONFIG}} delete ${TABLE_NAME} --partition "[{\"name\": \"${TRANSFORMED_PARTITION_COLUMN}\", \"values\": [\"${DELETE_PARTITION_DAY}\"]}]" 2>&1)

echo "$output"

line_count=$(echo "$output" | wc -l)

if [ "$line_count" -ne 1 ]; then
echo "FAIL Expected 1 line, got $line_count"
exit 1
fi

if ! echo "$output" | grep -qF "${TRANSFORMED_PARTITION_COLUMN}=${DELETE_PARTITION_DAY}"; then
  echo "FAIL Expected output to contain '${TRANSFORMED_PARTITION_COLUMN}=${DELETE_PARTITION_DAY}'"
  exit 1
fi

echo "OK Deleted partition with value ${DELETE_PARTITION_DAY}"

# Cleanup
{{ICE_CLI}} --config {{CLI_CONFIG}} delete-table ${TABLE_NAME}
echo "OK Deleted table: ${TABLE_NAME}"

{{ICE_CLI}} --config {{CLI_CONFIG}} delete-namespace ${NAMESPACE_NAME}
echo "OK Deleted namespace: ${NAMESPACE_NAME}"

echo "Delete partition test completed successfully"
@@ -0,0 +1,14 @@
name: Delete partition
description: Tests deleting individual partitions by value

catalogConfig:
warehouse: s3://test-bucket/warehouse

env:
NAMESPACE_NAME: test_delete_partition
TABLE_NAME: test_delete_partition.tripdata_p_by_day
INPUT_FILE: tripdata.parquet
PARTITION_SPEC: '[{"column":"tpep_pickup_datetime","transform":"day"}]'
PARTITION_COLUMN: tpep_pickup_datetime
TRANSFORMED_PARTITION_COLUMN: tpep_pickup_datetime_day
DELETE_PARTITION_DAY: 2025-01-02
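Note that TRANSFORMED_PARTITION_COLUMN is the name of the Iceberg partition field, not the source column: for a day transform Iceberg derives it as <column>_day. A minimal sketch of that naming behavior (the schema and field ID are illustrative, not taken from the PR):

import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

class PartitionFieldNameSketch {
  public static void main(String[] args) {
    // Illustrative schema: a single timestamp column, as in the tripdata scenario.
    Schema schema =
        new Schema(
            Types.NestedField.required(
                1, "tpep_pickup_datetime", Types.TimestampType.withoutZone()));

    // A day transform produces a partition field named "<source column>_day".
    PartitionSpec spec = PartitionSpec.builderFor(schema).day("tpep_pickup_datetime").build();
    System.out.println(spec.fields().get(0).name()); // prints: tpep_pickup_datetime_day
  }
}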
Binary file not shown (presumably the tripdata.parquet fixture referenced by INPUT_FILE above).
73 changes: 55 additions & 18 deletions ice/src/main/java/com/altinity/ice/cli/internal/cmd/Delete.java
@@ -9,17 +9,25 @@
  */
 package com.altinity.ice.cli.internal.cmd;
 
+import com.altinity.ice.cli.Main.PartitionFilter;
+import com.altinity.ice.cli.internal.iceberg.Partitioning;
 import java.io.IOException;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import org.apache.iceberg.DataFile;
-import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.ManifestReader;
+import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.RewriteFiles;
+import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.TableScan;
 import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.rest.RESTCatalog;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -33,30 +41,57 @@ private Delete() {}
   public static void run(
       RESTCatalog catalog,
       TableIdentifier tableId,
-      List<com.altinity.ice.cli.Main.PartitionFilter> partitions,
+      List<PartitionFilter> partitions,
       boolean dryRun)
       throws IOException, URISyntaxException {
 
     Table table = catalog.loadTable(tableId);
-    TableScan scan = table.newScan();
-    if (partitions != null && !partitions.isEmpty()) {
-      org.apache.iceberg.expressions.Expression expr = null;
-      for (com.altinity.ice.cli.Main.PartitionFilter pf : partitions) {
-        org.apache.iceberg.expressions.Expression e = null;
+
+    Snapshot currentSnapshot = table.currentSnapshot();
+    if (currentSnapshot == null) {
+      logger.error("There are no snapshots in this table");
+      return;
+    }
+
+    FileIO io = table.io();
+    Map<Integer, PartitionSpec> specsById = table.specs();
+
+    List<ManifestFile> dataManifests = currentSnapshot.dataManifests(io);
+    List<DataFile> filesToDelete = new ArrayList<>();
+
+    Expression expression = null;
+
+    if (partitions != null) {
+      for (PartitionFilter pf : partitions) {
+        String fieldName = pf.name();
+
+        Expression fieldExpr = null;
         for (Object value : pf.values()) {
-          org.apache.iceberg.expressions.Expression valueExpr =
-              org.apache.iceberg.expressions.Expressions.equal(pf.name(), value);
-          e = (e == null) ? valueExpr : org.apache.iceberg.expressions.Expressions.or(e, valueExpr);
+          Integer transformed = Partitioning.applyTimestampTransform(table, fieldName, value);
+          if (transformed != null) {
+            value = transformed;
+          }
+
+          Expression singleValueExpr = Expressions.equal(fieldName, value);
+          fieldExpr =
+              fieldExpr == null ? singleValueExpr : Expressions.or(fieldExpr, singleValueExpr);
         }
-        expr = (expr == null) ? e : org.apache.iceberg.expressions.Expressions.and(expr, e);
+        if (fieldExpr == null) {
+          continue;
+        }
+        expression = expression == null ? fieldExpr : Expressions.and(expression, fieldExpr);
       }
-      scan = scan.filter(expr);
     }
-    List<DataFile> filesToDelete = new ArrayList<>();
 
-    try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) {
-      for (FileScanTask task : tasks) {
-        filesToDelete.add(task.file());
+    for (ManifestFile manifest : dataManifests) {
+      ManifestReader<DataFile> reader = ManifestFiles.read(manifest, io, specsById);
+      if (expression != null) {
+        reader.filterPartitions(expression);
+      }
+      try (reader) {
+        for (DataFile dataFile : reader) {
+          filesToDelete.add(dataFile);
+        }
       }
     }
 
@@ -73,6 +108,8 @@ public static void run(
       }
       rewrite.commit();
     }
-    }
+    } else {
+      logger.info("No files to delete");
+    }
   }
 }
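As a rough illustration of the filter this loop builds: values within one PartitionFilter are ORed together, and the per-field expressions are then ANDed. A sketch using the scenario's day value, plus a second day and a vendor_id field added purely for illustration (20090 and 20091 are the day-transform ordinals for 2025-01-02 and 2025-01-03; see the sketch at the end of this diff):

import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;

class PartitionFilterExpressionSketch {
  public static void main(String[] args) {
    // Single filter value: equality on the transformed day ordinal.
    Expression singleDay = Expressions.equal("tpep_pickup_datetime_day", 20090);

    // Multiple values OR together; multiple fields AND together.
    Expression combined =
        Expressions.and(
            Expressions.or(
                Expressions.equal("tpep_pickup_datetime_day", 20090),
                Expressions.equal("tpep_pickup_datetime_day", 20091)),
            Expressions.equal("vendor_id", 1)); // hypothetical second field

    System.out.println(singleDay);
    System.out.println(combined);
  }
}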
ice/src/main/java/com/altinity/ice/cli/internal/iceberg/Partitioning.java
@@ -17,6 +17,9 @@
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.temporal.ChronoField;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.HashMap;
@@ -28,6 +31,7 @@
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.UpdatePartitionSpec;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
@@ -38,6 +42,7 @@
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.transforms.Transform;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.SerializableFunction;
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
@@ -50,6 +55,23 @@ public final class Partitioning {

private Partitioning() {}

// Formatter with optional time component (2025-01-01 or 2025-01-01T00:00:00)
private static final DateTimeFormatter DATE_TIME_INPUT_FORMATTER =
new DateTimeFormatterBuilder()
.append(DateTimeFormatter.ISO_LOCAL_DATE)
.optionalStart()
.appendLiteral("T")
.append(DateTimeFormatter.ISO_LOCAL_TIME)
.optionalEnd()
.optionalStart()
.appendOffsetId()
.optionalEnd()
.parseDefaulting(ChronoField.HOUR_OF_DAY, 0)
.parseDefaulting(ChronoField.MINUTE_OF_HOUR, 0)
.parseDefaulting(ChronoField.SECOND_OF_MINUTE, 0)
.parseDefaulting(ChronoField.NANO_OF_SECOND, 0)
.toFormatter();

public record InferPartitionKeyResult(
@Nullable PartitionKey partitionKey, @Nullable String failureReason) {
public boolean success() {
@@ -392,4 +414,48 @@ public static long toEpochMicros(Object tsValue) {
throw new UnsupportedOperationException("unexpected value type: " + tsValue.getClass());
}
}

/**
* Converts a datetime string input value to the unit of the table's partition transform, when
* that field is partitioned by a timestamp transform. The transformed value is the Iceberg
* internal representation (e.g. days since the Unix epoch).
*
* @return The timestamp converted to the partition unit as an integer, or null if not
* convertible.
*/
@Nullable
public static Integer applyTimestampTransform(Table table, String fieldName, Object value) {
PartitionField partitionField = getPartitionField(table, fieldName);
if (partitionField == null) return null;

Transform<?, ?> transform = partitionField.transform();
if (transform.isIdentity() || !(value instanceof String s)) {
return null;
}
if (s.isEmpty()) {
return null;
}

Type sourceType = table.schema().findType(partitionField.sourceId());
if (!(sourceType instanceof Types.TimestampType)) {
return null;
}

long timestampMicros = toEpochMicros(LocalDateTime.parse(s, DATE_TIME_INPUT_FORMATTER));

@SuppressWarnings("unchecked")
Transform<Long, Integer> typedTransform = (Transform<Long, Integer>) transform;

return typedTransform.bind(sourceType).apply(timestampMicros);
}

@Nullable
private static PartitionField getPartitionField(Table table, String fieldName) {
for (PartitionField field : table.spec().fields()) {
if (field.name().equals(fieldName)) {
return field;
}
}
return null;
}
}
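To make the transform arithmetic concrete: for the scenario's DELETE_PARTITION_DAY of 2025-01-02, applyTimestampTransform should yield the number of whole days since the Unix epoch. A minimal sketch, assuming Iceberg's Transforms.day() API and the microseconds-since-epoch convention used by toEpochMicros above (illustrative, not part of the PR):

import java.time.LocalDateTime;
import java.time.ZoneOffset;
import org.apache.iceberg.transforms.Transform;
import org.apache.iceberg.transforms.Transforms;
import org.apache.iceberg.types.Types;

class DayTransformSketch {
  public static void main(String[] args) {
    // "2025-01-02" parses to midnight via the defaulting formatter above.
    LocalDateTime ts = LocalDateTime.parse("2025-01-02T00:00:00");

    // Iceberg timestamps are microseconds since the Unix epoch.
    long micros = ts.toEpochSecond(ZoneOffset.UTC) * 1_000_000L;

    // The day transform maps micros to whole days since 1970-01-01.
    Transform<Long, Integer> day = Transforms.day();
    int ordinal = day.bind(Types.TimestampType.withoutZone()).apply(micros);
    System.out.println(ordinal); // prints: 20090 (the partition value for 2025-01-02)
  }
}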