Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dbcon/joblist/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ set(joblist_LIB_SRCS
jobstepassociation.cpp
lbidlist.cpp
limitedorderby.cpp
disk-based-topnorderby.cpp
passthrucommand-jl.cpp
passthrustep.cpp
pcolscan.cpp
Expand Down
72 changes: 72 additions & 0 deletions dbcon/joblist/disk-based-topnorderby.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/* Copyright (C) 2025 MariaDB Corp.

This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */

#include <vector>

#include "dumper.h"
#include "disk-based-topnorderby.h"
namespace joblist
{

// The caller ensures lifetime of dl and rg
void DiskBasedTopNOrderBy::flushCurrentToDisk(RowGroupDL& dl, rowgroup::RowGroup rg, const size_t numberOfRGs, const bool firstFlush)
{
size_t rgid = (firstFlush) ? numberOfRGs : 0;
rowgroup::RGData rgData;

size_t generation = (firstFlush) ? getGenerationCounter() : 0; // WIP

bool more = dl.next(0, &rgData);
while (more)
{
saveRG(rgid, generation, rg, &rgData);
more = dl.next(0, &rgData);
rgid = (firstFlush) ? rgid - 1 : rgid + 1;
}

if (firstFlush)
{
incrementGenerationCounter();
}
else
{

}
}
void DiskBasedTopNOrderBy::diskBasedMergePhaseIfNeeded(std::vector<RowGroupDLSPtr>& /*dataLists*/)
{
}

std::vector<std::string> DiskBasedTopNOrderBy::getGenerationFileNamesNextBatch(const size_t batchSize)
{
// assert(getGenerationFilesNumber() > batchSize);
auto totalNumberOfFilesYetToMerge = getGenerationFilesNumber() - batchSize;
auto batchSizeOrFilesLeftNumber = std::max(getGenerationFilesNumber(), batchSize);
auto actualBatchSize = std::min(totalNumberOfFilesYetToMerge, batchSizeOrFilesLeftNumber);
// add state for the starting offset + wraparound
size_t startOffset = 0;
std::vector<std::string> res;
res.reserve(actualBatchSize);
for (size_t i = 0; i < startOffset + actualBatchSize; ++i)
{
res.push_back(makeRGFilePrefix(i));
}

return res;


} // namespace joblist
77 changes: 77 additions & 0 deletions dbcon/joblist/disk-based-topnorderby.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/* Copyright (C) 2025 MariaDB Corp.

This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */

#pragma once

#include <cstdint>
#include <queue>
#include <string>
#include <vector>

#include "dumper.h"
#include "elementtype.h"
#include "resourcemanager.h"
namespace joblist
{

class DiskBasedTopNOrderBy : public rowgroup::RGDumper
{
// std::string fTmpDir =
// config::Config::makeConfig()->getTempFileDir(config::Config::TempDirPurpose::Aggregates);
// std::string fCompStr = config::Config::makeConfig()->getConfig("RowAggregation", "Compression");
public:
// TODO Parametrize compression, tmpdir and memory manager (can be temp)
DiskBasedTopNOrderBy(ResourceManager* /*rm*/)
: RGDumper(compress::getCompressInterfaceByName("LZ4"), std::make_unique<rowgroup::MemManager>(),
config::Config::makeConfig()->getTempFileDir(config::Config::TempDirPurpose::Sorting),
"Sorting", reinterpret_cast<std::uintptr_t>(this))
{
}
~DiskBasedTopNOrderBy() = default;

void incrementGenerationCounter()
{
++fGenerationCounter;
uint64_t newGeneration = (fGenerations.empty()) ? 1 : fGenerations.back() + 1;
fGenerations.push(newGeneration);
}
uint64_t getGenerationCounter() const
{
return (fGenerations.empty()) ? 0 : fGenerations.back();
}

bool isDiskBased() const
{
return fGenerationCounter > 0;
}

size_t getGenerationFilesNumber() const
{
return 0;
}
std::vector<std::string> getGenerationFileNamesNextBatch(const size_t batchSize);

// The caller ensures lifetime of dl and rg
void flushCurrentToDisk(RowGroupDL& dl, rowgroup::RowGroup rg, const size_t numberOfRGs, const bool firstFlush);
void diskBasedMergePhaseIfNeeded(std::vector<RowGroupDLSPtr>& dataLists);

// private:
uint64_t fGenerationCounter{0};
std::queue<uint64_t> fGenerations;
};

} // namespace joblist
15 changes: 3 additions & 12 deletions dbcon/joblist/elementtype.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
/* Copyright (C) 2014 InfiniDB, Inc.
Copyright (C) 2016-2025 MariaDB Corporation

This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
Expand Down Expand Up @@ -238,16 +239,7 @@ extern std::istream& operator>>(std::istream& in, TupleType& rhs);
extern std::ostream& operator<<(std::ostream& out, const TupleType& rhs);
} // namespace joblist

#ifndef NO_DATALISTS

// #include "bandeddl.h"
// #include "wsdl.h"
#include "fifo.h"
// #include "bucketdl.h"
// #include "constantdatalist.h"
// #include "swsdl.h"
// #include "zdl.h"
// #include "deliverywsdl.h"

namespace joblist
{
Expand Down Expand Up @@ -327,7 +319,8 @@ typedef DataList<StringElementType> StrDataList;
// */
// typedef BucketDL<TupleType> TupleBucketDataList;

typedef FIFO<rowgroup::RGData> RowGroupDL;
using RowGroupDL = FIFO<rowgroup::RGData>;
using RowGroupDLSPtr = std::shared_ptr<RowGroupDL>;

} // namespace joblist

Expand Down Expand Up @@ -425,5 +418,3 @@ extern std::ostream& showOidInDL(std::ostream& strm);
extern std::ostream& omitOidInDL(std::ostream& strm);

} // namespace joblist

#endif
11 changes: 2 additions & 9 deletions dbcon/joblist/jlf_tuplejoblist.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -499,12 +499,6 @@ void adjustLastStep(JobStepVector& querySteps, DeliveredTableMap& deliverySteps,
deliverySteps[CNX_VTABLE_ID] = ws;
}

// TODO MCOL-894 we don't need to run sorting|distinct
// every time
// if ((jobInfo.limitCount != (uint64_t) - 1) ||
// (jobInfo.constantCol == CONST_COL_EXIST) ||
// (jobInfo.hasDistinct))
// {
if (jobInfo.annexStep.get() == NULL)
jobInfo.annexStep.reset(new TupleAnnexStep(jobInfo));

Expand All @@ -513,20 +507,19 @@ void adjustLastStep(JobStepVector& querySteps, DeliveredTableMap& deliverySteps,

if (jobInfo.orderByColVec.size() > 0)
{
tas->addOrderBy(new LimitedOrderBy());
tas->addOrderBy(jobInfo.rm);
if (jobInfo.orderByThreads > 1)
tas->setParallelOp();
tas->setMaxThreads(jobInfo.orderByThreads);
}

// TODO decouple TCS from TNS
if (jobInfo.constantCol == CONST_COL_EXIST)
tas->addConstant(new TupleConstantStep(jobInfo));

if (jobInfo.hasDistinct)
tas->setDistinct();

// }

if (jobInfo.annexStep)
{
TupleDeliveryStep* ds = dynamic_cast<TupleDeliveryStep*>(deliverySteps[CNX_VTABLE_ID].get());
Expand Down
Loading