Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/iceberg/json_serde.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

#include <nlohmann/json.hpp>

#include "iceberg/constants.h"
#include "iceberg/json_serde_internal.h"
#include "iceberg/name_mapping.h"
#include "iceberg/partition_field.h"
Expand Down
222 changes: 59 additions & 163 deletions src/iceberg/manifest/manifest_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
#include "iceberg/partition_summary_internal.h"
#include "iceberg/result.h"
#include "iceberg/schema.h"
#include "iceberg/snapshot.h"
#include "iceberg/table_metadata.h"
#include "iceberg/util/macros.h"

Expand Down Expand Up @@ -272,87 +271,41 @@ Result<std::unique_ptr<Writer>> OpenFileWriter(
return writer;
}

Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeV1Writer(
std::optional<int64_t> snapshot_id, std::string_view manifest_location,
std::shared_ptr<FileIO> file_io, std::shared_ptr<PartitionSpec> partition_spec,
std::shared_ptr<Schema> current_schema) {
if (manifest_location.empty()) {
return InvalidArgument("Manifest location cannot be empty");
}
if (!file_io) {
return InvalidArgument("FileIO cannot be null");
}
if (!partition_spec) {
return InvalidArgument("PartitionSpec cannot be null");
}
if (!current_schema) {
return InvalidArgument("Current schema cannot be null");
}
Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeWriter(
int8_t format_version, std::optional<int64_t> snapshot_id,
std::string_view manifest_location, std::shared_ptr<FileIO> file_io,
std::shared_ptr<PartitionSpec> partition_spec, std::shared_ptr<Schema> current_schema,
ManifestContent content, std::optional<int64_t> first_row_id) {
ICEBERG_PRECHECK(!manifest_location.empty(), "Manifest location cannot be empty");
ICEBERG_PRECHECK(file_io, "FileIO cannot be null");
ICEBERG_PRECHECK(partition_spec, "PartitionSpec cannot be null");
ICEBERG_PRECHECK(current_schema, "Current schema cannot be null");

auto adapter = std::make_unique<ManifestEntryAdapterV1>(
snapshot_id, std::move(partition_spec), std::move(current_schema));
ICEBERG_RETURN_UNEXPECTED(adapter->Init());
ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());
std::unique_ptr<ManifestEntryAdapter> adapter;
std::optional<int64_t> writer_first_row_id = std::nullopt;

auto schema = adapter->schema();
ICEBERG_ASSIGN_OR_RAISE(
auto writer,
OpenFileWriter(manifest_location, std::move(schema), std::move(file_io),
adapter->metadata(), "manifest_entry"));
return std::unique_ptr<ManifestWriter>(new ManifestWriter(
std::move(writer), std::move(adapter), manifest_location, std::nullopt));
}

Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeV2Writer(
std::optional<int64_t> snapshot_id, std::string_view manifest_location,
std::shared_ptr<FileIO> file_io, std::shared_ptr<PartitionSpec> partition_spec,
std::shared_ptr<Schema> current_schema, ManifestContent content) {
if (manifest_location.empty()) {
return InvalidArgument("Manifest location cannot be empty");
}
if (!file_io) {
return InvalidArgument("FileIO cannot be null");
}
if (!partition_spec) {
return InvalidArgument("PartitionSpec cannot be null");
}
if (!current_schema) {
return InvalidArgument("Current schema cannot be null");
switch (format_version) {
case 1: {
adapter = std::make_unique<ManifestEntryAdapterV1>(
snapshot_id, std::move(partition_spec), std::move(current_schema));
break;
}
case 2: {
adapter = std::make_unique<ManifestEntryAdapterV2>(
snapshot_id, std::move(partition_spec), std::move(current_schema), content);
break;
}
case 3: {
adapter = std::make_unique<ManifestEntryAdapterV3>(
snapshot_id, first_row_id, std::move(partition_spec), std::move(current_schema),
content);
writer_first_row_id = first_row_id;
break;
}
default:
return NotSupported("Format version {} is not supported", format_version);
}
auto adapter = std::make_unique<ManifestEntryAdapterV2>(
snapshot_id, std::move(partition_spec), std::move(current_schema), content);
ICEBERG_RETURN_UNEXPECTED(adapter->Init());
ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());

auto schema = adapter->schema();
ICEBERG_ASSIGN_OR_RAISE(
auto writer,
OpenFileWriter(manifest_location, std::move(schema), std::move(file_io),
adapter->metadata(), "manifest_entry"));
return std::unique_ptr<ManifestWriter>(new ManifestWriter(
std::move(writer), std::move(adapter), manifest_location, std::nullopt));
}

Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeV3Writer(
std::optional<int64_t> snapshot_id, std::optional<int64_t> first_row_id,
std::string_view manifest_location, std::shared_ptr<FileIO> file_io,
std::shared_ptr<PartitionSpec> partition_spec, std::shared_ptr<Schema> current_schema,
ManifestContent content) {
if (manifest_location.empty()) {
return InvalidArgument("Manifest location cannot be empty");
}
if (!file_io) {
return InvalidArgument("FileIO cannot be null");
}
if (!partition_spec) {
return InvalidArgument("PartitionSpec cannot be null");
}
if (!current_schema) {
return InvalidArgument("Current schema cannot be null");
}
auto adapter = std::make_unique<ManifestEntryAdapterV3>(
snapshot_id, first_row_id, std::move(partition_spec), std::move(current_schema),
content);
ICEBERG_RETURN_UNEXPECTED(adapter->Init());
ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());

