Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Add RDataSource for TTree #17895

Merged
merged 3 commits into from
Mar 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions bindings/pyroot/pythonizations/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -172,13 +172,13 @@ if(roofit)
endif()

# Python unit tests that are only registered when the `dataframe` option is enabled.
# NOTE(review): this span is a rendered diff without +/- markers, so the pyroot_string_view registration shows up
# twice: once directly under `if (dataframe)` and once inside the MSVC guard. Presumably the first pair is the
# removed version and the second pair the added one (the file summary reports 2 additions & 2 deletions) - confirm
# against the repository before relying on either position.
if (dataframe)
# std::string_view in CPyCppyy
ROOT_ADD_PYUNITTEST(pyroot_string_view string_view.py)
# The tests in this inner block are skipped on MSVC unless win_broken_tests is set.
if(NOT MSVC OR win_broken_tests)
# Test wrapping Python callables for use in C++ using numba
ROOT_ADD_PYUNITTEST(pyroot_numbadeclare numbadeclare.py PYTHON_DEPS numba)
ROOT_ADD_PYUNITTEST(pyroot_rdf_filter_pyz rdf_filter_pyz.py PYTHON_DEPS numba)
ROOT_ADD_PYUNITTEST(pyroot_rdf_define_pyz rdf_define_pyz.py PYTHON_DEPS numba)
# std::string_view in CPyCppyy
ROOT_ADD_PYUNITTEST(pyroot_string_view string_view.py)
endif()
endif()

