Skip to content

Commit c848547

Browse files
authored
Merge pull request #169 from skie/feature/queue-processor-event-timing
Add timing to queue Processor events
2 parents ab5d6ee + 7b9506e commit c848547

File tree

2 files changed

+29
-3
lines changed

2 files changed

+29
-3
lines changed

src/Queue/Processor.php

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ public function process(QueueMessage $message, Context $context): string|object
7979
return InteropProcessor::REJECT;
8080
}
8181

82+
$startTime = microtime(true) * 1000;
8283
$this->dispatchEvent('Processor.message.start', ['message' => $jobMessage]);
8384

8485
try {
@@ -90,27 +91,39 @@ public function process(QueueMessage $message, Context $context): string|object
9091
$this->dispatchEvent('Processor.message.exception', [
9192
'message' => $jobMessage,
9293
'exception' => $e,
94+
'duration' => (int)((microtime(true) * 1000) - $startTime),
9395
]);
9496

9597
return Result::requeue('Exception occurred while processing message');
9698
}
9799

100+
$duration = (int)((microtime(true) * 1000) - $startTime);
101+
98102
if ($response === InteropProcessor::ACK) {
99103
$this->logger->debug('Message processed successfully');
100-
$this->dispatchEvent('Processor.message.success', ['message' => $jobMessage]);
104+
$this->dispatchEvent('Processor.message.success', [
105+
'message' => $jobMessage,
106+
'duration' => $duration,
107+
]);
101108

102109
return InteropProcessor::ACK;
103110
}
104111

105112
if ($response === InteropProcessor::REJECT) {
106113
$this->logger->debug('Message processed with rejection');
107-
$this->dispatchEvent('Processor.message.reject', ['message' => $jobMessage]);
114+
$this->dispatchEvent('Processor.message.reject', [
115+
'message' => $jobMessage,
116+
'duration' => $duration,
117+
]);
108118

109119
return InteropProcessor::REJECT;
110120
}
111121

112122
$this->logger->debug('Message processed with failure, requeuing');
113-
$this->dispatchEvent('Processor.message.failure', ['message' => $jobMessage]);
123+
$this->dispatchEvent('Processor.message.failure', [
124+
'message' => $jobMessage,
125+
'duration' => $duration,
126+
]);
114127

115128
return InteropProcessor::REQUEUE;
116129
}

tests/TestCase/Queue/ProcessorTest.php

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,11 @@ public function testProcess($jobMethod, $expected, $logMessage, $dispatchedEvent
9595
$data = $events[2]->getData();
9696
$this->assertArrayHasKey('message', $data);
9797
$this->assertSame($message->jsonSerialize(), $data['message']->jsonSerialize());
98+
99+
// Verify timing information is present in completion events
100+
$this->assertArrayHasKey('duration', $data);
101+
$this->assertIsInt($data['duration']);
102+
$this->assertGreaterThanOrEqual(0, $data['duration']);
98103
}
99104

100105
/**
@@ -154,6 +159,14 @@ public function testProcessWillRequeueOnException()
154159

155160
$result = $processor->process($queueMessage, $context);
156161
$this->assertEquals(InteropProcessor::REQUEUE, $result);
162+
163+
// Verify timing information is present in exception event
164+
$this->assertSame(3, $events->count());
165+
$this->assertSame('Processor.message.exception', $events[2]->getName());
166+
$data = $events[2]->getData();
167+
$this->assertArrayHasKey('duration', $data);
168+
$this->assertIsInt($data['duration']);
169+
$this->assertGreaterThanOrEqual(0, $data['duration']);
157170
}
158171

159172
/**

0 commit comments

Comments
 (0)