From 6f9a6b66a8a497be946d73580b3e04d69bebc971 Mon Sep 17 00:00:00 2001 From: Karan Raina Date: Sun, 13 Oct 2024 13:38:44 +0530 Subject: [PATCH 1/2] feature: add support for getStream to read redis data in chunks using nodejs streams --- README.md | 7 +++++++ examples/get-stream.js | 36 ++++++++++++++++++++++++++++++++++++ lib/utils/Commander.ts | 8 +++++++- lib/utils/RedisCommander.ts | 5 +++++ lib/utils/nodeStreams.ts | 24 ++++++++++++++++++++++++ 5 files changed, 79 insertions(+), 1 deletion(-) create mode 100644 examples/get-stream.js create mode 100644 lib/utils/nodeStreams.ts diff --git a/README.md b/README.md index 9881f3ce3..2d24f9370 100644 --- a/README.md +++ b/README.md @@ -113,6 +113,13 @@ redis.zrange("sortedSet", 0, 2, "WITHSCORES").then((elements) => { // The format is: redis[SOME_REDIS_COMMAND_IN_LOWERCASE](ARGUMENTS_ARE_JOINED_INTO_COMMAND_STRING) // so the following statement is equivalent to the CLI: `redis> SET mykey hello EX 10` redis.set("mykey", "hello", "EX", 10); + +// Read the value of a redis key in chunks using nodejs streams +// Retuns a nodejs readable stream +const readStream = redis.getStream("mykey", { chunkSize: 100 * 1000 /* In Bytes */ }); + + + ``` See the `examples/` folder for more examples. For example: diff --git a/examples/get-stream.js b/examples/get-stream.js new file mode 100644 index 000000000..319647508 --- /dev/null +++ b/examples/get-stream.js @@ -0,0 +1,36 @@ +const http = require('node:http'); +const ioredis = require('ioredis'); + +const client = new ioredis(); + +async function streamFromRedis(key, response) { + const dataStream = client.getStream(key, { + chunkSize: 100 * 10, + pipeline: false, + }); + + for await (const data of dataStream) { + response.write(data); + } + + response.end(); + +} + +async function sendFromRedis(key, response) { + const reply = await client.get(key); + response.end(reply); +} + +const server = http.createServer(); + +server.on('request', (request, response) => { + if (request.url === '/stream') { + streamFromRedis('test', response).catch(console.error); + } else { + sendFromRedis('test', response).catch(console.error); + } +}); + +server.listen(3000); + diff --git a/lib/utils/Commander.ts b/lib/utils/Commander.ts index 0bb64869d..6143c1f3b 100644 --- a/lib/utils/Commander.ts +++ b/lib/utils/Commander.ts @@ -1,3 +1,4 @@ +import { Readable } from "node:stream"; import { list } from "@ioredis/commands"; import { executeWithAutoPipelining, @@ -6,7 +7,8 @@ import { import Command, { ArgumentType } from "../Command"; import Script from "../Script"; import { Callback, WriteableStream } from "../types"; -import RedisCommander, { ClientContext } from "./RedisCommander"; +import RedisCommander, { ClientContext, GetStreamOptions } from "./RedisCommander"; +import { createGetStream } from "./nodeStreams"; export interface CommanderOptions { keyPrefix?: string; @@ -115,6 +117,10 @@ Commander.prototype.callBuffer = generateFunction("callBuffer", null); // @ts-expect-error Commander.prototype.send_command = Commander.prototype.call; +Commander.prototype.getStream = function getStream(key, opts: GetStreamOptions = {}) { + return Readable.from(createGetStream(this, key, opts)); +} + function generateFunction(functionName: string | null, _encoding: string); function generateFunction( functionName: string | null, diff --git a/lib/utils/RedisCommander.ts b/lib/utils/RedisCommander.ts index 49388d8ed..78840c2f2 100644 --- a/lib/utils/RedisCommander.ts +++ b/lib/utils/RedisCommander.ts @@ -8,6 +8,7 @@ import { Callback } from "../types"; export type RedisKey = string | Buffer; export type RedisValue = string | Buffer | number; +export type GetStreamOptions = { chunkSize?: number, pipeline?: boolean } // Inspired by https://github.com/mmkal/handy-redis/blob/main/src/generated/interface.ts. // Should be fixed with https://github.com/Microsoft/TypeScript/issues/1213 @@ -3666,6 +3667,10 @@ interface RedisCommander { key: RedisKey, callback?: Callback ): Result; + getStream( + key: RedisKey, + opts: GetStreamOptions + ): Result; getBuffer( key: RedisKey, callback?: Callback diff --git a/lib/utils/nodeStreams.ts b/lib/utils/nodeStreams.ts new file mode 100644 index 000000000..8e439017c --- /dev/null +++ b/lib/utils/nodeStreams.ts @@ -0,0 +1,24 @@ +import { executeWithAutoPipelining } from "../../built/autoPipelining"; +import { GetStreamOptions } from "./RedisCommander"; + +export async function* createGetStream(client, key, opts: GetStreamOptions = {}) { + const size = opts.chunkSize || 10 * 1000; + let cursor = 0; + let isReadable = true; + const isPipelineMode = opts.pipeline !== false; + + while (isReadable) { + let chunk; + if (isPipelineMode) { + chunk = await executeWithAutoPipelining(client, 'getrange', 'range', [key, cursor, cursor + size - 1], null) + } else { + chunk = await client.getrange(key, cursor, cursor + size - 1); + } + if (!chunk || typeof chunk !== 'string' || chunk?.length === 0) { + isReadable = false; + } else { + cursor += chunk.length; + yield chunk; + } + } + }; \ No newline at end of file From 91af572ca99103a8a44e625c463e6c4c90b97cc9 Mon Sep 17 00:00:00 2001 From: Karan Raina Date: Sun, 13 Oct 2024 13:46:22 +0530 Subject: [PATCH 2/2] fix typings --- lib/utils/Commander.ts | 4 ++-- lib/utils/RedisCommander.ts | 3 ++- lib/utils/nodeStreams.ts | 7 ++++--- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/lib/utils/Commander.ts b/lib/utils/Commander.ts index 6143c1f3b..dd5c5b59b 100644 --- a/lib/utils/Commander.ts +++ b/lib/utils/Commander.ts @@ -7,7 +7,7 @@ import { import Command, { ArgumentType } from "../Command"; import Script from "../Script"; import { Callback, WriteableStream } from "../types"; -import RedisCommander, { ClientContext, GetStreamOptions } from "./RedisCommander"; +import RedisCommander, { ClientContext, GetStreamOptions, RedisKey } from "./RedisCommander"; import { createGetStream } from "./nodeStreams"; export interface CommanderOptions { @@ -117,7 +117,7 @@ Commander.prototype.callBuffer = generateFunction("callBuffer", null); // @ts-expect-error Commander.prototype.send_command = Commander.prototype.call; -Commander.prototype.getStream = function getStream(key, opts: GetStreamOptions = {}) { +Commander.prototype.getStream = function getStream(key: RedisKey, opts: GetStreamOptions = {}) { return Readable.from(createGetStream(this, key, opts)); } diff --git a/lib/utils/RedisCommander.ts b/lib/utils/RedisCommander.ts index 78840c2f2..26f64232c 100644 --- a/lib/utils/RedisCommander.ts +++ b/lib/utils/RedisCommander.ts @@ -4,6 +4,7 @@ * this file. */ +import { Readable } from "stream"; import { Callback } from "../types"; export type RedisKey = string | Buffer; @@ -3670,7 +3671,7 @@ interface RedisCommander { getStream( key: RedisKey, opts: GetStreamOptions - ): Result; + ): Readable; getBuffer( key: RedisKey, callback?: Callback diff --git a/lib/utils/nodeStreams.ts b/lib/utils/nodeStreams.ts index 8e439017c..38ffb0f88 100644 --- a/lib/utils/nodeStreams.ts +++ b/lib/utils/nodeStreams.ts @@ -1,7 +1,8 @@ -import { executeWithAutoPipelining } from "../../built/autoPipelining"; -import { GetStreamOptions } from "./RedisCommander"; +import { executeWithAutoPipelining } from "../autoPipelining"; +import Commander from "./Commander"; +import { GetStreamOptions, RedisKey } from "./RedisCommander"; -export async function* createGetStream(client, key, opts: GetStreamOptions = {}) { +export async function* createGetStream(client: Commander, key: RedisKey, opts: GetStreamOptions = {}) { const size = opts.chunkSize || 10 * 1000; let cursor = 0; let isReadable = true;