Skip to content

Commit

Permalink
Collect isBlocked and isFinished timings in OperatorStats
Browse files Browse the repository at this point in the history
For some operators, the isBlocked() function call is nontrivial. This
commit adds the isBlockedTiming to OperatorStats and count it in
Driver::runInternal(). It also count the isFinished() operator which
was previously missed being counted..

Note that this does not include the Presto protocol change, which will
be done later.
  • Loading branch information
yingsu00 committed Sep 1, 2024
1 parent 54b1e60 commit 62e0f60
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 25 deletions.
61 changes: 36 additions & 25 deletions velox/exec/Driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -570,11 +570,14 @@ StopReason Driver::runInternal(
return blockDriver(self, op, std::move(future), blockingState);
}

CALL_OPERATOR(
blockingReason_ = op->isBlocked(&future),
op,
curOperatorId_,
kOpMethodIsBlocked);
withDeltaCpuWallTimer(op, &OperatorStats::isBlockedTiming, [&]() {
CALL_OPERATOR(
blockingReason_ = op->isBlocked(&future),
op,
curOperatorId_,
kOpMethodIsBlocked);
});

if (blockingReason_ != BlockingReason::kNotBlocked) {
guard.notThrown();
return blockDriver(self, op, std::move(future), blockingState);
Expand All @@ -583,11 +586,13 @@ StopReason Driver::runInternal(
if (i < numOperators - 1) {
Operator* nextOp = operators_[i + 1].get();

CALL_OPERATOR(
blockingReason_ = nextOp->isBlocked(&future),
nextOp,
curOperatorId_ + 1,
kOpMethodIsBlocked);
withDeltaCpuWallTimer(op, &OperatorStats::isBlockedTiming, [&]() {
CALL_OPERATOR(
blockingReason_ = nextOp->isBlocked(&future),
nextOp,
curOperatorId_ + 1,
kOpMethodIsBlocked);
});
if (blockingReason_ != BlockingReason::kNotBlocked) {
guard.notThrown();
return blockDriver(self, op, std::move(future), blockingState);
Expand Down Expand Up @@ -658,21 +663,25 @@ StopReason Driver::runInternal(
// is not blocked and empty, this is finished. If this is
// not the source, just try to get output from the one
// before.
CALL_OPERATOR(
blockingReason_ = op->isBlocked(&future),
op,
curOperatorId_,
kOpMethodIsBlocked);
withDeltaCpuWallTimer(op, &OperatorStats::isBlockedTiming, [&]() {
CALL_OPERATOR(
blockingReason_ = op->isBlocked(&future),
op,
curOperatorId_,
kOpMethodIsBlocked);
});
if (blockingReason_ != BlockingReason::kNotBlocked) {
guard.notThrown();
return blockDriver(self, op, std::move(future), blockingState);
}
bool finished{false};
CALL_OPERATOR(
finished = op->isFinished(),
op,
curOperatorId_,
kOpMethodIsFinished);
withDeltaCpuWallTimer(op, &OperatorStats::finishTiming, [&]() {
CALL_OPERATOR(
finished = op->isFinished(),
op,
curOperatorId_,
kOpMethodIsFinished);
});
if (finished) {
withDeltaCpuWallTimer(
op, &OperatorStats::finishTiming, [this, &nextOp]() {
Expand Down Expand Up @@ -719,11 +728,13 @@ StopReason Driver::runInternal(
}

bool finished{false};
CALL_OPERATOR(
finished = op->isFinished(),
op,
curOperatorId_,
kOpMethodIsFinished);
withDeltaCpuWallTimer(op, &OperatorStats::finishTiming, [&]() {
CALL_OPERATOR(
finished = op->isFinished(),
op,
curOperatorId_,
kOpMethodIsFinished);
});
if (finished) {
guard.notThrown();
close();
Expand Down
2 changes: 2 additions & 0 deletions velox/exec/Operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,8 @@ void OperatorStats::add(const OperatorStats& other) {

finishTiming.add(other.finishTiming);

isBlockedTiming.add(other.isBlockedTiming);

backgroundTiming.add(other.backgroundTiming);

memoryStats.add(other.memoryStats);
Expand Down
2 changes: 2 additions & 0 deletions velox/exec/Operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ struct OperatorStats {
/// read.
int64_t numSplits{0};

CpuWallTiming isBlockedTiming;

/// Bytes read from raw source, e.g. compressed file or network connection.
uint64_t rawInputBytes = 0;
uint64_t rawInputPositions = 0;
Expand Down
1 change: 1 addition & 0 deletions velox/exec/PlanNodeStats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ void PlanNodeStats::addTotals(const OperatorStats& stats) {
cpuWallTiming.add(stats.addInputTiming);
cpuWallTiming.add(stats.getOutputTiming);
cpuWallTiming.add(stats.finishTiming);
cpuWallTiming.add(stats.isBlockedTiming);

backgroundTiming.add(stats.backgroundTiming);

Expand Down

0 comments on commit 62e0f60

Please sign in to comment.