Expand Down
3 changes: 3 additions & 0 deletions tree/dataframe/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ ROOT_STANDARD_LIBRARY_PACKAGE(ROOTDataFrame
ROOT/RRootDS.hxx
ROOT/RSnapshotOptions.hxx
ROOT/RTrivialDS.hxx
ROOT/RTTreeDS.hxx
ROOT/RDF/ActionHelpers.hxx
ROOT/RDF/ColumnReaderUtils.hxx
ROOT/RDF/GraphNode.hxx
Expand Down Expand Up @@ -107,6 +108,7 @@ ROOT_STANDARD_LIBRARY_PACKAGE(ROOTDataFrame
src/RCutFlowReport.cxx
src/RDataFrame.cxx
src/RDatasetSpec.cxx
src/RDataSource.cxx
src/RDFActionHelpers.cxx
src/RDFColumnReaderUtils.cxx
src/RDFColumnRegister.cxx
Expand All @@ -129,6 +131,7 @@ ROOT_STANDARD_LIBRARY_PACKAGE(ROOTDataFrame
src/RSample.cxx
src/RTreeColumnReader.cxx
src/RResultPtr.cxx
src/RTTreeDS.cxx
src/RVariationBase.cxx
src/RVariationReader.cxx
src/RVariationsDescription.cxx
Expand Down
67 changes: 53 additions & 14 deletions tree/dataframe/inc/ROOT/RDF/ActionHelpers.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
#include "ROOT/RNTupleDS.hxx"
#include "ROOT/RNTupleWriter.hxx" // for SnapshotRNTupleHelper
#endif
#include "ROOT/RTTreeDS.hxx"

#include <algorithm>
#include <functional>
Expand Down Expand Up @@ -1530,12 +1531,15 @@ class R__CLING_PTRCHECK(off) SnapshotTTreeHelper : public RActionImpl<SnapshotTT
std::vector<void *> fBranchAddresses; // Addresses of objects associated to output branches
RBranchSet fOutputBranches;
std::vector<bool> fIsDefine;
ROOT::Detail::RDF::RLoopManager *fOutputLoopManager;
ROOT::RDF::RDataSource *fInputDataSource;

public:
using ColumnTypes_t = TypeList<ColTypes...>;
SnapshotTTreeHelper(std::string_view filename, std::string_view dirname, std::string_view treename,
const ColumnNames_t &vbnames, const ColumnNames_t &bnames, const RSnapshotOptions &options,
std::vector<bool> &&isDefine)
std::vector<bool> &&isDefine, ROOT::Detail::RDF::RLoopManager *loopManager,
ROOT::RDF::RDataSource *inputDataSource)
: fFileName(filename),
fDirName(dirname),
fTreeName(treename),
Expand All @@ -1544,7 +1548,9 @@ public:
fOutputBranchNames(ReplaceDotWithUnderscore(bnames)),
fBranches(vbnames.size(), nullptr),
fBranchAddresses(vbnames.size(), nullptr),
fIsDefine(std::move(isDefine))
fIsDefine(std::move(isDefine)),
fOutputLoopManager(loopManager),
fInputDataSource(inputDataSource)
{
EnsureValidSnapshotTTreeOutput(fOptions, fTreeName, fFileName);
}
Expand All @@ -1571,6 +1577,8 @@ public:
{
if (r)
fInputTree = r->GetTree();
else if (auto treeDS = dynamic_cast<ROOT::Internal::RDF::RTTreeDS *>(fInputDataSource))
fInputTree = treeDS->GetTree();
fBranchAddressesNeedReset = true;
}

Expand Down Expand Up @@ -1650,6 +1658,10 @@ public:
// must destroy the TTree first, otherwise TFile will delete it too leading to a double delete
fOutputTree.reset();
fOutputFile->Close();

// Now connect the data source to the loop manager so it can be used for further processing
auto fullTreeName = fDirName.empty() ? fTreeName : fDirName + '/' + fTreeName;
fOutputLoopManager->SetDataSource(std::make_unique<ROOT::Internal::RDF::RTTreeDS>(fullTreeName, fFileName));
}

std::string GetActionName() { return "Snapshot"; }
Expand All @@ -1673,8 +1685,15 @@ public:
/// Build a new single-thread snapshot helper that writes to a different output file.
///
/// \param newName Type-erased pointer that is reinterpreted as `const std::string *`; the pointed-to string is
///                used as the file name of the new helper.
/// The second (unnamed) parameter is a variation name that is accepted but not used here.
/// \return A helper constructed from this helper's directory name, tree name, input/output branch names and
///         options, plus a copy of fIsDefine.
// NOTE(review): this span is a rendered diff without +/- markers. The two-line `return` below appears to be the
// removed version (7 constructor arguments) and the multi-line `return` the added one, which additionally forwards
// fOutputLoopManager and fInputDataSource - the hunk header reports 8 -> 15 lines. Confirm against the repository.
SnapshotTTreeHelper MakeNew(void *newName, std::string_view /*variation*/ = "nominal")
{
// The caller passes the new file name as a void* to a std::string.
const std::string finalName = *reinterpret_cast<const std::string *>(newName);
return SnapshotTTreeHelper{
finalName, fDirName, fTreeName, fInputBranchNames, fOutputBranchNames, fOptions, std::vector<bool>(fIsDefine)};
return SnapshotTTreeHelper{finalName,
fDirName,
fTreeName,
fInputBranchNames,
fOutputBranchNames,
fOptions,
// explicit copy: the constructor takes the vector by rvalue reference
std::vector<bool>(fIsDefine),
fOutputLoopManager,
fInputDataSource};
}
};

Expand All @@ -1699,12 +1718,16 @@ class R__CLING_PTRCHECK(off) SnapshotTTreeHelperMT : public RActionImpl<Snapshot
std::vector<std::vector<void *>> fBranchAddresses;
std::vector<RBranchSet> fOutputBranches;
std::vector<bool> fIsDefine;
ROOT::Detail::RDF::RLoopManager *fOutputLoopManager;
ROOT::RDF::RDataSource *fInputDataSource;

public:
using ColumnTypes_t = TypeList<ColTypes...>;

SnapshotTTreeHelperMT(const unsigned int nSlots, std::string_view filename, std::string_view dirname,
std::string_view treename, const ColumnNames_t &vbnames, const ColumnNames_t &bnames,
const RSnapshotOptions &options, std::vector<bool> &&isDefine)
const RSnapshotOptions &options, std::vector<bool> &&isDefine,
ROOT::Detail::RDF::RLoopManager *loopManager, ROOT::RDF::RDataSource *inputDataSource)
: fNSlots(nSlots),
fOutputFiles(fNSlots),
fOutputTrees(fNSlots),
Expand All @@ -1719,7 +1742,9 @@ public:
fBranches(fNSlots, std::vector<TBranch *>(vbnames.size(), nullptr)),
fBranchAddresses(fNSlots, std::vector<void *>(vbnames.size(), nullptr)),
fOutputBranches(fNSlots),
fIsDefine(std::move(isDefine))
fIsDefine(std::move(isDefine)),
fOutputLoopManager(loopManager),
fInputDataSource(inputDataSource)
{
EnsureValidSnapshotTTreeOutput(fOptions, fTreeName, fFileName);
}
Expand Down Expand Up @@ -1766,7 +1791,9 @@ public:
if (r) {
// not an empty-source RDF
fInputTrees[slot] = r->GetTree();
}
} else if (auto treeDS = dynamic_cast<ROOT::Internal::RDF::RTTreeDS *>(fInputDataSource))
fInputTrees[slot] = treeDS->GetTree();

