diff --git a/README.md b/README.md index 8caf680..83cfb83 100644 --- a/README.md +++ b/README.md @@ -100,6 +100,38 @@ const streamContext = createResumableStreamContext({ }); ``` +### Usage with custom Redis clients (Upstash, Valkey, etc.) + +If you want to use a different Redis-compatible client (like Upstash Redis, Valkey, or any other), you can use the generic interface by importing from `resumable-stream/generic`. This requires you to provide your own `Publisher` and `Subscriber` implementations. + +```typescript +import { createResumableStreamContext } from "resumable-stream/generic"; +import type { Publisher, Subscriber } from "resumable-stream/generic"; + +// Example: Create adapters for your Redis client +const publisher: Publisher = { + connect: async () => { /* ... */ }, + publish: async (channel, message) => { /* ... */ }, + set: async (key, value, options) => { /* ... */ }, + get: async (key) => { /* ... */ }, + incr: async (key) => { /* ... */ }, +}; + +const subscriber: Subscriber = { + connect: async () => { /* ... */ }, + subscribe: async (channel, callback) => { /* ... */ }, + unsubscribe: async (channel) => { /* ... */ }, +}; + +const streamContext = createResumableStreamContext({ + waitUntil: after, + publisher, + subscriber, +}); +``` + +**Note:** When using the generic interface, both `publisher` and `subscriber` are **required**. The library will throw an error if they are not provided. + ## Type Docs [Type Docs](https://github.com/vercel/resumable-stream/blob/main/docs/README.md) diff --git a/package.json b/package.json index a50a196..b926c83 100644 --- a/package.json +++ b/package.json @@ -13,7 +13,8 @@ "exports": { ".": "./dist/index.js", "./ioredis": "./dist/ioredis.js", - "./redis": "./dist/redis.js" + "./redis": "./dist/redis.js", + "./generic": "./dist/generic.js" }, "typesVersions": { "*": { @@ -22,6 +23,9 @@ ], "redis": [ "dist/redis.d.ts" + ], + "generic": [ + "dist/generic.d.ts" ] } }, diff --git a/src/__tests__/generic.test.ts b/src/__tests__/generic.test.ts new file mode 100644 index 0000000..8eb0b1d --- /dev/null +++ b/src/__tests__/generic.test.ts @@ -0,0 +1,96 @@ +import { describe, it, expect } from "vitest"; +import { createResumableStreamContext } from "../generic"; +import { createInMemoryPubSubForTesting } from "../../testing-utils/in-memory-pubsub"; +import { streamToBuffer, createTestingStream } from "../../testing-utils/testing-stream"; + +describe("generic interface", () => { + it("should work with custom publisher/subscriber implementations", async () => { + const { publisher, subscriber } = createInMemoryPubSubForTesting(); + + const ctx = createResumableStreamContext({ + waitUntil: null, + publisher, + subscriber, + keyPrefix: "test-generic-" + crypto.randomUUID(), + }); + + const { readable, writer } = createTestingStream(); + const stream = await ctx.resumableStream("test-stream", () => readable); + + writer.write("Hello "); + writer.write("World!"); + writer.close(); + + expect(stream).not.toBeNull(); + const result = await streamToBuffer(stream!); + expect(result).toBe("Hello World!"); + }); + + it("should resume streams with custom implementations", async () => { + const { publisher, subscriber } = createInMemoryPubSubForTesting(); + + const ctx = createResumableStreamContext({ + waitUntil: null, + publisher, + subscriber, + keyPrefix: "test-generic-" + crypto.randomUUID(), + }); + + const { readable, writer } = createTestingStream(); + // Create initial stream + const stream1 = await ctx.resumableStream("test-stream-2", () => readable); + + // Resume the same stream immediately + const stream2 = await ctx.resumableStream("test-stream-2", () => { + throw new Error("Should not be called"); + }); + + writer.write("Part 1 "); + writer.write("Part 2"); + writer.close(); + + expect(stream1).not.toBeNull(); + expect(stream2).not.toBeNull(); + + const result1 = await streamToBuffer(stream1!); + const result2 = await streamToBuffer(stream2!); + expect(result1).toBe("Part 1 Part 2"); + expect(result2).toBe("Part 1 Part 2"); + }); + + it("should return null if stream is done", async () => { + const { publisher, subscriber } = createInMemoryPubSubForTesting(); + + const ctx = createResumableStreamContext({ + waitUntil: null, + publisher, + subscriber, + keyPrefix: "test-generic-" + crypto.randomUUID(), + }); + + const { readable, writer } = createTestingStream(); + const stream = await ctx.resumableStream("test-stream-3", () => readable); + + writer.write("Done"); + writer.close(); + + await streamToBuffer(stream!); + + // Try to resume after stream is done + const doneStream = await ctx.resumableStream("test-stream-3", () => { + throw new Error("Should not be called"); + }); + + expect(doneStream).toBeNull(); + }); + + it("should throw error if publisher is not provided", () => { + expect(() => { + createResumableStreamContext({ + waitUntil: null, + // @ts-expect-error - intentionally not providing publisher/subscriber to test error + }); + }).toThrow(); + }); +}); + diff --git a/src/generic.ts b/src/generic.ts new file mode 100644 index 0000000..108a81c --- /dev/null +++ b/src/generic.ts @@ -0,0 +1,32 @@ +import { createResumableStreamContextFactory } from "./runtime"; +import type { Publisher, Subscriber } from "./types"; + +export * from "./types"; +export { resumeStream } from "./runtime"; + +/** + * Creates a global context for resumable streams from which you can create resumable streams. + * + * This generic version requires you to provide your own Publisher and Subscriber implementations, + * making it compatible with any Redis-like client (Upstash Redis, Valkey, etc.). + * + * @param options - The context options. + * @param options.keyPrefix - The prefix for the keys used by the resumable streams. Defaults to `resumable-stream`. + * @param options.waitUntil - A function that takes a promise and ensures that the current program stays alive until the promise is resolved. + * @param options.subscriber - A pubsub subscriber implementing the Subscriber interface. **Required**. + * @param options.publisher - A pubsub publisher implementing the Publisher interface. **Required**. + * @returns A resumable stream context. + */ +export const createResumableStreamContext = createResumableStreamContextFactory({ + publisher: () => { + throw new Error( + "Generic mode requires a publisher to be provided. Please pass a publisher option to createResumableStreamContext." + ); + }, + subscriber: () => { + throw new Error( + "Generic mode requires a subscriber to be provided. Please pass a subscriber option to createResumableStreamContext." + ); + }, +}); + diff --git a/tsconfig.build.json b/tsconfig.build.json index a45fdf5..33034e8 100644 --- a/tsconfig.build.json +++ b/tsconfig.build.json @@ -7,6 +7,6 @@ "sourceMap": true, "noEmit": false }, - "include": ["src/index.ts", "src/ioredis.ts", "src/redis.ts"], + "include": ["src/index.ts", "src/ioredis.ts", "src/redis.ts", "src/generic.ts"], "exclude": ["src/**/*.test.ts", "src/**/*.spec.ts", "node_modules"] }