Expand All @@ -362,28 +315,7 @@ Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeV3Writer(
OpenFileWriter(manifest_location, std::move(schema), std::move(file_io),
adapter->metadata(), "manifest_entry"));
return std::unique_ptr<ManifestWriter>(new ManifestWriter(
std::move(writer), std::move(adapter), manifest_location, first_row_id));
}

Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeWriter(
int8_t format_version, std::optional<int64_t> snapshot_id,
std::string_view manifest_location, std::shared_ptr<FileIO> file_io,
std::shared_ptr<PartitionSpec> partition_spec, std::shared_ptr<Schema> current_schema,
ManifestContent content, std::optional<int64_t> first_row_id) {
switch (format_version) {
case 1:
return MakeV1Writer(snapshot_id, manifest_location, std::move(file_io),
std::move(partition_spec), std::move(current_schema));
case 2:
return MakeV2Writer(snapshot_id, manifest_location, std::move(file_io),
std::move(partition_spec), std::move(current_schema), content);
case 3:
return MakeV3Writer(snapshot_id, first_row_id, manifest_location,
std::move(file_io), std::move(partition_spec),
std::move(current_schema), content);
default:
return NotSupported("Format version {} is not supported", format_version);
}
std::move(writer), std::move(adapter), manifest_location, writer_first_row_id));
}

