Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/iceberg/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@

namespace iceberg {

/// \brief Iceberg table format versions
constexpr int8_t kFormatVersion1 = 1;
Comment thread
zhjwpku marked this conversation as resolved.
Outdated
constexpr int8_t kFormatVersion2 = 2;
constexpr int8_t kFormatVersion3 = 3;

constexpr std::string_view kParquetFieldIdKey = "PARQUET:field_id";
constexpr int64_t kInvalidSnapshotId = -1;
constexpr int64_t kInvalidSequenceNumber = -1;
Expand Down
28 changes: 15 additions & 13 deletions src/iceberg/json_serde.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

#include <nlohmann/json.hpp>

#include "iceberg/constants.h"
Comment thread
wgtmac marked this conversation as resolved.
#include "iceberg/json_serde_internal.h"
#include "iceberg/name_mapping.h"
#include "iceberg/partition_field.h"
Expand Down Expand Up @@ -837,7 +838,7 @@ nlohmann::json ToJson(const TableMetadata& table_metadata) {
json[kFormatVersion] = table_metadata.format_version;
json[kTableUuid] = table_metadata.table_uuid;
json[kLocation] = table_metadata.location;
if (table_metadata.format_version > 1) {
if (table_metadata.format_version > kFormatVersion1) {
json[kLastSequenceNumber] = table_metadata.last_sequence_number;
}
json[kLastUpdatedMs] = UnixMsFromTimePointMs(table_metadata.last_updated_ms);
Expand All @@ -846,7 +847,7 @@ nlohmann::json ToJson(const TableMetadata& table_metadata) {
// for older readers, continue writing the current schema as "schema".
// this is only needed for v1 because support for schemas and current-schema-id
// is required in v2 and later.
if (table_metadata.format_version == 1) {
if (table_metadata.format_version == kFormatVersion1) {
for (const auto& schema : table_metadata.schemas) {
if (schema->schema_id() == table_metadata.current_schema_id) {
json[kSchema] = ToJson(*schema);
Expand All @@ -860,7 +861,7 @@ nlohmann::json ToJson(const TableMetadata& table_metadata) {
json[kSchemas] = ToJsonList(table_metadata.schemas);

// for older readers, continue writing the default spec as "partition-spec"
if (table_metadata.format_version == 1) {
if (table_metadata.format_version == kFormatVersion1) {
for (const auto& partition_spec : table_metadata.partition_specs) {
if (partition_spec->spec_id() == table_metadata.default_spec_id) {
json[kPartitionSpec] = ToJson(*partition_spec);
Expand Down Expand Up @@ -889,7 +890,7 @@ nlohmann::json ToJson(const TableMetadata& table_metadata) {
json[kCurrentSnapshotId] = nlohmann::json::value_t::null;
}

if (table_metadata.format_version >= 3) {
if (table_metadata.format_version >= kFormatVersion3) {
json[kNextRowId] = table_metadata.next_row_id;
}

Expand Down Expand Up @@ -944,7 +945,7 @@ Result<std::shared_ptr<Schema>> ParseSchemas(
current_schema_id, SafeDumpJson(schema_array));
}
} else {
if (format_version != 1) {
if (format_version != kFormatVersion1) {
return JsonParseError("{} must exist in format v{}", kSchemas, format_version);
}
ICEBERG_ASSIGN_OR_RAISE(auto schema_json,
Expand Down Expand Up @@ -982,7 +983,7 @@ Status ParsePartitionSpecs(const nlohmann::json& json, int8_t format_version,
partition_specs.push_back(std::move(spec));
}
} else {
if (format_version != 1) {
if (format_version != kFormatVersion1) {
return JsonParseError("{} must exist in format v{}", kPartitionSpecs,
format_version);
}
Expand All @@ -998,8 +999,9 @@ Status ParsePartitionSpecs(const nlohmann::json& json, int8_t format_version,
std::vector<PartitionField> fields;
for (const auto& entry_json : partition_spec_json) {
ICEBERG_ASSIGN_OR_RAISE(
auto field, PartitionFieldFromJson(
entry_json, /*allow_field_id_missing=*/format_version == 1));
auto field,
PartitionFieldFromJson(
entry_json, /*allow_field_id_missing=*/format_version == kFormatVersion1));
int32_t field_id = field->field_id();
if (field_id == SchemaField::kInvalidFieldId) {
// If the field ID is not set, we need to assign a new one
Expand Down Expand Up @@ -1043,7 +1045,7 @@ Status ParseSortOrders(const nlohmann::json& json, int8_t format_version,
sort_orders.push_back(std::move(sort_order));
}
} else {
if (format_version > 1) {
if (format_version > kFormatVersion1) {
return JsonParseError("{} must exist in format v{}", kSortOrders, format_version);
}
auto sort_order = SortOrder::Unsorted();
Expand All @@ -1065,7 +1067,7 @@ Result<std::unique_ptr<TableMetadata>> TableMetadataFromJson(const nlohmann::jso

ICEBERG_ASSIGN_OR_RAISE(table_metadata->format_version,
GetJsonValue<int8_t>(json, kFormatVersion));
if (table_metadata->format_version < 1 ||
if (table_metadata->format_version < kFormatVersion1 ||
table_metadata->format_version > TableMetadata::kSupportedTableFormatVersion) {
return JsonParseError("Cannot read unsupported version: {}",
table_metadata->format_version);
Expand All @@ -1076,7 +1078,7 @@ Result<std::unique_ptr<TableMetadata>> TableMetadataFromJson(const nlohmann::jso
ICEBERG_ASSIGN_OR_RAISE(table_metadata->location,
GetJsonValue<std::string>(json, kLocation));

if (table_metadata->format_version > 1) {
if (table_metadata->format_version > kFormatVersion1) {
ICEBERG_ASSIGN_OR_RAISE(table_metadata->last_sequence_number,
GetJsonValue<int64_t>(json, kLastSequenceNumber));
} else {
Expand All @@ -1098,7 +1100,7 @@ Result<std::unique_ptr<TableMetadata>> TableMetadataFromJson(const nlohmann::jso
ICEBERG_ASSIGN_OR_RAISE(table_metadata->last_partition_id,
GetJsonValue<int32_t>(json, kLastPartitionId));
} else {
if (table_metadata->format_version > 1) {
if (table_metadata->format_version > kFormatVersion1) {
return JsonParseError("{} must exist in format v{}", kLastPartitionId,
table_metadata->format_version);
}
Expand Down Expand Up @@ -1128,7 +1130,7 @@ Result<std::unique_ptr<TableMetadata>> TableMetadataFromJson(const nlohmann::jso
table_metadata->current_snapshot_id,
GetJsonValueOrDefault<int64_t>(json, kCurrentSnapshotId, kInvalidSnapshotId));

if (table_metadata->format_version >= 3) {
if (table_metadata->format_version >= kFormatVersion3) {
ICEBERG_ASSIGN_OR_RAISE(table_metadata->next_row_id,
GetJsonValue<int64_t>(json, kNextRowId));
} else {
Expand Down
222 changes: 59 additions & 163 deletions src/iceberg/manifest/manifest_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
#include "iceberg/partition_summary_internal.h"
#include "iceberg/result.h"
#include "iceberg/schema.h"
#include "iceberg/snapshot.h"
#include "iceberg/table_metadata.h"
#include "iceberg/util/macros.h"

Expand Down Expand Up @@ -272,87 +271,41 @@ Result<std::unique_ptr<Writer>> OpenFileWriter(
return writer;
}

Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeV1Writer(
std::optional<int64_t> snapshot_id, std::string_view manifest_location,
std::shared_ptr<FileIO> file_io, std::shared_ptr<PartitionSpec> partition_spec,
std::shared_ptr<Schema> current_schema) {
if (manifest_location.empty()) {
return InvalidArgument("Manifest location cannot be empty");
}
if (!file_io) {
return InvalidArgument("FileIO cannot be null");
}
if (!partition_spec) {
return InvalidArgument("PartitionSpec cannot be null");
}
if (!current_schema) {
return InvalidArgument("Current schema cannot be null");
}
Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeWriter(
int8_t format_version, std::optional<int64_t> snapshot_id,
std::string_view manifest_location, std::shared_ptr<FileIO> file_io,
std::shared_ptr<PartitionSpec> partition_spec, std::shared_ptr<Schema> current_schema,
ManifestContent content, std::optional<int64_t> first_row_id) {
ICEBERG_PRECHECK(!manifest_location.empty(), "Manifest location cannot be empty");
ICEBERG_PRECHECK(file_io, "FileIO cannot be null");
ICEBERG_PRECHECK(partition_spec, "PartitionSpec cannot be null");
ICEBERG_PRECHECK(current_schema, "Current schema cannot be null");

auto adapter = std::make_unique<ManifestEntryAdapterV1>(
snapshot_id, std::move(partition_spec), std::move(current_schema));
ICEBERG_RETURN_UNEXPECTED(adapter->Init());
ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());
std::unique_ptr<ManifestEntryAdapter> adapter;
std::optional<int64_t> writer_first_row_id = std::nullopt;

auto schema = adapter->schema();
ICEBERG_ASSIGN_OR_RAISE(
auto writer,
OpenFileWriter(manifest_location, std::move(schema), std::move(file_io),
adapter->metadata(), "manifest_entry"));
return std::unique_ptr<ManifestWriter>(new ManifestWriter(
std::move(writer), std::move(adapter), manifest_location, std::nullopt));
}

Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeV2Writer(
std::optional<int64_t> snapshot_id, std::string_view manifest_location,
std::shared_ptr<FileIO> file_io, std::shared_ptr<PartitionSpec> partition_spec,
std::shared_ptr<Schema> current_schema, ManifestContent content) {
if (manifest_location.empty()) {
return InvalidArgument("Manifest location cannot be empty");
}
if (!file_io) {
return InvalidArgument("FileIO cannot be null");
}
if (!partition_spec) {
return InvalidArgument("PartitionSpec cannot be null");
}
if (!current_schema) {
return InvalidArgument("Current schema cannot be null");
switch (format_version) {
case kFormatVersion1: {
adapter = std::make_unique<ManifestEntryAdapterV1>(
snapshot_id, std::move(partition_spec), std::move(current_schema));
break;
}
case kFormatVersion2: {
adapter = std::make_unique<ManifestEntryAdapterV2>(
snapshot_id, std::move(partition_spec), std::move(current_schema), content);
break;
}
case kFormatVersion3: {
adapter = std::make_unique<ManifestEntryAdapterV3>(
snapshot_id, first_row_id, std::move(partition_spec), std::move(current_schema),
content);
writer_first_row_id = first_row_id;
break;
}
default:
return NotSupported("Format version {} is not supported", format_version);
}
auto adapter = std::make_unique<ManifestEntryAdapterV2>(
snapshot_id, std::move(partition_spec), std::move(current_schema), content);
ICEBERG_RETURN_UNEXPECTED(adapter->Init());
ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());

auto schema = adapter->schema();
ICEBERG_ASSIGN_OR_RAISE(
auto writer,
OpenFileWriter(manifest_location, std::move(schema), std::move(file_io),
adapter->metadata(), "manifest_entry"));
return std::unique_ptr<ManifestWriter>(new ManifestWriter(
std::move(writer), std::move(adapter), manifest_location, std::nullopt));
}

Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeV3Writer(
std::optional<int64_t> snapshot_id, std::optional<int64_t> first_row_id,
std::string_view manifest_location, std::shared_ptr<FileIO> file_io,
std::shared_ptr<PartitionSpec> partition_spec, std::shared_ptr<Schema> current_schema,
ManifestContent content) {
if (manifest_location.empty()) {
return InvalidArgument("Manifest location cannot be empty");
}
if (!file_io) {
return InvalidArgument("FileIO cannot be null");
}
if (!partition_spec) {
return InvalidArgument("PartitionSpec cannot be null");
}
if (!current_schema) {
return InvalidArgument("Current schema cannot be null");
}
auto adapter = std::make_unique<ManifestEntryAdapterV3>(
snapshot_id, first_row_id, std::move(partition_spec), std::move(current_schema),
content);
ICEBERG_RETURN_UNEXPECTED(adapter->Init());
ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());

Expand All @@ -362,28 +315,7 @@ Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeV3Writer(
OpenFileWriter(manifest_location, std::move(schema), std::move(file_io),
adapter->metadata(), "manifest_entry"));
return std::unique_ptr<ManifestWriter>(new ManifestWriter(
std::move(writer), std::move(adapter), manifest_location, first_row_id));
}

Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeWriter(
int8_t format_version, std::optional<int64_t> snapshot_id,
std::string_view manifest_location, std::shared_ptr<FileIO> file_io,
std::shared_ptr<PartitionSpec> partition_spec, std::shared_ptr<Schema> current_schema,
ManifestContent content, std::optional<int64_t> first_row_id) {
switch (format_version) {
case 1:
return MakeV1Writer(snapshot_id, manifest_location, std::move(file_io),
std::move(partition_spec), std::move(current_schema));
case 2:
return MakeV2Writer(snapshot_id, manifest_location, std::move(file_io),
std::move(partition_spec), std::move(current_schema), content);
case 3:
return MakeV3Writer(snapshot_id, first_row_id, manifest_location,
std::move(file_io), std::move(partition_spec),
std::move(current_schema), content);
default:
return NotSupported("Format version {} is not supported", format_version);
}
std::move(writer), std::move(adapter), manifest_location, writer_first_row_id));
}

ManifestListWriter::ManifestListWriter(std::unique_ptr<Writer> writer,
Expand Down Expand Up @@ -420,83 +352,47 @@ std::optional<int64_t> ManifestListWriter::next_row_id() const {
return adapter_->next_row_id();
}

Result<std::unique_ptr<ManifestListWriter>> ManifestListWriter::MakeV1Writer(
int64_t snapshot_id, std::optional<int64_t> parent_snapshot_id,
std::string_view manifest_list_location, std::shared_ptr<FileIO> file_io) {
auto adapter = std::make_unique<ManifestFileAdapterV1>(snapshot_id, parent_snapshot_id);
ICEBERG_RETURN_UNEXPECTED(adapter->Init());
ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());

auto schema = adapter->schema();
ICEBERG_ASSIGN_OR_RAISE(
auto writer,
OpenFileWriter(manifest_list_location, std::move(schema), std::move(file_io),
adapter->metadata(), "manifest_file"));
return std::unique_ptr<ManifestListWriter>(
new ManifestListWriter(std::move(writer), std::move(adapter)));
}

Result<std::unique_ptr<ManifestListWriter>> ManifestListWriter::MakeV2Writer(
int64_t snapshot_id, std::optional<int64_t> parent_snapshot_id,
int64_t sequence_number, std::string_view manifest_list_location,
std::shared_ptr<FileIO> file_io) {
auto adapter = std::make_unique<ManifestFileAdapterV2>(snapshot_id, parent_snapshot_id,
sequence_number);
ICEBERG_RETURN_UNEXPECTED(adapter->Init());
ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());

auto schema = adapter->schema();
ICEBERG_ASSIGN_OR_RAISE(
auto writer,
OpenFileWriter(manifest_list_location, std::move(schema), std::move(file_io),
adapter->metadata(), "manifest_file"));

return std::unique_ptr<ManifestListWriter>(
new ManifestListWriter(std::move(writer), std::move(adapter)));
}

Result<std::unique_ptr<ManifestListWriter>> ManifestListWriter::MakeV3Writer(
int64_t snapshot_id, std::optional<int64_t> parent_snapshot_id,
int64_t sequence_number, int64_t first_row_id,
std::string_view manifest_list_location, std::shared_ptr<FileIO> file_io) {
auto adapter = std::make_unique<ManifestFileAdapterV3>(snapshot_id, parent_snapshot_id,
sequence_number, first_row_id);
ICEBERG_RETURN_UNEXPECTED(adapter->Init());
ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());

auto schema = adapter->schema();
ICEBERG_ASSIGN_OR_RAISE(
auto writer,
OpenFileWriter(manifest_list_location, std::move(schema), std::move(file_io),
adapter->metadata(), "manifest_file"));
return std::unique_ptr<ManifestListWriter>(
new ManifestListWriter(std::move(writer), std::move(adapter)));
}

Result<std::unique_ptr<ManifestListWriter>> ManifestListWriter::MakeWriter(
int8_t format_version, int64_t snapshot_id, std::optional<int64_t> parent_snapshot_id,
std::string_view manifest_list_location, std::shared_ptr<FileIO> file_io,
std::optional<int64_t> sequence_number, std::optional<int64_t> first_row_id) {
std::unique_ptr<ManifestFileAdapter> adapter;

switch (format_version) {
case 1:
return MakeV1Writer(snapshot_id, parent_snapshot_id, manifest_list_location,
std::move(file_io));
case 2:
case kFormatVersion1: {
adapter = std::make_unique<ManifestFileAdapterV1>(snapshot_id, parent_snapshot_id);
break;
}
case kFormatVersion2: {
ICEBERG_PRECHECK(sequence_number.has_value(),
"Sequence number is required for format version 2");
return MakeV2Writer(snapshot_id, parent_snapshot_id, sequence_number.value(),
manifest_list_location, std::move(file_io));
case 3:
adapter = std::make_unique<ManifestFileAdapterV2>(snapshot_id, parent_snapshot_id,
sequence_number.value());
break;
}
case kFormatVersion3: {
ICEBERG_PRECHECK(sequence_number.has_value(),
"Sequence number is required for format version 3");
ICEBERG_PRECHECK(first_row_id.has_value(),
"First row ID is required for format version 3");
return MakeV3Writer(snapshot_id, parent_snapshot_id, sequence_number.value(),
first_row_id.value(), manifest_list_location,
std::move(file_io));
adapter = std::make_unique<ManifestFileAdapterV3>(
snapshot_id, parent_snapshot_id, sequence_number.value(), first_row_id.value());
break;
}
default:
return NotSupported("Format version {} is not supported", format_version);
}

ICEBERG_RETURN_UNEXPECTED(adapter->Init());
ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());

auto schema = adapter->schema();
ICEBERG_ASSIGN_OR_RAISE(
auto writer,
OpenFileWriter(manifest_list_location, std::move(schema), std::move(file_io),
adapter->metadata(), "manifest_file"));
return std::unique_ptr<ManifestListWriter>(
new ManifestListWriter(std::move(writer), std::move(adapter)));
}

} // namespace iceberg
Loading
Loading