Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 59 additions & 49 deletions tree/ntuple/inc/ROOT/RPageStorage.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,7 @@ The page source also gives access to the ntuple's metadata.
// clang-format on
class RPageSource : public RPageStorage {
protected:
/// Summarizes meta-data necessary to load a certain page. Used by LoadPageImpl().
/// Summarizes meta-data necessary to load a certain page. Used by LoadPageFromSummary().
struct RPageSummary {
ROOT::DescriptorId_t fClusterId = 0;
/// The first element number of the page's column in the given cluster
Expand Down Expand Up @@ -631,27 +631,78 @@ public:
};

private:
/// Keeps track of the requested physical column IDs and their in-memory target type via a column element identifier.
/// When using alias columns (projected fields), physical columns may be requested multiple times.
class RActivePhysicalColumns {
public:
struct RColumnInfo {
ROOT::Internal::RColumnElementBase::RIdentifier fElementId;
std::size_t fRefCounter = 0;
};

private:
/// Maps physical column IDs to all the requested in-memory representations.
/// A pair of physical column ID and in-memory representation can be requested multiple times, which is
/// indicated by the reference counter.
/// We can only have a handful of possible in-memory representations for a given column,
/// so it is fine to search them linearly.
std::unordered_map<ROOT::DescriptorId_t, std::vector<RColumnInfo>> fColumnInfos;

public:
void Insert(ROOT::DescriptorId_t physicalColumnId, ROOT::Internal::RColumnElementBase::RIdentifier elementId);
void Erase(ROOT::DescriptorId_t physicalColumnId, ROOT::Internal::RColumnElementBase::RIdentifier elementId);
ROOT::Internal::RCluster::ColumnSet_t ToColumnSet() const;
bool HasColumnInfos(ROOT::DescriptorId_t physicalColumnId) const
{
return fColumnInfos.count(physicalColumnId) > 0;
}
const std::vector<RColumnInfo> &GetColumnInfos(ROOT::DescriptorId_t physicalColumnId) const
{
return fColumnInfos.at(physicalColumnId);
}
};

ROOT::RNTupleDescriptor fDescriptor;
mutable std::shared_mutex fDescriptorLock;
REntryRange fEntryRange; ///< Used by the cluster pool to prevent reading beyond the given range
bool fHasStructure = false; ///< Set to true once `LoadStructure()` is called
bool fIsAttached = false; ///< Set to true once `Attach()` is called
bool fHasStreamerInfosRegistered = false; ///< Set to true when RegisterStreamerInfos() is called.

/// The active columns are implicitly defined by the model fields or views
RActivePhysicalColumns fActivePhysicalColumns;

/// The cluster pool asynchronously preloads the next few clusters. Note that derived classes should call
/// StopClusterPoolBackgroundThread() in their destructor so that the I/O background thread does not call
/// methods from the destructed derived class.
ROOT::Internal::RClusterPool fClusterPool;
/// The last cluster from which a page got loaded. Points into fClusterPool->fPool
ROOT::Internal::RCluster *fCurrentCluster = nullptr;
/// Pages that are unzipped with IMT are staged into the page pool
ROOT::Internal::RPagePool fPagePool;

/// Remembers the last cluster id from which a page was requested
ROOT::DescriptorId_t fLastUsedCluster = ROOT::kInvalidDescriptorId;
/// Clusters from where pages got preloaded in UnzipClusterImpl(), ordered by first entry number
/// of the clusters. If the last used cluster changes in LoadPage(), all unused pages from
/// previous clusters are evicted from the page pool. Pinned clusters won't be evicted.
std::map<ROOT::NTupleSize_t, ROOT::DescriptorId_t> fPreloadedClusters;

/// Pinned clusters and their $2 * (cluster bunch size) - 1$ successors will not be evicted from the cluster pool.
/// Pages of pinned clusters won't be evicted from the page pool.
std::unordered_set<ROOT::DescriptorId_t> fPinnedClusters;

/// Does nothing if fLastUsedCluster == clusterId. Otherwise, updated fLastUsedCluster
/// and evict unused paged from the page pool of all previous clusters.
/// Must not be called when the descriptor guard is taken.
void UpdateLastUsedCluster(ROOT::DescriptorId_t clusterId);

// Common treatment of zero pages that would otherwise need to be handled in LoadPageImpl()
// Common treatment of zero pages in LoadPageFromSummary()
ROOT::Internal::RPageRef LoadZeroPage(ColumnHandle_t columnHandle, const RPageSummary &pageSummary);
// Once the page is found to be missing in the page cache and all information about the page is collected,
// either using a global or a local element index, perform the common page loading tasks using the page summary.
// This includes zero page handling and prefetching the corresponding cluster.
ROOT::Internal::RPageRef LoadPageFromSummary(ColumnHandle_t columnHandle, const RPageSummary &pageSummary);

protected:
/// Default I/O performance counters that get registered in `fMetrics`
Expand All @@ -675,52 +726,9 @@ protected:
ROOT::Experimental::Detail::RNTupleCalcPerf &fCompressionRatio;
};