fBranchAddressesNeedReset[slot] = 1; // reset first event flag for this slot
}

Expand Down Expand Up @@ -1855,6 +1882,10 @@ public:
// flush all buffers to disk by destroying the TBufferMerger
fOutputFiles.clear();
fMerger.reset();

// Now connect the data source to the loop manager so it can be used for further processing
auto fullTreeName = fDirName.empty() ? fTreeName : fDirName + '/' + fTreeName;
fOutputLoopManager->SetDataSource(std::make_unique<ROOT::Internal::RDF::RTTreeDS>(fullTreeName, fFileName));
}

std::string GetActionName() { return "Snapshot"; }
Expand All @@ -1878,8 +1909,16 @@ public:
/// Build a new multi-thread snapshot helper that writes to a different output file.
///
/// \param newName Type-erased pointer that is reinterpreted as `const std::string *`; the pointed-to string is
///                used as the file name of the new helper.
/// The second (unnamed) parameter is a variation name that is accepted but not used here.
/// \return A helper constructed with the same number of slots, directory name, tree name, input/output branch
///         names and options as this one, plus a copy of fIsDefine.
// NOTE(review): this span is a rendered diff without +/- markers. The two-line `return` below appears to be the
// removed version (8 constructor arguments) and the multi-line `return` the added one, which additionally forwards
// fOutputLoopManager and fInputDataSource - the hunk header reports 8 -> 16 lines. Confirm against the repository.
SnapshotTTreeHelperMT MakeNew(void *newName, std::string_view /*variation*/ = "nominal")
{
// The caller passes the new file name as a void* to a std::string.
const std::string finalName = *reinterpret_cast<const std::string *>(newName);
return SnapshotTTreeHelperMT{fNSlots, finalName, fDirName, fTreeName,
fInputBranchNames, fOutputBranchNames, fOptions, std::vector<bool>(fIsDefine)};
return SnapshotTTreeHelperMT{fNSlots,
finalName,
fDirName,
fTreeName,
fInputBranchNames,
fOutputBranchNames,
fOptions,
// explicit copy: the constructor takes the vector by rvalue reference
std::vector<bool>(fIsDefine),
fOutputLoopManager,
fInputDataSource};
}
};

Expand Down Expand Up @@ -1907,7 +1946,7 @@ class R__CLING_PTRCHECK(off) SnapshotRNTupleHelper : public RActionImpl<Snapshot
std::unique_ptr<TFile> fOutputFile{nullptr};

RSnapshotOptions fOptions;
ROOT::Detail::RDF::RLoopManager *fLoopManager;
ROOT::Detail::RDF::RLoopManager *fOutputLoopManager;
ColumnNames_t fInputFieldNames; // This contains the resolved aliases
ColumnNames_t fOutputFieldNames;
std::unique_ptr<ROOT::Experimental::RNTupleWriter> fWriter{nullptr};
Expand All @@ -1925,7 +1964,7 @@ public:
fDirName(dirname),
fNTupleName(ntuplename),
fOptions(options),
fLoopManager(lm),
fOutputLoopManager(lm),
fInputFieldNames(vfnames),
fOutputFieldNames(ReplaceDotWithUnderscore(fnames)),
fIsDefine(std::move(isDefine))
Expand All @@ -1939,7 +1978,7 @@ public:
SnapshotRNTupleHelper &operator=(SnapshotRNTupleHelper &&) = default;
~SnapshotRNTupleHelper()
{
if (!fNTupleName.empty() && !fLoopManager->GetDataSource() && fOptions.fLazy)
if (!fNTupleName.empty() && !fOutputLoopManager->GetDataSource() && fOptions.fLazy)
Warning("Snapshot", "A lazy Snapshot action was booked but never triggered.");
}

