Skip to content

Commit 6ea996f

Browse files
Pass BF to BPP through TBPS->BPPSeeder
1 parent 389311c commit 6ea996f

File tree

9 files changed

+173
-0
lines changed

9 files changed

+173
-0
lines changed

dbcon/joblist/blockedbloomfilter.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,5 +69,10 @@ void BlockedBloomFilter::deserialize(messageqcpp::ByteStream& bs)
6969
}
7070
}
7171

72+
size_t BlockedBloomFilter::getSize() const
73+
{
74+
return bloomFilter.size();
75+
}
76+
7277
} // namespace joblist
7378

dbcon/joblist/blockedbloomfilter.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ class BlockedBloomFilter
2424
void serialize(messageqcpp::ByteStream& bs) const;
2525
void deserialize(messageqcpp::ByteStream& bs);
2626

27+
size_t getSize() const;
28+
2729
private:
2830
// Member variables
2931
static constexpr uint8_t HASH_FUNC_COUNT = 8;

dbcon/joblist/distributedenginecomm.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -882,6 +882,7 @@ int32_t DistributedEngineComm::write(uint32_t senderID, const SBS& msg)
882882
case BATCH_PRIMITIVE_ADD_JOINER:
883883
case BATCH_PRIMITIVE_END_JOINER:
884884
case BATCH_PRIMITIVE_ABORT:
885+
case BATCH_PRIMITIVE_BLOOM_FILTER:
885886
case DICT_CREATE_EQUALITY_FILTER:
886887
case DICT_DESTROY_EQUALITY_FILTER:
887888
/* XXXPAT: This relies on the assumption that the first pmCount "PMS*"

dbcon/joblist/primitivemsg.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,7 @@ enum ISMPACKETCOMMAND
167167
BATCH_PRIMITIVE_END_JOINER = PRIM_LOCALBASE + 11,
168168
BATCH_PRIMITIVE_ACK = PRIM_LOCALBASE + 12,
169169
BATCH_PRIMITIVE_ABORT = PRIM_LOCALBASE + 13,
170+
BATCH_PRIMITIVE_BLOOM_FILTER = PRIM_LOCALBASE + 14,
170171

171172
// max of 100-50=50 commands
172173
COL_RESULTS = PRIM_COLBASE + 0,

dbcon/joblist/primitivestep.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1475,6 +1475,7 @@ class TupleBPS : public BatchPrimitive, public TupleDeliveryStep
14751475

14761476
// Blocked Bloom filter
14771477
std::vector<std::shared_ptr<std::array<std::optional<BlockedBloomFilter>, 2>>> bloomFilters;
1478+
void serializeBloomFilters();
14781479

14791480
public:
14801481
void setBloomFilters(std::vector<std::shared_ptr<std::array<std::optional<BlockedBloomFilter>, 2>>>&& bloomFilters);

dbcon/joblist/tuple-bps.cpp

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1480,7 +1480,10 @@ void TupleBPS::run()
14801480
BPPIsAllocated = true;
14811481

14821482
if (doJoin && tjoiners[0]->inPM())
1483+
{
1484+
serializeBloomFilters();
14831485
serializeJoiner();
1486+
}
14841487

14851488
prepCasualPartitioning();
14861489
startPrimitiveThread();
@@ -3399,6 +3402,61 @@ void TupleBPS::setBloomFilters(std::vector<std::shared_ptr<std::array<std::optio
33993402
this->bloomFilters = std::move(bloomFilters);
34003403
}
34013404

3405+
void TupleBPS::serializeBloomFilters()
3406+
{
3407+
messageqcpp::ByteStream bs;
3408+
ISMPacketHeader ism{};
3409+
3410+
ism.Interleave = 0;
3411+
ism.Flags = 0;
3412+
ism.Command = BATCH_PRIMITIVE_BLOOM_FILTER;
3413+
ism.Type = 2;
3414+
3415+
uint32_t messageSize = sizeof(ISMPacketHeader);
3416+
3417+
// How to get rid of iterating over BFs 2 times?
3418+
for (const auto& joinerBF : bloomFilters)
3419+
{
3420+
for (size_t i = 0; i < joinerBF->size(); ++i)
3421+
{
3422+
messageSize += sizeof(uint8_t);
3423+
3424+
if ((*joinerBF)[i].has_value())
3425+
{
3426+
messageSize += (*joinerBF)[i]->getSize() * sizeof(uint64_t);
3427+
}
3428+
3429+
}
3430+
}
3431+
3432+
ism.Size = messageSize;
3433+
bs.append((uint8_t*)&ism, sizeof(ism));
3434+
3435+
bs << txnId();
3436+
bs << sessionId();
3437+
bs << static_cast<uint32_t>(stepId());
3438+
bs << uniqueID;
3439+
3440+
for (const auto& joinerBF : bloomFilters)
3441+
{
3442+
for (size_t i = 0; i < joinerBF->size(); ++i)
3443+
{
3444+
uint8_t hasBloomFilter = (*joinerBF)[i].has_value() ? 1 : 0;
3445+
bs << hasBloomFilter;
3446+
3447+
if (hasBloomFilter)
3448+
{
3449+
(*joinerBF)[i]->serialize(bs);
3450+
}
3451+
3452+
}
3453+
}
3454+
3455+
SBS sbs(new messageqcpp::ByteStream(bs));
3456+
fDec->write(uniqueID, sbs);
3457+
3458+
}
3459+
34023460
template bool TupleBPS::processOneFilterType<int64_t>(int8_t colWidth, int64_t value, uint32_t type) const;
34033461
template bool TupleBPS::processOneFilterType<int128_t>(int8_t colWidth, int128_t value, uint32_t type) const;
34043462

primitives/primproc/batchprimitiveprocessor.cpp

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2695,4 +2695,34 @@ void BatchPrimitiveProcessor::buildVSSCache(uint32_t loopCount)
26952695
vssCache.insert(make_pair(lbidList[i], vssData[i]));
26962696
}
26972697

2698+
void BatchPrimitiveProcessor::addBloomFilters([[maybe_unused]] messageqcpp::ByteStream& bs)
2699+
{
2700+
bs.advance(sizeof(ISMPacketHeader) + 4 * sizeof(uint32_t));
2701+
if (bloomFilters.empty() && doJoin)
2702+
{
2703+
bloomFilters.resize(joinerCount);
2704+
}
2705+
2706+
for (size_t j = 0; j < joinerCount; ++j)
2707+
{
2708+
for (size_t i = 0; i < 2; ++i)
2709+
{
2710+
uint8_t hasBloomFilter = 0;
2711+
bs >> hasBloomFilter;
2712+
2713+
if (hasBloomFilter)
2714+
{
2715+
if (!bloomFilters[j])
2716+
bloomFilters[j] = std::make_shared<std::array<std::optional<BlockedBloomFilter>, 2>>();
2717+
2718+
(*bloomFilters[j]).at(i).emplace();
2719+
(*bloomFilters[j]).at(i)->deserialize(bs);
2720+
2721+
}
2722+
2723+
}
2724+
}
2725+
2726+
}
2727+
26982728
} // namespace primitiveprocessor

primitives/primproc/batchprimitiveprocessor.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -436,6 +436,12 @@ class BatchPrimitiveProcessor
436436
bool initiatedByEM_;
437437
uint32_t weight_;
438438

439+
// Blocked Bloom Filter
440+
std::vector<std::shared_ptr<std::array<std::optional<joblist::BlockedBloomFilter>, 2>>> bloomFilters;
441+
public:
442+
void addBloomFilters(messageqcpp::ByteStream& bs);
443+
private:
444+
439445
uint32_t maxPmJoinResultCount = 1048576;
440446
friend class Command;
441447
friend class ColumnCommand;

primitives/primproc/primitiveserver.cpp

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1279,6 +1279,20 @@ struct BPPHandler
12791279
}
12801280
};
12811281

1282+
struct BloomFilter : public BPPHandlerFunctor
1283+
{
1284+
BloomFilter(boost::shared_ptr<BPPHandler> r, SBS b) : BPPHandlerFunctor(std::move(r), std::move(b))
1285+
{
1286+
}
1287+
1288+
int operator()() override
1289+
{
1290+
utils::setThreadName("PPHandBloomFilter");
1291+
return rt->addBloomFiltersToBPP(*bs, dieTime);
1292+
}
1293+
1294+
};
1295+
12821296
int doAbort(ByteStream& bs, const posix_time::ptime& dieTime)
12831297
{
12841298
uint32_t key;
@@ -1486,6 +1500,37 @@ struct BPPHandler
14861500
}
14871501
}
14881502

1503+
// Hands a Bloom-filter message to the first BPP registered under the
// message's uniqueID.
//
// Returns 0 when the filters were passed on, or when no BPP exists and
// dieTime has already passed (a notice is printed in that case); returns -1
// when no BPP exists yet and dieTime has not passed.
// NOTE(review): -1 presumably asks the caller to retry later — confirm
// against the functor's scheduling contract.
int addBloomFiltersToBPP(ByteStream& bs, const posix_time::ptime& dieTime)
{
  // uniqueID is the fourth 32-bit word following the ISM header.
  const uint8_t* const raw = bs.buf();
  const uint32_t uniqueID = *((const uint32_t*)&raw[sizeof(ISMPacketHeader) + 3 * sizeof(uint32_t)]);

  SBPPV bppv = grabBPPs(uniqueID);

  if (bppv)
  {
    boost::shared_lock<boost::shared_mutex> lk(getDJLock(uniqueID));
    bppv->get()[0]->addBloomFilters(bs);
    return 0;
  }

  const bool expired = posix_time::second_clock::universal_time() > dieTime;

  if (!expired)
    return -1;

  cout << "addBloomFilterToBPP: job for id " << uniqueID << " has been killed." << endl;
  return 0;
}
1533+
14891534
int lastJoinerMsg(ByteStream& bs, const posix_time::ptime& dieTime)
14901535
{
14911536
SBPPV bppv;
@@ -2024,6 +2069,30 @@ struct ReadThread
20242069
fBPPHandler->doAck(*sbs);
20252070
break;
20262071
}
2072+
2073+
case BATCH_PRIMITIVE_BLOOM_FILTER:
2074+
{
2075+
const uint8_t* buf = sbs->buf();
2076+
uint32_t pos = sizeof(ISMPacketHeader);
2077+
const uint32_t txnId = *((uint32_t*)&buf[pos]);
2078+
[[maybe_unused]] const uint32_t sessionID = *((uint32_t*)&buf[pos + 4]);
2079+
const uint32_t stepID = *((uint32_t*)&buf[pos + 8]);
2080+
const uint32_t uniqueID = *((uint32_t*)&buf[pos + 12]);
2081+
2082+
const uint32_t id = 0;
2083+
const uint32_t weight = threadpool::MetaJobsInitialWeight;
2084+
const uint32_t priority = 0;
2085+
2086+
boost::shared_ptr<FairThreadPool::Functor> functor;
2087+
functor.reset(new BPPHandler::BloomFilter(fBPPHandler, sbs));
2088+
2089+
PriorityThreadPool::Job job(uniqueID, stepID, txnId, functor, outIos, weight, priority, id);
2090+
OOBProcPool->addJob(job);
2091+
2092+
2093+
break;
2094+
}
2095+
20272096
default:
20282097
{
20292098
std::ostringstream os;

0 commit comments

Comments
 (0)