/// Keeps track of the requested physical column IDs and their in-memory target type via a column element identifier.
/// When using alias columns (projected fields), physical columns may be requested multiple times.
class RActivePhysicalColumns {
public:
struct RColumnInfo {
ROOT::Internal::RColumnElementBase::RIdentifier fElementId;
std::size_t fRefCounter = 0;
};

private:
/// Maps physical column IDs to all the requested in-memory representations.
/// A pair of physical column ID and in-memory representation can be requested multiple times, which is
/// indicated by the reference counter.
/// We can only have a handful of possible in-memory representations for a given column,
/// so it is fine to search them linearly.
std::unordered_map<ROOT::DescriptorId_t, std::vector<RColumnInfo>> fColumnInfos;

public:
void Insert(ROOT::DescriptorId_t physicalColumnId, ROOT::Internal::RColumnElementBase::RIdentifier elementId);
void Erase(ROOT::DescriptorId_t physicalColumnId, ROOT::Internal::RColumnElementBase::RIdentifier elementId);
ROOT::Internal::RCluster::ColumnSet_t ToColumnSet() const;
bool HasColumnInfos(ROOT::DescriptorId_t physicalColumnId) const
{
return fColumnInfos.count(physicalColumnId) > 0;
}
const std::vector<RColumnInfo> &GetColumnInfos(ROOT::DescriptorId_t physicalColumnId) const
{
return fColumnInfos.at(physicalColumnId);
}
};

std::unique_ptr<RCounters> fCounters;

ROOT::RNTupleReadOptions fOptions;
/// The active columns are implicitly defined by the model fields or views
RActivePhysicalColumns fActivePhysicalColumns;
/// Pinned clusters and their $2 * (cluster bunch size) - 1$ successors will not be evicted from the cluster pool.
/// Pages of pinned clusters won't be evicted from the page pool.
std::unordered_set<ROOT::DescriptorId_t> fPinnedClusters;

/// The cluster pool asynchronously preloads the next few clusters. Note that derived classes should call
/// fClusterPool.StopBackgroundThread() in their destructor so that the I/O background thread does not call
/// methods from the destructed derived class.
ROOT::Internal::RClusterPool fClusterPool;
/// Pages that are unzipped with IMT are staged into the page pool
ROOT::Internal::RPagePool fPagePool;

