Skip to content

Commit 0bed871

Browse files
committed
chore: check if native partitioner is available
1 parent 88d8385 commit 0bed871

File tree

2 files changed

+23
-4
lines changed

2 files changed

+23
-4
lines changed

src/RdKafka/TopicConf.php

+5-3
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public function dump(): array
4040
{
4141
$count = Library::new('size_t');
4242
$dump = Library::rd_kafka_topic_conf_dump($this->topicConf, FFI::addr($count));
43-
$count = (int) $count->cdata;
43+
$count = (int)$count->cdata;
4444

4545
$result = [];
4646
for ($i = 0; $i < $count; $i += 2) {
@@ -106,12 +106,14 @@ public function setPartitioner(int $partitioner): void
106106
break;
107107

108108
default:
109-
throw new InvalidArgumentException('Invalid partitioner');
110-
break;
109+
throw new InvalidArgumentException('Invalid partitioner given');
111110
}
112111

112+
Library::requireMethod($partitionerMethod);
113+
113114
Library::rd_kafka_topic_conf_set_partitioner_cb(
114115
$this->topicConf,
116+
// todo: replace with Library::getFFI()->$partitionerMethod when dropping php < 8.1
115117
NativePartitionerCallbackProxy::create($partitionerMethod)
116118
);
117119
}

tests/RdKafka/TopicConfTest.php

+18-1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
use ConsumeTrait;
88
use InvalidArgumentException;
99
use PHPUnit\Framework\TestCase;
10+
use RequireVersionTrait;
1011

1112
/**
1213
* @covers \RdKafka\TopicConf
@@ -16,6 +17,7 @@
1617
*/
1718
class TopicConfTest extends TestCase
1819
{
20+
use RequireVersionTrait;
1921
use ConsumeTrait;
2022

2123
public function testDump(): void
@@ -119,6 +121,17 @@ public function testSetPartitionerWithUnknownId(): void
119121
$conf->setPartitioner(9999);
120122
}
121123

124+
public function testSetPartitionerWithUnsupportedTypeShouldFail(): void
125+
{
126+
$this->requiresLibrdkafkaVersion('<', '1.4.0');
127+
128+
$conf = new TopicConf();
129+
130+
$this->expectException(\RuntimeException::class);
131+
$this->expectExceptionMessageMatches('/rd_kafka_msg_partitioner_fnv1a_random/');
132+
$conf->setPartitioner(RD_KAFKA_MSG_PARTITIONER_FNV1A);
133+
}
134+
122135
/**
123136
* @group ffiOnly
124137
*/
@@ -170,7 +183,11 @@ public function testSetPartitionerCbWithCallback(): void
170183
$topicConf = new TopicConf();
171184
$topicConf->setOpaque($expectedTopicOpaque);
172185
$topicConf->setPartitionerCb(
173-
function (?string $key, int $partitionCount, ?object $topic_opaque = null, ?object $message_opaque = null) use (&$callbackTopicOpaque, &$callbackMessageOpaque) {
186+
function (?string $key, int $partitionCount, ?object $topic_opaque = null, ?object $message_opaque = null) use (
187+
&
188+
$callbackTopicOpaque,
189+
&$callbackMessageOpaque
190+
) {
174191
$callbackTopicOpaque = $topic_opaque;
175192
$callbackMessageOpaque = $message_opaque;
176193
// force partition 2

0 commit comments

Comments
 (0)