Expand Down Expand Up @@ -1999,7 +2038,7 @@ public:
{
fWriter.reset();
// We can now set the data source of the loop manager for the RDataFrame that is returned by the Snapshot call.
fLoopManager->SetDataSource(
fOutputLoopManager->SetDataSource(
std::make_unique<ROOT::Experimental::RNTupleDS>(fDirName + "/" + fNTupleName, fFileName));
}

Expand Down Expand Up @@ -2029,7 +2068,7 @@ public:
fInputFieldNames,
fOutputFieldNames,
fOptions,
fLoopManager,
fOutputLoopManager,
std::vector<bool>(fIsDefine)};
}
};
Expand Down
9 changes: 6 additions & 3 deletions tree/dataframe/inc/ROOT/RDF/ColumnReaderUtils.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
#include <typeinfo> // for typeid
#include <vector>

class TTreeReader;

namespace ROOT {
namespace Internal {
namespace RDF {
Expand All @@ -56,7 +58,7 @@ struct RColumnReadersInfo {
/// Create a group of column readers, one per type in the parameter pack.
template <typename... ColTypes>
std::array<RDFDetail::RColumnReaderBase *, sizeof...(ColTypes)>
GetColumnReaders(unsigned int slot, TTreeReader *r, TypeList<ColTypes...>, const RColumnReadersInfo &colInfo,
GetColumnReaders(unsigned int slot, TTreeReader *treeReader, TypeList<ColTypes...>, const RColumnReadersInfo &colInfo,
const std::string &variationName = "nominal")
{
// see RColumnReadersInfo for why we pass these arguments like this rather than directly as function arguments
Expand All @@ -65,9 +67,10 @@ GetColumnReaders(unsigned int slot, TTreeReader *r, TypeList<ColTypes...>, const
auto &colRegister = colInfo.fColRegister;

int i = -1;

std::array<RDFDetail::RColumnReaderBase *, sizeof...(ColTypes)> ret{
(++i, GetColumnReader(slot, colRegister.GetReader(slot, colNames[i], variationName, typeid(ColTypes)), lm, r,
colNames[i], typeid(ColTypes)))...};
(++i, GetColumnReader(slot, colRegister.GetReader(slot, colNames[i], variationName, typeid(ColTypes)), lm,
treeReader, colNames[i], typeid(ColTypes)))...};
return ret;
}

Expand Down
41 changes: 25 additions & 16 deletions tree/dataframe/inc/ROOT/RDF/InterfaceUtils.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,8 @@ struct SnapshotHelperArgs {
std::string fTreeName;
std::vector<std::string> fOutputColNames;
ROOT::RDF::RSnapshotOptions fOptions;
RDFDetail::RLoopManager *fLoopManager;
ROOT::Detail::RDF::RLoopManager *fLoopManager;
ROOT::RDF::RDataSource *fDataSource;
bool fToNTuple;
};

Expand All @@ -266,6 +267,8 @@ BuildAction(const ColumnNames_t &colNames, const std::shared_ptr<SnapshotHelperA
const auto &treename = snapHelperArgs->fTreeName;
const auto &outputColNames = snapHelperArgs->fOutputColNames;
const auto &options = snapHelperArgs->fOptions;
const auto &lmPtr = snapHelperArgs->fLoopManager;
const auto &dataSource = snapHelperArgs->fDataSource;

auto sz = sizeof...(ColTypes);
std::vector<bool> isDefine(sz);
Expand All @@ -280,10 +283,8 @@ BuildAction(const ColumnNames_t &colNames, const std::shared_ptr<SnapshotHelperA
using Helper_t = SnapshotRNTupleHelper<ColTypes...>;
using Action_t = RAction<Helper_t, PrevNodeType>;

auto loopManager = snapHelperArgs->fLoopManager;

actionPtr.reset(new Action_t(
Helper_t(filename, dirname, treename, colNames, outputColNames, options, loopManager, std::move(isDefine)),
Helper_t(filename, dirname, treename, colNames, outputColNames, options, lmPtr, std::move(isDefine)),
colNames, prevNode, colRegister));
} else {
// multi-thread snapshot to RNTuple is not yet supported
Expand All @@ -302,16 +303,16 @@ BuildAction(const ColumnNames_t &colNames, const std::shared_ptr<SnapshotHelperA
// single-thread snapshot
using Helper_t = SnapshotTTreeHelper<ColTypes...>;
using Action_t = RAction<Helper_t, PrevNodeType>;
actionPtr.reset(
new Action_t(Helper_t(filename, dirname, treename, colNames, outputColNames, options, std::move(isDefine)),
colNames, prevNode, colRegister));
actionPtr.reset(new Action_t(Helper_t(filename, dirname, treename, colNames, outputColNames, options,
std::move(isDefine), lmPtr, dataSource),
colNames, prevNode, colRegister));
} else {
// multi-thread snapshot
using Helper_t = SnapshotTTreeHelperMT<ColTypes...>;
using Action_t = RAction<Helper_t, PrevNodeType>;
actionPtr.reset(new Action_t(
Helper_t(nSlots, filename, dirname, treename, colNames, outputColNames, options, std::move(isDefine)),
colNames, prevNode, colRegister));
actionPtr.reset(new Action_t(Helper_t(nSlots, filename, dirname, treename, colNames, outputColNames, options,
std::move(isDefine), lmPtr, dataSource),
colNames, prevNode, colRegister));
}
}
return actionPtr;
Expand Down Expand Up @@ -412,8 +413,15 @@ std::vector<bool> FindUndefinedDSColumns(const ColumnNames_t &requestedCols, con
template <typename T>
void AddDSColumnsHelper(const std::string &colName, RLoopManager &lm, RDataSource &ds, RColumnRegister &colRegister)
{
if (colRegister.IsDefineOrAlias(colName) || !ds.HasColumn(colName) ||
lm.HasDataSourceColumnReaders(colName, typeid(T)))

if (colRegister.IsDefineOrAlias(colName))
return;

if (lm.HasDataSourceColumnReaders(colName, typeid(T)))
return;

if (!ds.HasColumn(colName) &&
lm.GetSuppressErrorsForMissingBranches().find(colName) == lm.GetSuppressErrorsForMissingBranches().end())
return;

const auto nSlots = lm.GetNSlots();
Expand All @@ -428,7 +436,8 @@ void AddDSColumnsHelper(const std::string &colName, RLoopManager &lm, RDataSourc
} else { // using the new GetColumnReaders mechanism
// TODO consider changing the interface so we return all of these for all slots in one go
for (auto slot = 0u; slot < lm.GetNSlots(); ++slot)
colReaders.emplace_back(ds.GetColumnReaders(slot, colName, typeid(T)));
colReaders.emplace_back(
ROOT::Internal::RDF::CreateColumnReader(ds, slot, colName, typeid(T), /*treeReader*/ nullptr));
}

lm.AddDataSourceColumnReaders(colName, std::move(colReaders), typeid(T));
Expand Down Expand Up @@ -540,7 +549,7 @@ void JitDefineHelper(F &&f, const char **colsPtr, std::size_t colsSize, std::str
using ColTypes_t = typename TTraits::CallableTraits<Callable_t>::arg_types;

auto ds = lm->GetDataSource();
if (ds != nullptr)
if (ds != nullptr && colsPtr)
AddDSColumns(cols, *lm, *ds, ColTypes_t(), *colRegister);

// will never actually be used (trumped by jittedDefine->GetTypeName()), but we set it to something meaningful
Expand Down Expand Up @@ -800,8 +809,8 @@ template <typename T>
using InnerValueType_t = typename InnerValueType<T>::type;

std::pair<std::vector<std::string>, std::vector<std::string>>
AddSizeBranches(const std::vector<std::string> &branches, TTree *tree, std::vector<std::string> &&colsWithoutAliases,
std::vector<std::string> &&colsWithAliases);
AddSizeBranches(const std::vector<std::string> &branches, ROOT::RDF::RDataSource *ds,
std::vector<std::string> &&colsWithoutAliases, std::vector<std::string> &&colsWithAliases);

void RemoveDuplicates(ColumnNames_t &columnNames);

Expand Down
Loading
Loading