virtual void LoadStructureImpl() = 0;
/// `LoadStructureImpl()` has been called before `AttachImpl()` is called
Expand All @@ -729,8 +737,6 @@ protected:
virtual std::unique_ptr<RPageSource> CloneImpl() const = 0;
// Only called if a task scheduler is set. No-op be default.
virtual void UnzipClusterImpl(ROOT::Internal::RCluster *cluster);
// Returns a page from storage if not found in the page pool. Will never receive requests for zero pages.
virtual ROOT::Internal::RPageRef LoadPageImpl(ColumnHandle_t columnHandle, const RPageSummary &pageSummary) = 0;
// Returns a sealed page from storage without adding it to the page pool. The sealed pages buffer and buffer size
// is already initialized.
virtual void LoadSealedPageImpl(const RNTupleLocator &locator, RSealedPage &sealedPage) = 0;
Expand All @@ -754,6 +760,9 @@ protected:
/// Note that the underlying lock is not recursive. See `GetSharedDescriptorGuard()` for further information.
RExclDescriptorGuard GetExclDescriptorGuard() { return RExclDescriptorGuard(fDescriptor, fDescriptorLock); }

// To be called in the destructor of derived classes
void StopClusterPoolBackgroundThread() { fClusterPool.StopBackgroundThread(); }

public:
RPageSource(std::string_view ntupleName, const ROOT::RNTupleReadOptions &fOptions);
RPageSource(const RPageSource &) = delete;
Expand Down Expand Up @@ -809,8 +818,9 @@ public:
void SetEntryRange(const REntryRange &range);
REntryRange GetEntryRange() const { return fEntryRange; }

/// Allocates and fills a page that contains the index-th element. The default implementation searches
/// the page and calls LoadPageImpl(). Returns a default-constructed RPage for suppressed columns.
/// Allocates and fills a page that contains the index-th element. Calls into the concrete page source
/// for loading the corresponding sealed page of cluster where necessary.
/// Returns a default-constructed RPage for suppressed columns.
virtual ROOT::Internal::RPageRef LoadPage(ColumnHandle_t columnHandle, ROOT::NTupleSize_t globalIndex);
/// Another version of `LoadPage` that allows to specify cluster-relative indexes.
/// Returns a default-constructed RPage for suppressed columns.
Expand Down
1 change: 0 additions & 1 deletion tree/ntuple/inc/ROOT/RPageStorageDaos.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@
ntuple_index_t fNTupleIndex{0};

/// The last cluster from which a page got loaded. Points into fClusterPool->fPool
ROOT::Internal::RCluster *fCurrentCluster = nullptr;

Check warning on line 154 in tree/ntuple/inc/ROOT/RPageStorageDaos.hxx

View workflow job for this annotation

GitHub Actions / mac-beta ARM64

private field 'fCurrentCluster' is not used [-Wunused-private-field]

Check warning on line 154 in tree/ntuple/inc/ROOT/RPageStorageDaos.hxx

View workflow job for this annotation

GitHub Actions / mac14 X64 CMAKE_CXX_STANDARD=20

private field 'fCurrentCluster' is not used [-Wunused-private-field]

Check warning on line 154 in tree/ntuple/inc/ROOT/RPageStorageDaos.hxx

View workflow job for this annotation

GitHub Actions / mac26 ARM64

private field 'fCurrentCluster' is not used [-Wunused-private-field]

Check warning on line 154 in tree/ntuple/inc/ROOT/RPageStorageDaos.hxx

View workflow job for this annotation

GitHub Actions / alma10 clang Ninja builtins auto-registration off CMAKE_CXX_STANDARD=20

private field 'fCurrentCluster' is not used [-Wunused-private-field]
/// A container that stores object data (header/footer, pages, etc.)
std::unique_ptr<RDaosContainer> fDaosContainer;
/// A URI to a DAOS pool of the form 'daos://pool-label/container-label'
Expand All @@ -159,7 +159,6 @@

ROOT::Internal::RNTupleDescriptorBuilder fDescriptorBuilder;

ROOT::Internal::RPageRef LoadPageImpl(ColumnHandle_t columnHandle, const RPageSummary &pageSummary) final;
void LoadSealedPageImpl(const RNTupleLocator &locator, RSealedPage &sealedPage) final;

