Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,38 @@ const streamContext = createResumableStreamContext({
});
```

### Usage with custom Redis clients (Upstash, Valkey, etc.)

If you want to use a different Redis-compatible client (like Upstash Redis, Valkey, or any other), you can use the generic interface by importing from `resumable-stream/generic`. This requires you to provide your own `Publisher` and `Subscriber` implementations.

```typescript
import { createResumableStreamContext } from "resumable-stream/generic";
import type { Publisher, Subscriber } from "resumable-stream/generic";

// Example: Create adapters for your Redis client
const publisher: Publisher = {
connect: async () => { /* ... */ },
publish: async (channel, message) => { /* ... */ },
set: async (key, value, options) => { /* ... */ },
get: async (key) => { /* ... */ },
incr: async (key) => { /* ... */ },
};

const subscriber: Subscriber = {
connect: async () => { /* ... */ },
subscribe: async (channel, callback) => { /* ... */ },
unsubscribe: async (channel) => { /* ... */ },
};

const streamContext = createResumableStreamContext({
waitUntil: after,
publisher,
subscriber,
});
```

**Note:** When using the generic interface, both `publisher` and `subscriber` are **required**. The library will throw an error if they are not provided.

## Type Docs

[Type Docs](https://github.com/vercel/resumable-stream/blob/main/docs/README.md)
Expand Down
6 changes: 5 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
"exports": {
".": "./dist/index.js",
"./ioredis": "./dist/ioredis.js",
"./redis": "./dist/redis.js"
"./redis": "./dist/redis.js",
"./generic": "./dist/generic.js"
},
"typesVersions": {
"*": {
Expand All @@ -22,6 +23,9 @@
],
"redis": [
"dist/redis.d.ts"
],
"generic": [
"dist/generic.d.ts"
]
}
},
Expand Down
96 changes: 96 additions & 0 deletions src/__tests__/generic.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import { describe, it, expect } from "vitest";
import { createResumableStreamContext } from "../generic";
import { createInMemoryPubSubForTesting } from "../../testing-utils/in-memory-pubsub";
import { streamToBuffer, createTestingStream } from "../../testing-utils/testing-stream";

describe("generic interface", () => {
it("should work with custom publisher/subscriber implementations", async () => {
const { publisher, subscriber } = createInMemoryPubSubForTesting();

const ctx = createResumableStreamContext({
waitUntil: null,
publisher,
subscriber,
keyPrefix: "test-generic-" + crypto.randomUUID(),
});

const { readable, writer } = createTestingStream();
const stream = await ctx.resumableStream("test-stream", () => readable);

writer.write("Hello ");
writer.write("World!");
writer.close();

expect(stream).not.toBeNull();
const result = await streamToBuffer(stream!);
expect(result).toBe("Hello World!");
});

it("should resume streams with custom implementations", async () => {
const { publisher, subscriber } = createInMemoryPubSubForTesting();

const ctx = createResumableStreamContext({
waitUntil: null,
publisher,
subscriber,
keyPrefix: "test-generic-" + crypto.randomUUID(),
});

const { readable, writer } = createTestingStream();
// Create initial stream
const stream1 = await ctx.resumableStream("test-stream-2", () => readable);

// Resume the same stream immediately
const stream2 = await ctx.resumableStream("test-stream-2", () => {
throw new Error("Should not be called");
});

writer.write("Part 1 ");
writer.write("Part 2");
writer.close();

expect(stream1).not.toBeNull();
expect(stream2).not.toBeNull();

const result1 = await streamToBuffer(stream1!);
const result2 = await streamToBuffer(stream2!);
expect(result1).toBe("Part 1 Part 2");
expect(result2).toBe("Part 1 Part 2");
});

it("should return null if stream is done", async () => {
const { publisher, subscriber } = createInMemoryPubSubForTesting();

const ctx = createResumableStreamContext({
waitUntil: null,
publisher,
subscriber,
keyPrefix: "test-generic-" + crypto.randomUUID(),
});

const { readable, writer } = createTestingStream();
const stream = await ctx.resumableStream("test-stream-3", () => readable);

writer.write("Done");
writer.close();

await streamToBuffer(stream!);

// Try to resume after stream is done
const doneStream = await ctx.resumableStream("test-stream-3", () => {
throw new Error("Should not be called");
});

expect(doneStream).toBeNull();
});

it("should throw error if publisher is not provided", () => {
expect(() => {
createResumableStreamContext({
waitUntil: null,
// @ts-expect-error - intentionally not providing publisher/subscriber to test error
});
}).toThrow();
});
});

32 changes: 32 additions & 0 deletions src/generic.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import { createResumableStreamContextFactory } from "./runtime";
import type { Publisher, Subscriber } from "./types";

export * from "./types";
export { resumeStream } from "./runtime";

/**
* Creates a global context for resumable streams from which you can create resumable streams.
*
* This generic version requires you to provide your own Publisher and Subscriber implementations,
* making it compatible with any Redis-like client (Upstash Redis, Valkey, etc.).
*
* @param options - The context options.
* @param options.keyPrefix - The prefix for the keys used by the resumable streams. Defaults to `resumable-stream`.
* @param options.waitUntil - A function that takes a promise and ensures that the current program stays alive until the promise is resolved.
* @param options.subscriber - A pubsub subscriber implementing the Subscriber interface. **Required**.
* @param options.publisher - A pubsub publisher implementing the Publisher interface. **Required**.
* @returns A resumable stream context.
*/
export const createResumableStreamContext = createResumableStreamContextFactory({
publisher: () => {
throw new Error(
"Generic mode requires a publisher to be provided. Please pass a publisher option to createResumableStreamContext."
);
},
subscriber: () => {
throw new Error(
"Generic mode requires a subscriber to be provided. Please pass a subscriber option to createResumableStreamContext."
);
},
});

2 changes: 1 addition & 1 deletion tsconfig.build.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@
"sourceMap": true,
"noEmit": false
},
"include": ["src/index.ts", "src/ioredis.ts", "src/redis.ts"],
"include": ["src/index.ts", "src/ioredis.ts", "src/redis.ts", "src/generic.ts"],
"exclude": ["src/**/*.test.ts", "src/**/*.spec.ts", "node_modules"]
}