Skip to content

Commit

Permalink
Collect isBlocked and isFinished timings in OperatorStats
Browse files Browse the repository at this point in the history
For some operators, the isBlocked() function call is nontrivial. This
commit adds the isBlockedTiming to OperatorStats and count it in
Driver::runInternal(). It also count the isFinished() operator which
was previously missed being counted..

Note that this does not include the Presto protocol change, which will
be done later.
  • Loading branch information
yingsu00 committed Sep 27, 2024
1 parent c61a353 commit 1f1285f
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 46 deletions.
99 changes: 54 additions & 45 deletions velox/exec/Driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -534,23 +534,28 @@ StopReason Driver::runInternal(
return blockDriver(self, i, std::move(future), blockingState, guard);
}

CALL_OPERATOR(
blockingReason_ = op->isBlocked(&future),
op,
curOperatorId_,
kOpMethodIsBlocked);
withDeltaCpuWallTimer(op, &OperatorStats::isBlockedTiming, [&]() {
CALL_OPERATOR(
blockingReason_ = op->isBlocked(&future),
op,
curOperatorId_,
kOpMethodIsBlocked);
});

if (blockingReason_ != BlockingReason::kNotBlocked) {
return blockDriver(self, i, std::move(future), blockingState, guard);
}

if (i < numOperators - 1) {
Operator* nextOp = operators_[i + 1].get();

CALL_OPERATOR(
blockingReason_ = nextOp->isBlocked(&future),
nextOp,
curOperatorId_ + 1,
kOpMethodIsBlocked);
withDeltaCpuWallTimer(op, &OperatorStats::isBlockedTiming, [&]() {
CALL_OPERATOR(
blockingReason_ = nextOp->isBlocked(&future),
nextOp,
curOperatorId_ + 1,
kOpMethodIsBlocked);
});
if (blockingReason_ != BlockingReason::kNotBlocked) {
return blockDriver(
self, i + 1, std::move(future), blockingState, guard);
Expand All @@ -565,27 +570,24 @@ StopReason Driver::runInternal(
if (needsInput) {
uint64_t resultBytes = 0;
RowVectorPtr intermediateResult;
{
withDeltaCpuWallTimer(op, &OperatorStats::getOutputTiming, [&]() {
TestValue::adjust(
"facebook::velox::exec::Driver::runInternal::getOutput",
op);
CALL_OPERATOR(
intermediateResult = op->getOutput(),
op,
curOperatorId_,
kOpMethodGetOutput);
if (intermediateResult) {
validateOperatorOutputResult(intermediateResult, *op);
resultBytes = intermediateResult->estimateFlatSize();
{
auto lockedStats = op->stats().wlock();
lockedStats->addOutputVector(
resultBytes, intermediateResult->size());
}
withDeltaCpuWallTimer(op, &OperatorStats::getOutputTiming, [&]() {
TestValue::adjust(
"facebook::velox::exec::Driver::runInternal::getOutput", op);
CALL_OPERATOR(
intermediateResult = op->getOutput(),
op,
curOperatorId_,
kOpMethodGetOutput);
if (intermediateResult) {
validateOperatorOutputResult(intermediateResult, *op);
resultBytes = intermediateResult->estimateFlatSize();
{
auto lockedStats = op->stats().wlock();
lockedStats->addOutputVector(
resultBytes, intermediateResult->size());
}
});
}
}
});
pushdownFilters(i);
if (intermediateResult) {
withDeltaCpuWallTimer(op, &OperatorStats::addInputTiming, [&]() {
Expand Down Expand Up @@ -621,21 +623,26 @@ StopReason Driver::runInternal(
// is not blocked and empty, this is finished. If this is
// not the source, just try to get output from the one
// before.
CALL_OPERATOR(
blockingReason_ = op->isBlocked(&future),
op,
curOperatorId_,
kOpMethodIsBlocked);
withDeltaCpuWallTimer(op, &OperatorStats::isBlockedTiming, [&]() {
CALL_OPERATOR(
blockingReason_ = op->isBlocked(&future),
op,
curOperatorId_,
kOpMethodIsBlocked);
});
if (blockingReason_ != BlockingReason::kNotBlocked) {
return blockDriver(
self, i, std::move(future), blockingState, guard);
}

bool finished{false};
CALL_OPERATOR(
finished = op->isFinished(),
op,
curOperatorId_,
kOpMethodIsFinished);
withDeltaCpuWallTimer(op, &OperatorStats::finishTiming, [&]() {
CALL_OPERATOR(
finished = op->isFinished(),
op,
curOperatorId_,
kOpMethodIsFinished);
});
if (finished) {
withDeltaCpuWallTimer(
op, &OperatorStats::finishTiming, [this, &nextOp]() {
Expand Down Expand Up @@ -682,11 +689,13 @@ StopReason Driver::runInternal(
}

bool finished{false};
CALL_OPERATOR(
finished = op->isFinished(),
op,
curOperatorId_,
kOpMethodIsFinished);
withDeltaCpuWallTimer(op, &OperatorStats::finishTiming, [&]() {
CALL_OPERATOR(
finished = op->isFinished(),
op,
curOperatorId_,
kOpMethodIsFinished);
});
if (finished) {
guard.notThrown();
close();
Expand Down
2 changes: 2 additions & 0 deletions velox/exec/Operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,8 @@ void OperatorStats::add(const OperatorStats& other) {

finishTiming.add(other.finishTiming);

isBlockedTiming.add(other.isBlockedTiming);

backgroundTiming.add(other.backgroundTiming);

memoryStats.add(other.memoryStats);
Expand Down
2 changes: 2 additions & 0 deletions velox/exec/Operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ struct OperatorStats {
/// read.
int64_t numSplits{0};

CpuWallTiming isBlockedTiming;

/// Bytes read from raw source, e.g. compressed file or network connection.
uint64_t rawInputBytes = 0;
uint64_t rawInputPositions = 0;
Expand Down
1 change: 1 addition & 0 deletions velox/exec/PlanNodeStats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ void PlanNodeStats::addTotals(const OperatorStats& stats) {
cpuWallTiming.add(stats.addInputTiming);
cpuWallTiming.add(stats.getOutputTiming);
cpuWallTiming.add(stats.finishTiming);
cpuWallTiming.add(stats.isBlockedTiming);

backgroundTiming.add(stats.backgroundTiming);

Expand Down
4 changes: 3 additions & 1 deletion velox/exec/tests/TaskTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1158,6 +1158,7 @@ DEBUG_ONLY_TEST_F(TaskTest, liveStats) {
EXPECT_EQ(32 * i, operatorStats.outputBytes);
EXPECT_EQ(3 * i, operatorStats.outputPositions);
EXPECT_EQ(i, operatorStats.outputVectors);
EXPECT_EQ(2 * (i + 1), operatorStats.isBlockedTiming.count);
EXPECT_EQ(0, operatorStats.finishTiming.count);
EXPECT_EQ(0, operatorStats.backgroundTiming.count);

Expand All @@ -1184,7 +1185,8 @@ DEBUG_ONLY_TEST_F(TaskTest, liveStats) {
EXPECT_EQ(32 * numBatches, operatorStats.outputBytes);
EXPECT_EQ(3 * numBatches, operatorStats.outputPositions);
EXPECT_EQ(numBatches, operatorStats.outputVectors);
EXPECT_EQ(1, operatorStats.finishTiming.count);
EXPECT_EQ(2 * numBatches, operatorStats.isBlockedTiming.count);
EXPECT_EQ(2, operatorStats.finishTiming.count);
// No operators with background CPU time yet.
EXPECT_EQ(0, operatorStats.backgroundTiming.count);

Expand Down

0 comments on commit 1f1285f

Please sign in to comment.