protected:
Expand Down
1 change: 0 additions & 1 deletion tree/ntuple/inc/ROOT/RPageStorageFile.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,6 @@ protected:
/// The cloned page source creates a new raw file and reader and opens its own file descriptor to the data.
std::unique_ptr<RPageSource> CloneImpl() const final;

RPageRef LoadPageImpl(ColumnHandle_t columnHandle, const RPageSummary &pageSummary) final;
void LoadSealedPageImpl(const RNTupleLocator &locator, RSealedPage &sealedPage) final;

public:
Expand Down
88 changes: 69 additions & 19 deletions tree/ntuple/src/RPageStorage.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,9 @@ bool ROOT::Internal::RPageSource::REntryRange::IntersectsWith(const ROOT::RClust

ROOT::Internal::RPageSource::RPageSource(std::string_view name, const ROOT::RNTupleReadOptions &options)
: RPageStorage(name),
fOptions(options),
fClusterPool(*this, ROOT::Internal::RNTupleReadOptionsManip::GetClusterBunchSize(fOptions)),
fPagePool(*this)
fClusterPool(*this, ROOT::Internal::RNTupleReadOptionsManip::GetClusterBunchSize(options)),
fPagePool(*this),
fOptions(options)
{
}

Expand Down Expand Up @@ -440,6 +440,70 @@ ROOT::Internal::RPageSource::LoadZeroPage(ColumnHandle_t columnHandle, const RPa
return fPagePool.RegisterPage(std::move(pageZero), RPagePool::RKey{columnHandle.fPhysicalId, elementInMemoryType});
}

ROOT::Internal::RPageRef
ROOT::Internal::RPageSource::LoadPageFromSummary(ColumnHandle_t columnHandle, const RPageSummary &pageSummary)
{
if (pageSummary.fPageInfo.GetLocator().GetType() == RNTupleLocator::kTypeUnknown) {
throw RException(R__FAIL("tried to read a page with an unknown locator"));
} else if (pageSummary.fPageInfo.GetLocator().GetType() == RNTupleLocator::kTypePageZero) {
return LoadZeroPage(columnHandle, pageSummary);
}

const auto &columnId = columnHandle.fPhysicalId;
const auto &clusterId = pageSummary.fClusterId;
const auto &pageInfo = pageSummary.fPageInfo;

const auto element = columnHandle.fColumn->GetElement();
const auto elementSize = element->GetSize();
const auto elementInMemoryType = element->GetIdentifier().fInMemoryType;

UpdateLastUsedCluster(clusterId);

RSealedPage sealedPage;
sealedPage.SetNElements(pageInfo.GetNElements());
sealedPage.SetHasChecksum(pageInfo.HasChecksum());
sealedPage.SetBufferSize(pageInfo.GetLocator().GetNBytesOnStorage() + pageInfo.HasChecksum() * kNBytesPageChecksum);
std::unique_ptr<unsigned char[]> directReadBuffer; // only used if cluster pool is turned off

if (fOptions.GetClusterCache() == ROOT::RNTupleReadOptions::EClusterCache::kOff) {
directReadBuffer = MakeUninitArray<unsigned char>(sealedPage.GetBufferSize());
sealedPage.SetBuffer(directReadBuffer.get());
LoadSealedPageImpl(pageInfo.GetLocator(), sealedPage);

fCounters->fNPageRead.Inc();
fCounters->fNRead.Inc();
fCounters->fSzReadPayload.Add(sealedPage.GetBufferSize());
} else {
if (!fCurrentCluster || (fCurrentCluster->GetId() != clusterId) || !fCurrentCluster->ContainsColumn(columnId))
fCurrentCluster = fClusterPool.GetCluster(clusterId, fActivePhysicalColumns.ToColumnSet());
R__ASSERT(fCurrentCluster->ContainsColumn(columnId));

// The cluster pool may have unzipped the required page into the page pool
auto cachedPageRef = fPagePool.GetPage(ROOT::Internal::RPagePool::RKey{columnId, elementInMemoryType},
RNTupleLocalIndex(clusterId, pageInfo.GetFirstElementIndex()));
if (!cachedPageRef.Get().IsNull())
return cachedPageRef;

ROnDiskPage::Key key(columnId, pageInfo.GetPageNumber());
auto onDiskPage = fCurrentCluster->GetOnDiskPage(key);
R__ASSERT(onDiskPage && (sealedPage.GetBufferSize() == onDiskPage->GetSize()));
sealedPage.SetBuffer(onDiskPage->GetAddress());
}

ROOT::Internal::RPage newPage;
{
RNTupleAtomicTimer timer(fCounters->fTimeWallUnzip, fCounters->fTimeCpuUnzip);
newPage = UnsealPage(sealedPage, *element).Unwrap();
fCounters->fSzUnzip.Add(elementSize * pageInfo.GetNElements());
}

newPage.SetWindow(pageSummary.fColumnOffset + pageInfo.GetFirstElementIndex(),
ROOT::Internal::RPage::RClusterInfo(clusterId, pageSummary.fColumnOffset));
fCounters->fNPageUnsealed.Inc();

return fPagePool.RegisterPage(std::move(newPage), RPagePool::RKey{columnId, elementInMemoryType});
}

ROOT::Internal::RPageRef
ROOT::Internal::RPageSource::LoadPage(ColumnHandle_t columnHandle, ROOT::NTupleSize_t globalIndex)
{
Expand Down Expand Up @@ -470,14 +534,7 @@ ROOT::Internal::RPageSource::LoadPage(ColumnHandle_t columnHandle, ROOT::NTupleS
pageSummary.fPageInfo = clusterDescriptor.GetPageRange(columnId).Find(globalIndex - pageSummary.fColumnOffset);
}

if (pageSummary.fPageInfo.GetLocator().GetType() == RNTupleLocator::kTypeUnknown) {
throw RException(R__FAIL("tried to read a page with an unknown locator"));
} else if (pageSummary.fPageInfo.GetLocator().GetType() == RNTupleLocator::kTypePageZero) {
return LoadZeroPage(columnHandle, pageSummary);
}

UpdateLastUsedCluster(pageSummary.fClusterId);
return LoadPageImpl(columnHandle, pageSummary);
return LoadPageFromSummary(columnHandle, pageSummary);
}

ROOT::Internal::RPageRef
Expand Down Expand Up @@ -509,14 +566,7 @@ ROOT::Internal::RPageSource::LoadPage(ColumnHandle_t columnHandle, RNTupleLocalI
pageSummary.fPageInfo = clusterDescriptor.GetPageRange(columnId).Find(localIndex.GetIndexInCluster());
}

if (pageSummary.fPageInfo.GetLocator().GetType() == RNTupleLocator::kTypeUnknown) {
throw RException(R__FAIL("tried to read a page with an unknown locator"));
} else if (pageSummary.fPageInfo.GetLocator().GetType() == RNTupleLocator::kTypePageZero) {
return LoadZeroPage(columnHandle, pageSummary);
}

UpdateLastUsedCluster(clusterId);
return LoadPageImpl(columnHandle, pageSummary);
return LoadPageFromSummary(columnHandle, pageSummary);
}

