
Commit 664def5

Merge pull request #3 from codequest-eu/sync/blizzard
Sync/blizzard
2 parents: 4638c9b + e9b60bc

28 files changed: +1651, -1256 lines

.github/workflows/npm-publish.yml

Lines changed: 1 addition & 1 deletion
@@ -16,7 +16,7 @@ jobs:
           submodules: recursive
       - uses: actions/setup-node@v3
         with:
-          node-version: 18
+          node-version: 20
           registry-url: https://registry.npmjs.org/
           cache: "npm"
       - run: npm ci

.github/workflows/test.yml

Lines changed: 3 additions & 3 deletions
@@ -10,13 +10,13 @@ jobs:
   build:
     strategy:
       matrix:
-        node: [10, 12, 14, 16, 18, 20]
+        node: [16, 18, 20, 21]
         os: [ubuntu-22.04]
         include:
           # single mac test due to minute multipliers
           # https://docs.github.com/en/billing/managing-billing-for-github-actions/about-billing-for-github-actions
           - node: 20
-            os: macos-12
+            os: macos-14
           # single windows test due to node.js 14 node-gyp / vs 2022 issues
           - node: 20
             os: windows-2022
@@ -39,5 +39,5 @@ jobs:
         run: npm ci
       # skipping on windows for now due to Make / mocha exit code issues
       - name: Test
-        if: runner.os != 'Windows' && matrix.node != 10
+        if: runner.os != 'Windows'
         run: npm test

.gitignore

Lines changed: 1 addition & 0 deletions
@@ -14,3 +14,4 @@ deps/*
 .DS_Store
 
 .vscode
+.idea

CONTRIBUTING.md

Lines changed: 10 additions & 1 deletion
@@ -13,6 +13,7 @@ so if you feel something is missing feel free to send a pull request.
   * [Contributor Agreement](#contributor-agreement)
 
 [How Can I Contribute?](#how-can-i-contribute)
+  * [Setting up the repository](#setting-up-the-repository)
   * [Reporting Bugs](#reporting-bugs)
   * [Suggesting Enhancements](#suggesting-enhancements)
   * [Pull Requests](#pull-requests)
@@ -37,6 +38,14 @@ Not currently required.
 
 ## How can I contribute?
 
+### Setting up the repository
+
+To set up the library locally, do the following:
+
+1) Clone this repository.
+2) Install librdkafka with `git submodule update --init --recursive`
+3) Install the dependencies `npm install`
+
 ### Reporting Bugs
 
 Please use __Github Issues__ to report bugs. When filling out an issue report,
@@ -215,7 +224,7 @@ Steps to update:
 ```
 Note: This is ran automatically during CI flows but it's good to run it during the version upgrade pull request.
 
-1. Run `npm install --lockfile-version 2` to build with the new version and fix any build errors that occur.
+1. Run `npm install` to build with the new version and fix any build errors that occur.
 
 1. Run unit tests: `npm run test`

README.md

Lines changed: 7 additions & 6 deletions
@@ -22,7 +22,7 @@ I am looking for *your* help to make this project even better! If you're interes
 
 The `node-rdkafka` library is a high-performance NodeJS client for [Apache Kafka](http://kafka.apache.org/) that wraps the native [librdkafka](https://github.com/edenhill/librdkafka) library. All the complexity of balancing writes across partitions and managing (possibly ever-changing) brokers should be encapsulated in the library.
 
-__This library currently uses `librdkafka` version `2.3.0`.__
+__This library currently uses `librdkafka` version `2.6.0`.__
 
 ## Reference Docs
 
@@ -39,7 +39,7 @@ Play nice; Play fair.
 ## Requirements
 
 * Apache Kafka >=0.9
-* Node.js >=4
+* Node.js >=16
 * Linux/Mac
 * Windows?! See below
 * OpenSSL
@@ -65,7 +65,7 @@ Using Alpine Linux? Check out the [docs](https://github.com/Blizzard/node-rdkafk
 
 ### Windows
 
-Windows build **is not** compiled from `librdkafka` source but it is rather linked against the appropriate version of [NuGet librdkafka.redist](https://www.nuget.org/packages/librdkafka.redist/) static binary that gets downloaded from `https://globalcdn.nuget.org/packages/librdkafka.redist.2.3.0.nupkg` during installation. This download link can be changed using the environment variable `NODE_RDKAFKA_NUGET_BASE_URL` that defaults to `https://globalcdn.nuget.org/packages/` when it's no set.
+Windows build **is not** compiled from `librdkafka` source but it is rather linked against the appropriate version of [NuGet librdkafka.redist](https://www.nuget.org/packages/librdkafka.redist/) static binary that gets downloaded from `https://globalcdn.nuget.org/packages/librdkafka.redist.2.6.0.nupkg` during installation. This download link can be changed using the environment variable `NODE_RDKAFKA_NUGET_BASE_URL` that defaults to `https://globalcdn.nuget.org/packages/` when it's no set.
 
 Requirements:
 * [node-gyp for Windows](https://github.com/nodejs/node-gyp#on-windows)
@@ -102,7 +102,7 @@ const Kafka = require('node-rdkafka');
 
 ## Configuration
 
-You can pass many configuration options to `librdkafka`. A full list can be found in `librdkafka`'s [Configuration.md](https://github.com/edenhill/librdkafka/blob/v2.3.0/CONFIGURATION.md)
+You can pass many configuration options to `librdkafka`. A full list can be found in `librdkafka`'s [Configuration.md](https://github.com/edenhill/librdkafka/blob/v2.6.0/CONFIGURATION.md)
 
 Configuration keys that have the suffix `_cb` are designated as callbacks. Some
 of these keys are informational and you can choose to opt-in (for example, `dr_cb`). Others are callbacks designed to
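As a minimal sketch of the `dr_cb` opt-in mentioned in this hunk: setting it to `true` makes the producer emit `delivery-report` events (broker address and topic name are hypothetical):

```js
const Kafka = require('node-rdkafka');

const producer = new Kafka.Producer({
  'metadata.broker.list': 'localhost:9092', // hypothetical broker
  'dr_cb': true                             // opt in: emit 'delivery-report' events
});

producer.connect();
producer.setPollInterval(100); // poll periodically so delivery reports are dispatched

producer.on('ready', function() {
  producer.produce('test-topic', null, Buffer.from('hello'), 'some-key');
});

// Fires for each acknowledged message once `dr_cb` is enabled.
producer.on('delivery-report', function(err, report) {
  console.log('delivered', report.topic, report.partition, report.offset);
});
```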
@@ -137,7 +137,7 @@ You can also get the version of `librdkafka`
 const Kafka = require('node-rdkafka');
 console.log(Kafka.librdkafkaVersion);
 
-// #=> 2.3.0
+// #=> 2.6.0
 ```
 
 ## Sending Messages
@@ -150,7 +150,7 @@ const producer = new Kafka.Producer({
 });
 ```
 
-A `Producer` requires only `metadata.broker.list` (the Kafka brokers) to be created. The values in this list are separated by commas. For other configuration options, see the [Configuration.md](https://github.com/edenhill/librdkafka/blob/v2.3.0/CONFIGURATION.md) file described previously.
+A `Producer` requires only `metadata.broker.list` (the Kafka brokers) to be created. The values in this list are separated by commas. For other configuration options, see the [Configuration.md](https://github.com/edenhill/librdkafka/blob/v2.6.0/CONFIGURATION.md) file described previously.
 
 The following example illustrates a list with several `librdkafka` options set.
 
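The fuller example the README refers to is outside this hunk; as a small illustration of the comma-separated `metadata.broker.list` value (host names are hypothetical):

```js
const Kafka = require('node-rdkafka');

// The only required option: a comma-separated list of bootstrap brokers.
const producer = new Kafka.Producer({
  'metadata.broker.list': 'kafka-1.internal:9092,kafka-2.internal:9092,kafka-3.internal:9092'
});
```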
@@ -516,6 +516,7 @@ The following table lists events for this API.
 |`data` | When using the Standard API consumed messages are emitted in this event. |
 |`partition.eof` | When using Standard API and the configuration option `enable.partition.eof` is set, `partition.eof` events are emitted in this event. The event contains `topic`, `partition` and `offset` properties. |
 |`warning` | The event is emitted in case of `UNKNOWN_TOPIC_OR_PART` or `TOPIC_AUTHORIZATION_FAILED` errors when consuming in *Flowing mode*. Since the consumer will continue working if the error is still happening, the warning event should reappear after the next metadata refresh. To control the metadata refresh rate set `topic.metadata.refresh.interval.ms` property. Once you resolve the error, you can manually call `getMetadata` to speed up consumer recovery. |
+|`rebalance` | The `rebalance` event is emitted when the consumer group is rebalanced. <br><br>This event is only emitted if the `rebalance_cb` configuration is set to a function or set to `true` |
 |`disconnected` | The `disconnected` event is emitted when the broker disconnects. <br><br>This event is only emitted when `.disconnect` is called. The wrapper will always try to reconnect otherwise. |
 |`ready` | The `ready` event is emitted when the `Consumer` is ready to read messages. |
 |`event` | The `event` event is emitted when `librdkafka` reports an event (if you opted in via the `event_cb` option).|
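To go with the new `rebalance` table row, a minimal sketch of opting in with `rebalance_cb: true` and listening for the event (broker, group id and topic are hypothetical):

```js
const Kafka = require('node-rdkafka');

const consumer = new Kafka.KafkaConsumer({
  'metadata.broker.list': 'localhost:9092', // hypothetical broker
  'group.id': 'example-group',              // hypothetical group id
  'rebalance_cb': true                      // required for 'rebalance' events
}, {});

// Fired on partition assignment and revocation; err.code distinguishes the two.
consumer.on('rebalance', function(err, assignments) {
  console.log('rebalance', err.code, assignments);
});

consumer.on('ready', function() {
  consumer.subscribe(['example-topic']);    // hypothetical topic
  consumer.consume();
});

consumer.on('data', function(message) {
  console.log(message.value.toString());
});

consumer.connect();
```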

config.d.ts

Lines changed: 36 additions & 17 deletions
@@ -1,4 +1,4 @@
-// ====== Generated from librdkafka 2.3.0 file CONFIGURATION.md ======
+// ====== Generated from librdkafka 2.6.0 file CONFIGURATION.md ======
 // Code that generated this is a derivative work of the code from Nam Nguyen
 // https://gist.github.com/ntgn81/066c2c8ec5b4238f85d1e9168a04e3fb
 
@@ -620,12 +620,33 @@ export interface GlobalConfig {
   "client.rack"?: string;
 
   /**
-   * Controls how the client uses DNS lookups. By default, when the lookup returns multiple IP addresses for a hostname, they will all be attempted for connection before the connection is considered failed. This applies to both bootstrap and advertised servers. If the value is set to `resolve_canonical_bootstrap_servers_only`, each entry will be resolved and expanded into a list of canonical names. NOTE: Default here is different from the Java client's default behavior, which connects only to the first IP address returned for a hostname.
+   * The backoff time in milliseconds before retrying a protocol request, this is the first backoff time, and will be backed off exponentially until number of retries is exhausted, and it's capped by retry.backoff.max.ms.
+   *
+   * @default 100
+   */
+  "retry.backoff.ms"?: number;
+
+  /**
+   * The max backoff time in milliseconds before retrying a protocol request, this is the atmost backoff allowed for exponentially backed off requests.
+   *
+   * @default 1000
+   */
+  "retry.backoff.max.ms"?: number;
+
+  /**
+   * Controls how the client uses DNS lookups. By default, when the lookup returns multiple IP addresses for a hostname, they will all be attempted for connection before the connection is considered failed. This applies to both bootstrap and advertised servers. If the value is set to `resolve_canonical_bootstrap_servers_only`, each entry will be resolved and expanded into a list of canonical names. **WARNING**: `resolve_canonical_bootstrap_servers_only` must only be used with `GSSAPI` (Kerberos) as `sasl.mechanism`, as it's the only purpose of this configuration value. **NOTE**: Default here is different from the Java client's default behavior, which connects only to the first IP address returned for a hostname.
    *
    * @default use_all_dns_ips
    */
   "client.dns.lookup"?: 'use_all_dns_ips' | 'resolve_canonical_bootstrap_servers_only';
 
+  /**
+   * Whether to enable pushing of client metrics to the cluster, if the cluster has a client metrics subscription which matches this client
+   *
+   * @default true
+   */
+  "enable.metrics.push"?: boolean;
+
   /**
    * Enables or disables `event.*` emitting.
    *
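The `retry.backoff.*` keys move from the producer-only typings into `GlobalConfig`, and `enable.metrics.push` is new. A minimal sketch of passing them from JavaScript, where they now apply to any client (values are illustrative, broker and group id are hypothetical):

```js
const Kafka = require('node-rdkafka');

// retry.backoff.ms / retry.backoff.max.ms and enable.metrics.push live in
// GlobalConfig after this change, so they type-check on consumers as well as producers.
const consumer = new Kafka.KafkaConsumer({
  'metadata.broker.list': 'localhost:9092', // hypothetical broker
  'group.id': 'example-group',              // hypothetical group id
  'retry.backoff.ms': 100,                  // first retry backoff
  'retry.backoff.max.ms': 1000,             // cap for the exponential backoff
  'enable.metrics.push': false              // opt out of pushing client metrics to the cluster
}, {});
```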
@@ -703,20 +724,6 @@ export interface ProducerGlobalConfig extends GlobalConfig {
    */
   "retries"?: number;
 
-  /**
-   * The backoff time in milliseconds before retrying a protocol request, this is the first backoff time, and will be backed off exponentially until number of retries is exhausted, and it's capped by retry.backoff.max.ms.
-   *
-   * @default 100
-   */
-  "retry.backoff.ms"?: number;
-
-  /**
-   * The max backoff time in milliseconds before retrying a protocol request, this is the atmost backoff allowed for exponentially backed off requests.
-   *
-   * @default 1000
-   */
-  "retry.backoff.max.ms"?: number;
-
   /**
    * The threshold of outstanding not yet transmitted broker requests needed to backpressure the producer's message accumulator. If the number of not yet transmitted requests equals or exceeds this number, produce request creation that would have otherwise been triggered (for example, in accordance with linger.ms) will be delayed. A lower number yields larger and more effective batches. A higher value can improve latency when using compression on slow machines.
    *
@@ -810,12 +817,24 @@ export interface ConsumerGlobalConfig extends GlobalConfig {
   "heartbeat.interval.ms"?: number;
 
   /**
-   * Group protocol type. NOTE: Currently, the only supported group protocol type is `consumer`.
+   * Group protocol type for the `classic` group protocol. NOTE: Currently, the only supported group protocol type is `consumer`.
    *
    * @default consumer
    */
   "group.protocol.type"?: string;
 
+  /**
+   * Group protocol to use. Use `classic` for the original protocol and `consumer` for the new protocol introduced in KIP-848. Available protocols: classic or consumer. Default is `classic`, but will change to `consumer` in next releases.
+   *
+   * @default classic
+   */
+  "group.protocol"?: 'classic' | 'consumer';
+
+  /**
+   * Server side assignor to use. Keep it null to make server select a suitable assignor for the group. Available assignors: uniform or range. Default is null
+   */
+  "group.remote.assignor"?: string;
+
   /**
    * How often to query for the current client group coordinator. If the currently assigned coordinator is down the configured query interval will be divided by ten to more quickly recover in case of coordinator reassignment.
    *
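A rough sketch of how the new consumer-group keys could be passed, assuming brokers that support the KIP-848 `consumer` protocol (broker, group id and assignor choice are illustrative):

```js
const Kafka = require('node-rdkafka');

const consumer = new Kafka.KafkaConsumer({
  'metadata.broker.list': 'localhost:9092', // hypothetical broker
  'group.id': 'example-group',              // hypothetical group id
  'group.protocol': 'consumer',             // opt in to the KIP-848 group protocol
  'group.remote.assignor': 'uniform'        // let the broker-side assignor balance partitions
}, {});
```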

e2e/both.spec.js

Lines changed: 59 additions & 0 deletions
@@ -614,6 +614,65 @@ describe('Consumer/Producer', function() {
     });
   });
 
+  describe('Cooperative sticky', function() {
+    var consumer;
+
+    beforeEach(function(done) {
+      var grp = 'kafka-mocha-grp-' + crypto.randomBytes(20).toString('hex');
+
+      var consumerOpts = {
+        'metadata.broker.list': kafkaBrokerList,
+        'group.id': grp,
+        'fetch.wait.max.ms': 1000,
+        'session.timeout.ms': 10000,
+        'enable.auto.commit': false,
+        'debug': 'all',
+        'partition.assignment.strategy': 'cooperative-sticky'
+      };
+
+      consumer = new Kafka.KafkaConsumer(consumerOpts, {
+        'auto.offset.reset': 'largest',
+      });
+
+      consumer.connect({}, function(err, d) {
+        t.ifError(err);
+        t.equal(typeof d, 'object', 'metadata should be returned');
+        done();
+      });
+
+      eventListener(consumer);
+    });
+
+    afterEach(function(done) {
+      consumer.disconnect(function() {
+        done();
+      });
+    });
+
+    it('should be able to produce and consume messages', function (done) {
+      var key = 'key';
+
+      crypto.randomBytes(4096, function(ex, buffer) {
+        producer.setPollInterval(10);
+
+        consumer.on('data', function(message) {
+          t.equal(buffer.toString(), message.value.toString(), 'invalid message value');
+          t.equal(key, message.key, 'invalid message key');
+          t.equal(topic, message.topic, 'invalid message topic');
+          t.ok(message.offset >= 0, 'invalid message offset');
+          done();
+        });
+
+        consumer.subscribe([topic]);
+        consumer.consume();
+
+        setTimeout(function() {
+          producer.produce(topic, null, buffer, key);
+        }, 2000);
+      });
+    });
+  });
+
 function assert_headers_match(expectedHeaders, messageHeaders) {
   t.equal(expectedHeaders.length, messageHeaders.length, 'Headers length does not match expected length');
   for (var i = 0; i < expectedHeaders.length; i++) {

e2e/consumer.spec.js

Lines changed: 39 additions & 0 deletions
@@ -344,4 +344,43 @@ describe('Consumer', function() {
     });
 
   });
+
+  describe('rebalance protocol', function () {
+    var strategies = {
+      'undefined': 'EAGER',
+      'range': 'EAGER',
+      'roundrobin': 'EAGER',
+      'cooperative-sticky': 'COOPERATIVE',
+    };
+
+    Object.keys(strategies).forEach(function (strategy) {
+      it('should return ' + strategies[strategy] + ' for ' + strategy, function(done) {
+        var consumer = new KafkaConsumer({
+          ...gcfg,
+          ...(strategy !== 'undefined' && { 'partition.assignment.strategy': strategy })
+        }, {});
+
+        t.equal(consumer.rebalanceProtocol(), 'NONE');
+
+        consumer.connect({ timeout: 2000 }, function(err) {
+          t.ifError(err);
+
+          consumer.subscribe([topic]);
+
+          consumer.on('rebalance', function (err) {
+            if (err.code === -175) {
+              t.equal(consumer.rebalanceProtocol(), strategies[strategy]);
+              consumer.disconnect(done);
+            }
+          });
+
+          consumer.consume(1, function(err) {
+            t.ifError(err);
+          });
+        });
+
+        eventListener(consumer);
+      });
+    });
+  });
 });