ManifestListWriter::ManifestListWriter(std::unique_ptr<Writer> writer,
Expand Down Expand Up @@ -420,83 +352,47 @@ std::optional<int64_t> ManifestListWriter::next_row_id() const {
return adapter_->next_row_id();
}

/// Builds a ManifestListWriter backed by a ManifestFileAdapterV1.
///
/// Init() and StartAppending() are invoked on the adapter before the underlying
/// file writer is opened with the adapter's schema and metadata.
/// NOTE(review): ICEBERG_RETURN_UNEXPECTED / ICEBERG_ASSIGN_OR_RAISE presumably
/// return early with the failing status -- confirm against util/macros.h.
Result<std::unique_ptr<ManifestListWriter>> ManifestListWriter::MakeV1Writer(
    int64_t snapshot_id, std::optional<int64_t> parent_snapshot_id,
    std::string_view manifest_list_location, std::shared_ptr<FileIO> file_io) {
  auto list_adapter =
      std::make_unique<ManifestFileAdapterV1>(snapshot_id, parent_snapshot_id);
  ICEBERG_RETURN_UNEXPECTED(list_adapter->Init());
  ICEBERG_RETURN_UNEXPECTED(list_adapter->StartAppending());

  // Hold the schema in a local so it can be moved into OpenFileWriter.
  auto list_schema = list_adapter->schema();
  ICEBERG_ASSIGN_OR_RAISE(
      auto file_writer,
      OpenFileWriter(manifest_list_location, std::move(list_schema), std::move(file_io),
                     list_adapter->metadata(), "manifest_file"));

  // The writer is created with a direct `new` expression, exactly as before.
  return std::unique_ptr<ManifestListWriter>(
      new ManifestListWriter(std::move(file_writer), std::move(list_adapter)));
}

/// Builds a ManifestListWriter backed by a ManifestFileAdapterV2.
///
/// The adapter receives the snapshot ID, the optional parent snapshot ID and the
/// sequence number. Init() and StartAppending() are invoked on it before the
/// underlying file writer is opened with the adapter's schema and metadata.
/// NOTE(review): ICEBERG_RETURN_UNEXPECTED / ICEBERG_ASSIGN_OR_RAISE presumably
/// return early with the failing status -- confirm against util/macros.h.
Result<std::unique_ptr<ManifestListWriter>> ManifestListWriter::MakeV2Writer(
    int64_t snapshot_id, std::optional<int64_t> parent_snapshot_id,
    int64_t sequence_number, std::string_view manifest_list_location,
    std::shared_ptr<FileIO> file_io) {
  auto list_adapter = std::make_unique<ManifestFileAdapterV2>(
      snapshot_id, parent_snapshot_id, sequence_number);
  ICEBERG_RETURN_UNEXPECTED(list_adapter->Init());
  ICEBERG_RETURN_UNEXPECTED(list_adapter->StartAppending());

  // Hold the schema in a local so it can be moved into OpenFileWriter.
  auto list_schema = list_adapter->schema();
  ICEBERG_ASSIGN_OR_RAISE(
      auto file_writer,
      OpenFileWriter(manifest_list_location, std::move(list_schema), std::move(file_io),
                     list_adapter->metadata(), "manifest_file"));

  // The writer is created with a direct `new` expression, exactly as before.
  return std::unique_ptr<ManifestListWriter>(
      new ManifestListWriter(std::move(file_writer), std::move(list_adapter)));
}

/// Builds a ManifestListWriter backed by a ManifestFileAdapterV3.
///
/// The adapter receives the snapshot ID, the optional parent snapshot ID, the
/// sequence number and the first row ID. Init() and StartAppending() are invoked
/// on it before the underlying file writer is opened with the adapter's schema
/// and metadata.
/// NOTE(review): ICEBERG_RETURN_UNEXPECTED / ICEBERG_ASSIGN_OR_RAISE presumably
/// return early with the failing status -- confirm against util/macros.h.
Result<std::unique_ptr<ManifestListWriter>> ManifestListWriter::MakeV3Writer(
    int64_t snapshot_id, std::optional<int64_t> parent_snapshot_id,
    int64_t sequence_number, int64_t first_row_id,
    std::string_view manifest_list_location, std::shared_ptr<FileIO> file_io) {
  auto list_adapter = std::make_unique<ManifestFileAdapterV3>(
      snapshot_id, parent_snapshot_id, sequence_number, first_row_id);
  ICEBERG_RETURN_UNEXPECTED(list_adapter->Init());
  ICEBERG_RETURN_UNEXPECTED(list_adapter->StartAppending());

  // Hold the schema in a local so it can be moved into OpenFileWriter.
  auto list_schema = list_adapter->schema();
  ICEBERG_ASSIGN_OR_RAISE(
      auto file_writer,
      OpenFileWriter(manifest_list_location, std::move(list_schema), std::move(file_io),
                     list_adapter->metadata(), "manifest_file"));

  // The writer is created with a direct `new` expression, exactly as before.
  return std::unique_ptr<ManifestListWriter>(
      new ManifestListWriter(std::move(file_writer), std::move(list_adapter)));
}

Result<std::unique_ptr<ManifestListWriter>> ManifestListWriter::MakeWriter(
int8_t format_version, int64_t snapshot_id, std::optional<int64_t> parent_snapshot_id,
std::string_view manifest_list_location, std::shared_ptr<FileIO> file_io,
std::optional<int64_t> sequence_number, std::optional<int64_t> first_row_id) {
std::unique_ptr<ManifestFileAdapter> adapter;

switch (format_version) {
case 1:
return MakeV1Writer(snapshot_id, parent_snapshot_id, manifest_list_location,
std::move(file_io));
case 2:
case 1: {
adapter = std::make_unique<ManifestFileAdapterV1>(snapshot_id, parent_snapshot_id);
break;
}
case 2: {
ICEBERG_PRECHECK(sequence_number.has_value(),
"Sequence number is required for format version 2");
return MakeV2Writer(snapshot_id, parent_snapshot_id, sequence_number.value(),
manifest_list_location, std::move(file_io));
case 3:
adapter = std::make_unique<ManifestFileAdapterV2>(snapshot_id, parent_snapshot_id,
sequence_number.value());
break;
}
case 3: {
ICEBERG_PRECHECK(sequence_number.has_value(),
"Sequence number is required for format version 3");
ICEBERG_PRECHECK(first_row_id.has_value(),
"First row ID is required for format version 3");
return MakeV3Writer(snapshot_id, parent_snapshot_id, sequence_number.value(),
first_row_id.value(), manifest_list_location,
std::move(file_io));
adapter = std::make_unique<ManifestFileAdapterV3>(
snapshot_id, parent_snapshot_id, sequence_number.value(), first_row_id.value());
break;
}
default:
return NotSupported("Format version {} is not supported", format_version);
}

ICEBERG_RETURN_UNEXPECTED(adapter->Init());
ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());

auto schema = adapter->schema();
ICEBERG_ASSIGN_OR_RAISE(
auto writer,
OpenFileWriter(manifest_list_location, std::move(schema), std::move(file_io),
adapter->metadata(), "manifest_file"));
return std::unique_ptr<ManifestListWriter>(
new ManifestListWriter(std::move(writer), std::move(adapter)));
}

} // namespace iceberg
77 changes: 0 additions & 77 deletions src/iceberg/manifest/manifest_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,48 +117,6 @@ class ICEBERG_EXPORT ManifestWriter {
/// \note Only valid after the file is closed.
Result<ManifestFile> ToManifestFile() const;

/// \brief Creates a writer for a v1 manifest file.
/// \param snapshot_id ID of the snapshot (optional).
/// \param manifest_location Path to the manifest file; must not be empty.
/// \param file_io File IO implementation to use; must not be null.
/// \param partition_spec Partition spec for the manifest; must not be null.
/// \param current_schema Current table schema; must not be null.
/// \return A Result containing the writer, or an error. An InvalidArgument error is
/// returned when manifest_location is empty or when file_io, partition_spec or
/// current_schema is null.
static Result<std::unique_ptr<ManifestWriter>> MakeV1Writer(
std::optional<int64_t> snapshot_id, std::string_view manifest_location,
std::shared_ptr<FileIO> file_io, std::shared_ptr<PartitionSpec> partition_spec,
std::shared_ptr<Schema> current_schema);

/// \brief Creates a writer for a v2 manifest file.
/// \param snapshot_id ID of the snapshot (optional).
/// \param manifest_location Path to the manifest file; must not be empty.
/// \param file_io File IO implementation to use; must not be null.
/// \param partition_spec Partition spec for the manifest; must not be null.
/// \param current_schema Schema containing the source fields referenced by partition
/// spec; must not be null.
/// \param content Content of the manifest.
/// \return A Result containing the writer, or an error. An InvalidArgument error is
/// returned when manifest_location is empty or when file_io, partition_spec or
/// current_schema is null.
static Result<std::unique_ptr<ManifestWriter>> MakeV2Writer(
std::optional<int64_t> snapshot_id, std::string_view manifest_location,
std::shared_ptr<FileIO> file_io, std::shared_ptr<PartitionSpec> partition_spec,
std::shared_ptr<Schema> current_schema, ManifestContent content);

/// \brief Creates a writer for a v3 manifest file.
/// \param snapshot_id ID of the snapshot (optional).
/// \param first_row_id First row ID of the snapshot (optional).
/// \param manifest_location Path to the manifest file; must not be empty.
/// \param file_io File IO implementation to use; must not be null.
/// \param partition_spec Partition spec for the manifest; must not be null.
/// \param current_schema Schema containing the source fields referenced by partition
/// spec; must not be null.
/// \param content Content of the manifest.
/// \return A Result containing the writer, or an error. An InvalidArgument error is
/// returned when manifest_location is empty or when file_io, partition_spec or
/// current_schema is null.
static Result<std::unique_ptr<ManifestWriter>> MakeV3Writer(
std::optional<int64_t> snapshot_id, std::optional<int64_t> first_row_id,
std::string_view manifest_location, std::shared_ptr<FileIO> file_io,
std::shared_ptr<PartitionSpec> partition_spec,
std::shared_ptr<Schema> current_schema, ManifestContent content);

/// \brief Factory function to create a writer for a manifest file based on format
/// version.
/// \param format_version The format version (1, 2, 3, etc.).
Expand Down Expand Up @@ -226,41 +184,6 @@ class ICEBERG_EXPORT ManifestListWriter {
/// \brief Get the next row id to assign.
std::optional<int64_t> next_row_id() const;

/// \brief Creates a writer for the v1 manifest list.
/// \param snapshot_id ID of the snapshot.
/// \param parent_snapshot_id ID of the parent snapshot (optional).
/// \param manifest_list_location Path to the manifest list file.
/// \param file_io File IO implementation to use.
/// \return A Result containing the writer or an error.
static Result<std::unique_ptr<ManifestListWriter>> MakeV1Writer(
int64_t snapshot_id, std::optional<int64_t> parent_snapshot_id,
std::string_view manifest_list_location, std::shared_ptr<FileIO> file_io);

/// \brief Creates a writer for the v2 manifest list.
/// \param snapshot_id ID of the snapshot.
/// \param parent_snapshot_id ID of the parent snapshot (optional).
/// \param sequence_number Sequence number of the snapshot.
/// \param manifest_list_location Path to the manifest list file.
/// \param file_io File IO implementation to use.
/// \return A Result containing the writer or an error.
static Result<std::unique_ptr<ManifestListWriter>> MakeV2Writer(
int64_t snapshot_id, std::optional<int64_t> parent_snapshot_id,
int64_t sequence_number, std::string_view manifest_list_location,
std::shared_ptr<FileIO> file_io);

/// \brief Creates a writer for the v3 manifest list.
/// \param snapshot_id ID of the snapshot.
/// \param parent_snapshot_id ID of the parent snapshot (optional).
/// \param sequence_number Sequence number of the snapshot.
/// \param first_row_id First row ID of the snapshot.
/// \param manifest_list_location Path to the manifest list file.
/// \param file_io File IO implementation to use.
/// \return A Result containing the writer or an error.
static Result<std::unique_ptr<ManifestListWriter>> MakeV3Writer(
int64_t snapshot_id, std::optional<int64_t> parent_snapshot_id,
int64_t sequence_number, int64_t first_row_id,
std::string_view manifest_list_location, std::shared_ptr<FileIO> file_io);

/// \brief Factory function to create a writer for the manifest list based on format
/// version.
/// \param format_version The format version (1, 2, 3, etc.).
Expand Down
Loading
Loading