void ROOT::Internal::RPageSource::EnableDefaultMetrics(const std::string &prefix)
Expand Down
59 changes: 1 addition & 58 deletions tree/ntuple/src/RPageStorageDaos.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ ROOT::Experimental::Internal::RPageSourceDaos::RPageSourceDaos(std::string_view

ROOT::Experimental::Internal::RPageSourceDaos::~RPageSourceDaos()
{
fClusterPool.StopBackgroundThread();
StopClusterPoolBackgroundThread();
}

ROOT::RNTupleDescriptor
Expand Down Expand Up @@ -504,63 +504,6 @@ void ROOT::Experimental::Internal::RPageSourceDaos::LoadSealedPageImpl(const RNT
daosKey.fDkey, daosKey.fAkey);
}

ROOT::Internal::RPageRef ROOT::Experimental::Internal::RPageSourceDaos::LoadPageImpl(ColumnHandle_t columnHandle,
const RPageSummary &pageSummary)
{
const auto &columnId = columnHandle.fPhysicalId;
const auto &clusterId = pageSummary.fClusterId;
const auto &pageInfo = pageSummary.fPageInfo;

const auto element = columnHandle.fColumn->GetElement();
const auto elementSize = element->GetSize();
const auto elementInMemoryType = element->GetIdentifier().fInMemoryType;

RSealedPage sealedPage;
sealedPage.SetNElements(pageInfo.GetNElements());
sealedPage.SetHasChecksum(pageInfo.HasChecksum());
sealedPage.SetBufferSize(pageInfo.GetLocator().GetNBytesOnStorage() + pageInfo.HasChecksum() * kNBytesPageChecksum);
std::unique_ptr<unsigned char[]> directReadBuffer; // only used if cluster pool is turned off

if (fOptions.GetClusterCache() == ROOT::RNTupleReadOptions::EClusterCache::kOff) {
directReadBuffer = MakeUninitArray<unsigned char>(sealedPage.GetBufferSize());
RDaosKey daosKey =
GetPageDaosKey(fNTupleIndex, pageInfo.GetLocator().GetPosition<RNTupleLocatorObject64>().GetLocation());
fDaosContainer->ReadSingleAkey(directReadBuffer.get(), sealedPage.GetBufferSize(), daosKey.fOid, daosKey.fDkey,
daosKey.fAkey);
fCounters->fNPageRead.Inc();
fCounters->fNRead.Inc();
fCounters->fSzReadPayload.Add(sealedPage.GetBufferSize());
sealedPage.SetBuffer(directReadBuffer.get());
} else {
if (!fCurrentCluster || (fCurrentCluster->GetId() != clusterId) || !fCurrentCluster->ContainsColumn(columnId))
fCurrentCluster = fClusterPool.GetCluster(clusterId, fActivePhysicalColumns.ToColumnSet());
R__ASSERT(fCurrentCluster->ContainsColumn(columnId));

// The cluster pool may have unzipped the required page into the page pool
auto cachedPageRef = fPagePool.GetPage(ROOT::Internal::RPagePool::RKey{columnId, elementInMemoryType},
RNTupleLocalIndex(clusterId, pageInfo.GetFirstElementIndex()));
if (!cachedPageRef.Get().IsNull())
return cachedPageRef;

ROOT::Internal::ROnDiskPage::Key key(columnId, pageInfo.GetPageNumber());
auto onDiskPage = fCurrentCluster->GetOnDiskPage(key);
R__ASSERT(onDiskPage && (sealedPage.GetBufferSize() == onDiskPage->GetSize()));
sealedPage.SetBuffer(onDiskPage->GetAddress());
}

ROOT::Internal::RPage newPage;
{
Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallUnzip, fCounters->fTimeCpuUnzip);
newPage = UnsealPage(sealedPage, *element).Unwrap();
fCounters->fSzUnzip.Add(elementSize * pageInfo.GetNElements());
}

newPage.SetWindow(pageSummary.fColumnOffset + pageInfo.GetFirstElementIndex(),
ROOT::Internal::RPage::RClusterInfo(clusterId, pageSummary.fColumnOffset));
fCounters->fNPageUnsealed.Inc();
return fPagePool.RegisterPage(std::move(newPage), ROOT::Internal::RPagePool::RKey{columnId, elementInMemoryType});
}

std::unique_ptr<ROOT::Internal::RPageSource> ROOT::Experimental::Internal::RPageSourceDaos::CloneImpl() const
{
auto clone = new RPageSourceDaos(fNTupleName, fURI, fOptions);
Expand Down
Loading
Loading