Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions include/paimon/fs/file_system.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class PAIMON_EXPORT InputStream : public Stream {
/// @return Result containing the actual number of bytes read on success, or an error status on
/// failure.
/// @note The stream position advances by the number of bytes actually read.
virtual Result<int32_t> Read(char* buffer, uint32_t size) = 0;
virtual Result<int64_t> Read(char* buffer, int64_t size) = 0;

/// Read data from given position in the stream.
///
Expand All @@ -83,7 +83,7 @@ class PAIMON_EXPORT InputStream : public Stream {
/// @param[out] buffer The buffer to store the read content.
/// @param size The number of bytes to read.
/// @param offset The position in the stream to read from.
virtual Result<int32_t> Read(char* buffer, uint32_t size, uint64_t offset) = 0;
virtual Result<int64_t> Read(char* buffer, int64_t size, int64_t offset) = 0;

/// Asynchronously read data from the input stream.
///
Expand All @@ -98,7 +98,7 @@ class PAIMON_EXPORT InputStream : public Stream {
/// @param callback The callback function to be invoked upon completion of the read operation.
/// The callback will receive a Status object indicating the success or failure
/// of the read operation.
virtual void ReadAsync(char* buffer, uint32_t size, uint64_t offset,
virtual void ReadAsync(char* buffer, int64_t size, int64_t offset,
std::function<void(Status)>&& callback) = 0;

/// Get an identifier that uniquely identifies the underlying content.
Expand All @@ -107,7 +107,7 @@ class PAIMON_EXPORT InputStream : public Stream {
virtual Result<std::string> GetUri() const = 0;

/// Get the total length of the file in bytes.
virtual Result<uint64_t> Length() const = 0;
virtual Result<int64_t> Length() const = 0;
};

/// Abstract class for output stream operations.
Expand All @@ -121,7 +121,7 @@ class PAIMON_EXPORT OutputStream : public Stream {
/// @return Result containing the actual number of bytes written on success, or an error status
/// on failure.
/// @note The stream position advances by the number of bytes actually written.
virtual Result<int32_t> Write(const char* buffer, uint32_t size) = 0;
virtual Result<int64_t> Write(const char* buffer, int64_t size) = 0;

/// Flush pending data to the disk.
virtual Status Flush() = 0;
Expand Down Expand Up @@ -160,7 +160,7 @@ class PAIMON_EXPORT FileStatus {

/// Get the size of the file in bytes.
/// @note Calling this method on a directory is undefined behavior.
virtual uint64_t GetLen() const = 0;
virtual int64_t GetLen() const = 0;

/// Check if this entry represents a directory.
virtual bool IsDir() const = 0;
Expand Down
22 changes: 11 additions & 11 deletions include/paimon/io/buffered_input_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class PAIMON_EXPORT BufferedInputStream : public InputStream {
/// @param in The underlying input stream to wrap.
/// @param buffer_size Size of the internal buffer in bytes.
/// @param pool Memory pool for buffer allocation.
BufferedInputStream(const std::shared_ptr<InputStream>& in, int32_t buffer_size,
BufferedInputStream(const std::shared_ptr<InputStream>& in, int64_t buffer_size,
MemoryPool* pool);

~BufferedInputStream() noexcept override;
Expand All @@ -51,36 +51,36 @@ class PAIMON_EXPORT BufferedInputStream : public InputStream {

Result<int64_t> GetPos() const override;

Result<int32_t> Read(char* buffer, uint32_t size) override;
Result<int64_t> Read(char* buffer, int64_t size) override;

Result<int32_t> Read(char* buffer, uint32_t size, uint64_t offset) override;
Result<int64_t> Read(char* buffer, int64_t size, int64_t offset) override;

void ReadAsync(char* buffer, uint32_t size, uint64_t offset,
void ReadAsync(char* buffer, int64_t size, int64_t offset,
std::function<void(Status)>&& callback) override;

Result<uint64_t> Length() const override;
Result<int64_t> Length() const override;

Status Close() override;

Result<std::string> GetUri() const override;

static constexpr int32_t DEFAULT_BUFFER_SIZE = 8192;
static constexpr int64_t DEFAULT_BUFFER_SIZE = 8192;

private:
/// Fill the internal buffer from the underlying stream.
Status Fill();

/// Internal read implementation.
/// @pre size > 0
Result<int32_t> InnerRead(char* buffer, int32_t size);
Result<int64_t> InnerRead(char* buffer, int64_t size);

/// Validate that the expected number of bytes were read.
Status AssertReadLength(int32_t read_length, int32_t actual_read_length) const;
Status AssertReadLength(int64_t read_length, int64_t actual_read_length) const;

private:
int32_t buffer_size_;
int32_t pos_ = 0;
int32_t count_ = 0;
int64_t buffer_size_;
int64_t pos_ = 0;
int64_t count_ = 0;
std::unique_ptr<Bytes> buffer_;
std::shared_ptr<InputStream> in_;
};
Expand Down
12 changes: 6 additions & 6 deletions include/paimon/io/byte_array_input_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ namespace paimon {
/// Input stream for memory buffer, inherits from `InputStream`.
class PAIMON_EXPORT ByteArrayInputStream : public InputStream {
public:
ByteArrayInputStream(const char* buffer, uint64_t length);
ByteArrayInputStream(const char* buffer, int64_t length);
~ByteArrayInputStream() override = default;

/// @return The raw data pointer of current pos.
Expand All @@ -41,14 +41,14 @@ class PAIMON_EXPORT ByteArrayInputStream : public InputStream {
return position_;
}

Result<int32_t> Read(char* buffer, uint32_t size) override;
Result<int64_t> Read(char* buffer, int64_t size) override;

Result<int32_t> Read(char* buffer, uint32_t size, uint64_t offset) override;
Result<int64_t> Read(char* buffer, int64_t size, int64_t offset) override;

void ReadAsync(char* buffer, uint32_t size, uint64_t offset,
void ReadAsync(char* buffer, int64_t size, int64_t offset,
std::function<void(Status)>&& callback) override;

Result<uint64_t> Length() const override {
Result<int64_t> Length() const override {
return length_;
}

Expand All @@ -58,7 +58,7 @@ class PAIMON_EXPORT ByteArrayInputStream : public InputStream {

private:
const char* buffer_;
const uint64_t length_;
const int64_t length_;
int64_t position_;
};
} // namespace paimon
8 changes: 4 additions & 4 deletions include/paimon/io/data_input_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class PAIMON_EXPORT DataInputStream {
/// Read raw data of specified size from the stream.
/// @param data Buffer to store the read data.
/// @param size Number of bytes to read.
Status Read(char* data, uint32_t size) const;
Status Read(char* data, int64_t size) const;

/// Read string from the stream.
/// @note First read length (int16), then read string bytes.
Expand All @@ -65,7 +65,7 @@ class PAIMON_EXPORT DataInputStream {
Result<int64_t> GetPos() const;

/// Get the total length of the underlying input stream.
Result<uint64_t> Length() const;
Result<int64_t> Length() const;

/// Set the byte order for endianness conversion.
/// @param order The byte order to use: `PAIMON_BIG_ENDIAN` or `PAIMON_LITTLE_ENDIAN`.
Expand All @@ -77,11 +77,11 @@ class PAIMON_EXPORT DataInputStream {
/// Validate that the expected number of bytes were read.
/// @param read_length Expected number of bytes to read.
/// @param actual_read_length Actual number of bytes read.
Status AssertReadLength(int32_t read_length, int32_t actual_read_length) const;
Status AssertReadLength(int64_t read_length, int64_t actual_read_length) const;

/// Check if there are enough bytes available to read.
/// @param need_length Number of bytes needed.
Status AssertBoundary(int32_t need_length) const;
Status AssertBoundary(int64_t need_length) const;

/// Determine if byte swapping is needed based on current byte order and system endianness.
/// @return `true` if byte swapping is required, `false` otherwise.
Expand Down
2 changes: 1 addition & 1 deletion include/paimon/memory/bytes.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class PAIMON_EXPORT Bytes {
/// @param length Number of bytes to allocate.
/// @param pool Memory pool to use for allocation.
/// @return Unique pointer to the newly allocated Bytes object.
static PAIMON_UNIQUE_PTR<Bytes> AllocateBytes(int32_t length, MemoryPool* pool);
static PAIMON_UNIQUE_PTR<Bytes> AllocateBytes(size_t length, MemoryPool* pool);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why size_t


/// Allocate a new Bytes object from string data.
///
Expand Down
16 changes: 14 additions & 2 deletions src/paimon/common/data/blob.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,20 @@ Result<std::unique_ptr<InputStream>> Blob::NewInputStream(
PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<InputStream> file,
fs->Open(impl_->GetDescriptor()->Uri()));

return OffsetInputStream::Create(std::move(file), impl_->GetDescriptor()->Length(),
impl_->GetDescriptor()->Offset());
int64_t blob_length = impl_->GetDescriptor()->Length();
int64_t blob_offset = impl_->GetDescriptor()->Offset();

PAIMON_ASSIGN_OR_RAISE(int64_t total_length, file->Length());
if (PAIMON_UNLIKELY(blob_offset > total_length)) {
return Status::Invalid(
fmt::format("offset {} exceed total length {}", blob_offset, total_length));
}
Comment on lines +100 to +104
if (blob_length == -1) {
// blob_length == -1 means it's dynamic length, should read to the end
blob_length = total_length - blob_offset;
}

return OffsetInputStream::Create(std::move(file), blob_length, blob_offset, total_length);
}

Result<PAIMON_UNIQUE_PTR<Bytes>> Blob::ToData(const std::shared_ptr<FileSystem>& fs,
Expand Down
4 changes: 2 additions & 2 deletions src/paimon/common/data/blob_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ TEST_F(BlobTest, TestNewInputStreamWithOffsetAndLength) {
ASSERT_OK_AND_ASSIGN(auto input_stream, blob->NewInputStream(file_system_));
ASSERT_TRUE(input_stream);

ASSERT_OK_AND_ASSIGN(uint64_t length, input_stream->Length());
ASSERT_OK_AND_ASSIGN(int64_t length, input_stream->Length());
ASSERT_EQ(6, length);

// Test reading with offset and length applied
Expand All @@ -133,7 +133,7 @@ TEST_F(BlobTest, TestNewInputStreamWithDynamicLength) {
ASSERT_OK_AND_ASSIGN(auto input_stream, blob->NewInputStream(file_system_));
ASSERT_TRUE(input_stream);

ASSERT_OK_AND_ASSIGN(uint64_t length, input_stream->Length());
ASSERT_OK_AND_ASSIGN(int64_t length, input_stream->Length());
ASSERT_EQ(12, length);

// Test reading from offset to end (should read "cdefghijklmn")
Expand Down
2 changes: 1 addition & 1 deletion src/paimon/common/data/decimal_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ TEST(DecimalTest, TestCompatibleWithJava) {
auto pool = GetDefaultPool();
auto file_system = std::make_unique<LocalFileSystem>();
auto file_name = paimon::test::GetDataDir() + "/decimal_bytes.data";
uint64_t file_length = file_system->GetFileStatus(file_name).value()->GetLen();
int64_t file_length = file_system->GetFileStatus(file_name).value()->GetLen();
ASSERT_GT(file_length, 0);
ASSERT_OK_AND_ASSIGN(auto input_stream, file_system->Open(file_name));
auto data_bytes = Bytes::AllocateBytes(file_length, pool.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,7 @@ TEST_F(BitmapIndexTest, TestHighCardinalityForCompatibility) {
auto file_system = std::make_unique<LocalFileSystem>();
ASSERT_OK_AND_ASSIGN(std::shared_ptr<InputStream> input_stream,
file_system->Open(index_file_name));
ASSERT_OK_AND_ASSIGN(uint64_t length, input_stream->Length());
ASSERT_OK_AND_ASSIGN(int64_t length, input_stream->Length());

BitmapFileIndex file_index({});
ASSERT_OK_AND_ASSIGN(auto reader, file_index.CreateReader(CreateArrowSchema(type).get(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ Result<std::shared_ptr<FileIndexReader>> BloomFilterFileIndex::CreateReader(

PAIMON_RETURN_NOT_OK(input_stream->Seek(start, SeekOrigin::FS_SEEK_SET));
auto bytes = std::make_shared<Bytes>(length, pool.get());
PAIMON_ASSIGN_OR_RAISE(int32_t actual_read_len,
PAIMON_ASSIGN_OR_RAISE(int64_t actual_read_len,
input_stream->Read(bytes->data(), bytes->size()));
if (static_cast<size_t>(actual_read_len) != bytes->size()) {
return Status::Invalid(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ Result<std::shared_ptr<FileIndexReader>> BitSliceIndexBitmapFileIndex::CreateRea

PAIMON_RETURN_NOT_OK(input_stream->Seek(start, SeekOrigin::FS_SEEK_SET));
auto bytes = std::make_unique<Bytes>(length, pool.get());
PAIMON_ASSIGN_OR_RAISE(int32_t actual_read_len,
PAIMON_ASSIGN_OR_RAISE(int64_t actual_read_len,
input_stream->Read(bytes->data(), bytes->size()));
if (static_cast<size_t>(actual_read_len) != bytes->size()) {
return Status::Invalid(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#include <algorithm>
#include <cmath>
#include <limits>
#include <string>

#include "paimon/common/io/memory_segment_output_stream.h"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ TEST_F(RangeBitmapIoTest, TestSimple) {
ASSERT_OK_AND_ASSIGN(std::shared_ptr<OutputStream> out,
fs_->Create(file_path, /*overwrite=*/false));
ASSERT_OK_AND_ASSIGN(
int32_t write_len,
int64_t write_len,
out->Write(reinterpret_cast<char*>(serialized_bytes->data()), serialized_bytes->size()));
ASSERT_EQ(write_len, serialized_bytes->size());
ASSERT_OK(out->Flush());
Expand Down
12 changes: 6 additions & 6 deletions src/paimon/common/fs/file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,10 @@ Status FileSystem::ReadFile(const std::string& path, std::string* content) {
Status s = in->Close();
(void)s;
});
PAIMON_ASSIGN_OR_RAISE(uint64_t length, in->Length());
content->resize(length);
PAIMON_ASSIGN_OR_RAISE(int32_t read_length, in->Read(content->data(), length));
if (read_length != static_cast<int32_t>(length)) {
PAIMON_ASSIGN_OR_RAISE(int64_t length, in->Length());
content->resize(static_cast<size_t>(length));
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

check length not negative?

PAIMON_ASSIGN_OR_RAISE(int64_t read_length, in->Read(content->data(), length));
if (read_length != length) {
Comment on lines +51 to +54
return Status::IOError(fmt::format("path {}, expect read len {}, actual read len {}",
path, length, read_length));
}
Expand All @@ -69,8 +69,8 @@ Status FileSystem::WriteFile(const std::string& path, const std::string& content
Status s = out->Close();
(void)s;
});
int32_t length = content.size();
PAIMON_ASSIGN_OR_RAISE(int32_t write_length, out->Write(content.data(), length));
auto length = static_cast<int64_t>(content.size());
PAIMON_ASSIGN_OR_RAISE(int64_t write_length, out->Write(content.data(), length));
if (write_length != length) {
return Status::IOError(fmt::format("path {}, expect write len {}, actual write len {}",
path, length, write_length));
Expand Down
Loading
Loading